Properly remove clusters after each test, and clean up duplicated code in 
tests and move it into base test classes.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c0d5792b
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c0d5792b
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c0d5792b

Branch: refs/heads/master
Commit: c0d5792b745c67b6fee56ba79df02be89d1f049e
Parents: fe97055
Author: Lei Xia <[email protected]>
Authored: Thu Jun 7 17:15:54 2018 -0700
Committer: Lei Xia <[email protected]>
Committed: Fri Jul 13 11:20:49 2018 -0700

----------------------------------------------------------------------
 .../BestPossibleExternalViewVerifier.java       |  13 +-
 .../java/org/apache/helix/ZkUnitTestBase.java   | 442 +-----------
 .../org/apache/helix/common/ZkTestBase.java     | 691 +++++++++++++++++++
 .../stages/TestMessageThrottleStage.java        |   4 +-
 .../stages/TestRebalancePipeline.java           |   8 +-
 .../SinglePartitionLeaderStandByTest.java       |   5 +-
 .../helix/integration/TestAddClusterV2.java     |  23 +-
 .../TestAddNodeAfterControllerStart.java        |   6 +-
 .../TestAddStateModelFactoryAfterConnect.java   |   5 +-
 .../TestAlertingRebalancerFailure.java          |  39 +-
 .../helix/integration/TestBasicSpectator.java   |   4 +-
 .../integration/TestBatchMessageHandling.java   |   6 +-
 .../integration/TestBucketizedResource.java     |   4 +-
 .../integration/TestCMWithFailParticipant.java  |   4 +-
 .../integration/TestCarryOverBadCurState.java   |   6 +-
 .../integration/TestCleanupExternalView.java    |   5 +-
 .../helix/integration/TestClusterStartsup.java  |  22 +-
 .../TestCorrectnessOnConnectivityLoss.java      |  12 +-
 .../apache/helix/integration/TestDisable.java   |   4 +-
 .../integration/TestDisableExternalView.java    |   4 +-
 .../helix/integration/TestDisableResource.java  |   2 +
 .../integration/TestDistributedCMMain.java      |   4 +-
 .../TestDistributedClusterController.java       |   4 +-
 .../apache/helix/integration/TestDriver.java    |   5 +-
 .../org/apache/helix/integration/TestDrop.java  |   4 +-
 .../helix/integration/TestDropResource.java     |   8 +-
 .../integration/TestEnableCompression.java      |   4 +-
 .../TestEnablePartitionDuringDisable.java       |   4 +-
 .../integration/TestEntropyFreeNodeBounce.java  |   1 +
 .../helix/integration/TestErrorPartition.java   |   4 +-
 .../helix/integration/TestExpandCluster.java    |  22 +-
 .../integration/TestExternalViewUpdates.java    |   4 +-
 .../integration/TestHelixCustomCodeRunner.java  |   6 +-
 .../helix/integration/TestHelixInstanceTag.java |  12 +-
 .../TestHelixUsingDifferentParams.java          |   4 +-
 .../TestInvalidResourceRebalance.java           |   1 +
 .../helix/integration/TestNullReplica.java      |   5 +-
 .../TestPartitionLevelTransitionConstraint.java |   5 +-
 .../TestPartitionMovementThrottle.java          |  36 +-
 .../helix/integration/TestPauseSignal.java      |   4 +-
 .../TestRebalancerPersistAssignments.java       |  43 +-
 .../TestReelectedPipelineCorrectness.java       |   1 +
 .../helix/integration/TestRenamePartition.java  |   4 +-
 .../helix/integration/TestResetInstance.java    |   4 +-
 .../integration/TestResetPartitionState.java    |   5 +-
 .../helix/integration/TestResetResource.java    |   5 +-
 .../integration/TestResourceGroupEndtoEnd.java  |   6 +-
 .../TestResourceWithSamePartitionKey.java       |   1 +
 .../helix/integration/TestSchemataSM.java       |   4 +-
 .../TestSessionExpiryInTransition.java          |   4 +-
 .../TestStandAloneCMSessionExpiry.java          |   4 +-
 ...estStartMultipleControllersWithSameName.java |   5 +-
 .../TestStateTransitionCancellation.java        |  23 +-
 .../TestStateTransitionThrottle.java            |   8 +-
 .../helix/integration/TestStatusUpdate.java     |   4 +-
 .../helix/integration/TestSwapInstance.java     |  13 +-
 .../TestSyncSessionToController.java            |   4 +-
 .../TestWeightBasedRebalanceUtil.java           |  16 +-
 .../helix/integration/TestZkConnectionLost.java |  43 +-
 .../integration/common/IntegrationTest.java     |  38 -
 .../common/ZkIntegrationTestBase.java           | 322 ---------
 .../common/ZkStandAloneCMTestBase.java          |  36 +-
 .../TestClusterDataCacheSelectiveUpdate.java    |  12 +-
 .../controller/TestClusterMaintenanceMode.java  |  17 +-
 .../TestControllerLeadershipChange.java         |   4 +-
 .../controller/TestControllerLiveLock.java      |   1 +
 .../TestSkipBestPossibleCalculation.java        |   4 +-
 .../TestDistributedControllerManager.java       |   4 +-
 .../manager/TestHelixDataAccessor.java          |   7 +-
 .../manager/TestParticipantManager.java         |   4 +-
 .../integration/messaging/TestBatchMessage.java |  15 +-
 .../messaging/TestBatchMessageWrapper.java      |   1 +
 .../messaging/TestMessageThrottle.java          |   4 +-
 .../messaging/TestMessageThrottle2.java         |   4 +-
 .../messaging/TestP2PMessageSemiAuto.java       |  19 +-
 .../paticipant/TestInstanceAutoJoin.java        |   4 +-
 .../paticipant/TestNonOfflineInitState.java     |  21 +-
 .../paticipant/TestRestartParticipant.java      |   5 +-
 .../paticipant/TestStateTransitionTimeout.java  |  18 +-
 .../TestStateTransitionTimeoutWithResource.java |  30 +-
 .../TestCrushAutoRebalance.java                 |  81 +--
 .../TestCrushAutoRebalanceNonRack.java          |  73 +-
 ...rushAutoRebalanceTopoplogyAwareDisabled.java |  20 +-
 .../TestDelayedAutoRebalance.java               |  48 +-
 ...elayedAutoRebalanceWithDisabledInstance.java |  54 +-
 .../TestDelayedAutoRebalanceWithRackaware.java  |  11 +-
 .../PartitionMigration/TestExpandCluster.java   |   6 +-
 .../TestFullAutoMigration.java                  |   4 +-
 .../TestPartitionMigrationBase.java             |  43 +-
 .../rebalancer/TestAutoIsWithEmptyMap.java      |   5 +-
 .../rebalancer/TestAutoRebalance.java           |  30 +-
 .../TestAutoRebalancePartitionLimit.java        |  16 +-
 .../TestAutoRebalanceWithDisabledInstance.java  |  18 +-
 ...MaintenanceModeWhenReachingMaxPartition.java |   9 +-
 ...ceModeWhenReachingOfflineInstancesLimit.java |  10 +-
 .../rebalancer/TestCustomIdealState.java        |   4 +-
 .../TestCustomizedIdealStateRebalancer.java     |   8 +-
 .../rebalancer/TestFullAutoNodeTagging.java     |   9 +
 .../rebalancer/TestMixedModeAutoRebalance.java  |   8 +-
 .../rebalancer/TestSemiAutoRebalance.java       |  24 +-
 .../rebalancer/TestZeroReplicaAvoidance.java    |  19 +-
 .../spectator/TestRoutingTableProvider.java     |   4 +-
 ...stRoutingTableProviderFromCurrentStates.java |  47 +-
 .../TestRoutingTableProviderFromTargetEV.java   |  36 +-
 ...TestRoutingTableProviderPeriodicRefresh.java |   4 +-
 .../spectator/TestRoutingTableSnapshot.java     |  26 +-
 .../helix/integration/task/TaskTestBase.java    |  11 +-
 .../integration/task/TestBatchAddJobs.java      |  13 +-
 .../task/TestIndependentTaskRebalancer.java     |  10 +-
 .../helix/integration/task/TestJobFailure.java  |  10 +-
 .../task/TestJobFailureDependence.java          |   8 +-
 .../task/TestJobFailureHighThreshold.java       |   9 +-
 .../task/TestJobFailureTaskNotStarted.java      |  20 +-
 .../helix/integration/task/TestJobTimeout.java  |   9 +-
 .../task/TestJobTimeoutTaskNotStarted.java      |  10 +-
 .../task/TestRebalanceRunningTask.java          |  10 +-
 .../task/TestRunJobsWithMissingTarget.java      |   4 +-
 .../integration/task/TestTaskAssignment.java    |   2 +-
 .../task/TestTaskRebalancerParallel.java        |   4 -
 .../integration/task/TestTaskThreadLeak.java    |   4 +-
 .../integration/task/TestTaskThrottling.java    |   6 +-
 .../task/TestTaskWithInstanceDisabled.java      |   2 +-
 .../integration/task/TestUserContentStore.java  |   9 +-
 .../helix/manager/zk/TestHandleNewSession.java  |   5 +-
 .../manager/zk/TestZkBaseDataAccessor.java      |  91 +--
 .../zk/TestZkManagerFlappingDetection.java      |   4 +-
 .../handling/TestBatchMessageModeConfigs.java   |  14 +-
 .../handling/TestResourceThreadpoolSize.java    |  24 +-
 .../TestClusterStatusMonitorLifecycle.java      |  23 +-
 .../mbeans/TestClusterAggregateMetrics.java     |   4 +-
 .../mbeans/TestDisableResourceMbean.java        |   1 +
 .../mbeans/TestDropResourceMetricsReset.java    |   1 +
 .../mbeans/TestResetClusterMetrics.java         |   1 +
 .../participant/TestDistControllerElection.java |  16 +-
 .../TestDistControllerStateModel.java           |   8 +
 .../helix/spectator/TestRoutingDataCache.java   |  14 +-
 .../helix/task/TaskSynchronizedTestBase.java    |  66 +-
 ...signableInstanceManagerControllerSwitch.java |   2 +-
 .../helix/task/TestJobStateOnCreation.java      |   8 +-
 .../helix/task/TestSemiAutoStateTransition.java |  12 +-
 .../apache/helix/tools/TestClusterSetup.java    |  11 +-
 .../apache/helix/tools/TestHelixAdminCli.java   |   4 +-
 142 files changed, 1561 insertions(+), 1719 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
 
b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 4abfd07..fa5d29f 100644
--- 
a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ 
b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -82,18 +82,7 @@ public class BestPossibleExternalViewVerifier extends 
ZkHelixClusterVerifier {
     _expectLiveInstances = expectLiveInstances;
     _clusterDataCache = new ClusterDataCache();
   }
-
-  public static void main (String [] args) {
-    Set<String> resources = Collections.singleton("SyncColoTestDB");
-    BestPossibleExternalViewVerifier verifier =
-        new BestPossibleExternalViewVerifier.Builder("ESPRESSO_MT1")
-            .setZkAddr("zk-ltx1-espresso.stg.linkedin.com:12913")
-            .setResources(resources)
-            .build();
-
-    verifier.verify();
-  }
-
+  
   public static class Builder {
     private String _clusterName;
     private Map<String, Map<String, String>> _errStates;

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java 
b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
index 483f0af..5757b24 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
@@ -19,445 +19,15 @@ package org.apache.helix;
  * under the License.
  */
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkConnection;
-import org.I0Itec.zkclient.ZkServer;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
-import org.apache.helix.controller.pipeline.Pipeline;
-import org.apache.helix.controller.pipeline.Stage;
-import org.apache.helix.controller.pipeline.StageContext;
-import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.Attributes;
-import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
-import org.apache.helix.tools.StateModelConfigGenerator;
-import org.apache.helix.util.ZKClientPool;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.AssertJUnit;
-import org.testng.annotations.AfterSuite;
-import org.testng.annotations.BeforeSuite;
+import org.apache.helix.common.ZkTestBase;
 
 // TODO merge code with ZkIntegrationTestBase
-public class ZkUnitTestBase {
-  private static Logger LOG = LoggerFactory.getLogger(ZkUnitTestBase.class);
-  protected static ZkServer _zkServer = null;
-  protected static ZkClient _gZkClient;
-
-  public static final String ZK_ADDR = "localhost:2185";
-  protected static final String CLUSTER_PREFIX = "CLUSTER";
-  protected static final String CONTROLLER_CLUSTER_PREFIX = 
"CONTROLLER_CLUSTER";
-
-  @BeforeSuite(alwaysRun = true)
-  public void beforeSuite() throws Exception {
-    // Due to ZOOKEEPER-2693 fix, we need to specify whitelist for execute zk 
commends
-    System.setProperty("zookeeper.4lw.commands.whitelist", "*");
-    System.setProperty(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, 
"3000");
-
-    _zkServer = TestHelper.startZkServer(ZK_ADDR);
-    AssertJUnit.assertTrue(_zkServer != null);
-    ZKClientPool.reset();
-
-    // System.out.println("Number of open zkClient before ZkUnitTests: "
-    // + ZkClient.getNumberOfConnections());
-
-    _gZkClient = new ZkClient(ZK_ADDR);
-    _gZkClient.setZkSerializer(new ZNRecordSerializer());
-  }
-
-  @AfterSuite(alwaysRun = true)
-  public void afterSuite() {
-    _gZkClient.close();
-    TestHelper.stopZkServer(_zkServer);
-    _zkServer = null;
-
-    // System.out.println("Number of open zkClient after ZkUnitTests: "
-    // + ZkClient.getNumberOfConnections());
-
-  }
-
-  protected String getShortClassName() {
-    String className = this.getClass().getName();
-    return className.substring(className.lastIndexOf('.') + 1);
-  }
-
-  protected String getCurrentLeader(ZkClient zkClient, String clusterName) {
-    String leaderPath = PropertyPathBuilder.controllerLeader(clusterName);
-    ZNRecord leaderRecord = zkClient.<ZNRecord> readData(leaderPath);
-    if (leaderRecord == null) {
-      return null;
-    }
-
-    String leader = 
leaderRecord.getSimpleField(PropertyType.LEADER.toString());
-    return leader;
-  }
-
-  protected void stopCurrentLeader(ZkClient zkClient, String clusterName,
-      Map<String, Thread> threadMap, Map<String, HelixManager> managerMap) {
-    String leader = getCurrentLeader(zkClient, clusterName);
-    Assert.assertTrue(leader != null);
-    System.out.println("stop leader:" + leader + " in " + clusterName);
-    Assert.assertTrue(leader != null);
-
-    HelixManager manager = managerMap.remove(leader);
-    Assert.assertTrue(manager != null);
-    manager.disconnect();
-
-    Thread thread = threadMap.remove(leader);
-    Assert.assertTrue(thread != null);
-    thread.interrupt();
-
-    boolean isNewLeaderElected = false;
-    try {
-      // Thread.sleep(2000);
-      for (int i = 0; i < 5; i++) {
-        Thread.sleep(1000);
-        String newLeader = getCurrentLeader(zkClient, clusterName);
-        if (!newLeader.equals(leader)) {
-          isNewLeaderElected = true;
-          System.out.println("new leader elected: " + newLeader + " in " + 
clusterName);
-          break;
-        }
-      }
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-    if (isNewLeaderElected == false) {
-      System.out.println("fail to elect a new leader elected in " + 
clusterName);
-    }
-    AssertJUnit.assertTrue(isNewLeaderElected);
-  }
-
-  public void verifyInstance(ZkClient zkClient, String clusterName, String 
instance,
-      boolean wantExists) {
-    // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName);
-    String instanceConfigsPath = 
PropertyPathBuilder.instanceConfig(clusterName);
-    String instanceConfigPath = instanceConfigsPath + "/" + instance;
-    String instancePath = PropertyPathBuilder.instance(clusterName, instance);
-    AssertJUnit.assertEquals(wantExists, zkClient.exists(instanceConfigPath));
-    AssertJUnit.assertEquals(wantExists, zkClient.exists(instancePath));
-  }
-
-  public void verifyResource(ZkClient zkClient, String clusterName, String 
resource,
-      boolean wantExists) {
-    String resourcePath = PropertyPathBuilder.idealState(clusterName, 
resource);
-    AssertJUnit.assertEquals(wantExists, zkClient.exists(resourcePath));
-  }
-
-  public void verifyEnabled(ZkClient zkClient, String clusterName, String 
instance,
-      boolean wantEnabled) {
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
-
-    InstanceConfig config = 
accessor.getProperty(keyBuilder.instanceConfig(instance));
-    AssertJUnit.assertEquals(wantEnabled, config.getInstanceEnabled());
-  }
-
-  public void verifyReplication(ZkClient zkClient, String clusterName, String 
resource, int repl) {
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
-
-    IdealState idealState = 
accessor.getProperty(keyBuilder.idealStates(resource));
-    for (String partitionName : idealState.getPartitionSet()) {
-      if (idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
-        AssertJUnit.assertEquals(repl, 
idealState.getPreferenceList(partitionName).size());
-      } else if (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
-        AssertJUnit.assertEquals(repl, 
idealState.getInstanceStateMap(partitionName).size());
-      }
-    }
-  }
-
-  protected void simulateSessionExpiry(ZkConnection zkConnection) throws 
IOException,
-      InterruptedException {
-    ZooKeeper oldZookeeper = zkConnection.getZookeeper();
-    LOG.info("Old sessionId = " + oldZookeeper.getSessionId());
-
-    Watcher watcher = new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        LOG.info("In New connection, process event:" + event);
-      }
-    };
-
-    ZooKeeper newZookeeper =
-        new ZooKeeper(zkConnection.getServers(), 
oldZookeeper.getSessionTimeout(), watcher,
-            oldZookeeper.getSessionId(), oldZookeeper.getSessionPasswd());
-    LOG.info("New sessionId = " + newZookeeper.getSessionId());
-    // Thread.sleep(3000);
-    newZookeeper.close();
-    Thread.sleep(10000);
-    oldZookeeper = zkConnection.getZookeeper();
-    LOG.info("After session expiry sessionId = " + 
oldZookeeper.getSessionId());
-  }
-
-  protected void simulateSessionExpiry(ZkClient zkClient) throws IOException, 
InterruptedException {
-    IZkStateListener listener = new IZkStateListener() {
-      @Override
-      public void handleStateChanged(KeeperState state) throws Exception {
-        LOG.info("In Old connection, state changed:" + state);
-      }
-
-      @Override
-      public void handleNewSession() throws Exception {
-        LOG.info("In Old connection, new session");
-      }
-
-      @Override
-      public void handleSessionEstablishmentError(Throwable var1) throws 
Exception {
-      }
-    };
-    zkClient.subscribeStateChanges(listener);
-    ZkConnection connection = ((ZkConnection) zkClient.getConnection());
-    ZooKeeper oldZookeeper = connection.getZookeeper();
-    LOG.info("Old sessionId = " + oldZookeeper.getSessionId());
-
-    Watcher watcher = new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        LOG.info("In New connection, process event:" + event);
-      }
-    };
-
-    ZooKeeper newZookeeper =
-        new ZooKeeper(connection.getServers(), 
oldZookeeper.getSessionTimeout(), watcher,
-            oldZookeeper.getSessionId(), oldZookeeper.getSessionPasswd());
-    LOG.info("New sessionId = " + newZookeeper.getSessionId());
-    // Thread.sleep(3000);
-    newZookeeper.close();
-    Thread.sleep(10000);
-    connection = (ZkConnection) zkClient.getConnection();
-    oldZookeeper = connection.getZookeeper();
-    LOG.info("After session expiry sessionId = " + 
oldZookeeper.getSessionId());
-  }
-
-  protected void setupStateModel(String clusterName) {
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_gZkClient));
-    Builder keyBuilder = accessor.keyBuilder();
-
-    StateModelDefinition masterSlave =
-        new 
StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
-    accessor.setProperty(keyBuilder.stateModelDef(masterSlave.getId()), 
masterSlave);
-
-    StateModelDefinition leaderStandby =
-        new 
StateModelDefinition(StateModelConfigGenerator.generateConfigForLeaderStandby());
-    accessor.setProperty(keyBuilder.stateModelDef(leaderStandby.getId()), 
leaderStandby);
-
-    StateModelDefinition onlineOffline =
-        new 
StateModelDefinition(StateModelConfigGenerator.generateConfigForOnlineOffline());
-    accessor.setProperty(keyBuilder.stateModelDef(onlineOffline.getId()), 
onlineOffline);
-
-  }
-
-  protected List<IdealState> setupIdealState(String clusterName, int[] nodes, 
String[] resources,
-      int partitions, int replicas) {
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_gZkClient));
-    Builder keyBuilder = accessor.keyBuilder();
-
-    List<IdealState> idealStates = new ArrayList<IdealState>();
-    List<String> instances = new ArrayList<String>();
-    for (int i : nodes) {
-      instances.add("localhost_" + i);
-    }
-
-    for (String resourceName : resources) {
-      IdealState idealState = new IdealState(resourceName);
-      for (int p = 0; p < partitions; p++) {
-        List<String> value = new ArrayList<String>();
-        for (int r = 0; r < replicas; r++) {
-          int n = nodes[(p + r) % nodes.length];
-          value.add("localhost_" + n);
-        }
-        idealState.getRecord().setListField(resourceName + "_" + p, value);
-      }
-
-      idealState.setReplicas(Integer.toString(replicas));
-      idealState.setStateModelDefRef("MasterSlave");
-      idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
-      idealState.setNumPartitions(partitions);
-      idealStates.add(idealState);
-
-      // System.out.println(idealState);
-      accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
-    }
-    return idealStates;
-  }
-
-  protected void setupLiveInstances(String clusterName, int[] liveInstances) {
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_gZkClient));
-    Builder keyBuilder = accessor.keyBuilder();
-
-    for (int i = 0; i < liveInstances.length; i++) {
-      String instance = "localhost_" + liveInstances[i];
-      LiveInstance liveInstance = new LiveInstance(instance);
-      liveInstance.setSessionId("session_" + liveInstances[i]);
-      liveInstance.setHelixVersion("0.0.0");
-      accessor.setProperty(keyBuilder.liveInstance(instance), liveInstance);
-    }
-  }
-
-  protected void setupInstances(String clusterName, int[] instances) {
-    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
-    for (int i = 0; i < instances.length; i++) {
-      String instance = "localhost_" + instances[i];
-      InstanceConfig instanceConfig = new InstanceConfig(instance);
-      instanceConfig.setHostName("localhost");
-      instanceConfig.setPort("" + instances[i]);
-      instanceConfig.setInstanceEnabled(true);
-      admin.addInstance(clusterName, instanceConfig);
-    }
-  }
-
-  protected void runPipeline(ClusterEvent event, Pipeline pipeline) {
-    try {
-      pipeline.handle(event);
-      pipeline.finish();
-    } catch (Exception e) {
-      LOG.error("Exception while executing pipeline:" + pipeline
-          + ". Will not continue to next pipeline", e);
-    }
-  }
-
-  protected void runStage(ClusterEvent event, Stage stage) throws Exception {
-    StageContext context = new StageContext();
-    stage.init(context);
-    stage.preProcess();
-
-    // AbstractAsyncBaseStage will run asynchronously, and it's main logics 
are implemented in
-    // execute() function call
-    // TODO (harry): duplicated code in ZkIntegrationTestBase, consider moving 
runStage()
-    // to a shared library
-    if (stage instanceof AbstractAsyncBaseStage) {
-      ((AbstractAsyncBaseStage) stage).execute(event);
-    } else {
-      stage.process(event);
-    }
-    stage.postProcess();
-  }
-
-  protected Message createMessage(MessageType type, String msgId, String 
fromState, String toState,
-      String resourceName, String tgtName) {
-    Message msg = new Message(type.toString(), msgId);
-    msg.setFromState(fromState);
-    msg.setToState(toState);
-    msg.getRecord().setSimpleField(Attributes.RESOURCE_NAME.toString(), 
resourceName);
-    msg.setTgtName(tgtName);
-    return msg;
-  }
-
-  /**
-   * Poll for the existence (or lack thereof) of a specific Helix property
-   * @param clazz the HelixProeprty subclass
-   * @param accessor connected HelixDataAccessor
-   * @param key the property key to look up
-   * @param shouldExist true if the property should exist, false otherwise
-   * @return the property if found, or null if it does not exist
-   */
-  protected <T extends HelixProperty> T pollForProperty(Class<T> clazz, 
HelixDataAccessor accessor,
-      PropertyKey key, boolean shouldExist) throws InterruptedException {
-    final int POLL_TIMEOUT = 5000;
-    final int POLL_INTERVAL = 50;
-    T property = accessor.getProperty(key);
-    int timeWaited = 0;
-    while (((shouldExist && property == null) || (!shouldExist && property != 
null))
-        && timeWaited < POLL_TIMEOUT) {
-      Thread.sleep(POLL_INTERVAL);
-      timeWaited += POLL_INTERVAL;
-      property = accessor.getProperty(key);
-    }
-    return property;
-  }
-
-  /**
-   * Ensures that external view and current state are empty
-   */
-  protected static class EmptyZkVerifier implements ZkVerifier {
-    private final String _clusterName;
-    private final String _resourceName;
-    private final ZkClient _zkClient;
-
-    /**
-     * Instantiate the verifier
-     * @param clusterName the cluster to verify
-     * @param resourceName the resource to verify
-     */
-    public EmptyZkVerifier(String clusterName, String resourceName) {
-      _clusterName = clusterName;
-      _resourceName = resourceName;
-      _zkClient = ZKClientPool.getZkClient(ZK_ADDR);
-    }
-
-    @Override
-    public boolean verify() {
-      BaseDataAccessor<ZNRecord> baseAccessor = new 
ZkBaseDataAccessor<ZNRecord>(_zkClient);
-      HelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, 
baseAccessor);
-      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-      ExternalView externalView = 
accessor.getProperty(keyBuilder.externalView(_resourceName));
-
-      // verify external view empty
-      if (externalView != null) {
-        for (String partition : externalView.getPartitionSet()) {
-          Map<String, String> stateMap = externalView.getStateMap(partition);
-          if (stateMap != null && !stateMap.isEmpty()) {
-            LOG.error("External view not empty for " + partition);
-            return false;
-          }
-        }
-      }
-
-      // verify current state empty
-      List<String> liveParticipants = 
accessor.getChildNames(keyBuilder.liveInstances());
-      for (String participant : liveParticipants) {
-        List<String> sessionIds = 
accessor.getChildNames(keyBuilder.sessions(participant));
-        for (String sessionId : sessionIds) {
-          CurrentState currentState =
-              accessor.getProperty(keyBuilder.currentState(participant, 
sessionId, _resourceName));
-          Map<String, String> partitionStateMap = 
currentState.getPartitionStateMap();
-          if (partitionStateMap != null && !partitionStateMap.isEmpty()) {
-            LOG.error("Current state not empty for " + participant);
-            return false;
-          }
-        }
-      }
-      return true;
-    }
-
-    @Override
-    public ZkClient getZkClient() {
-      return _zkClient;
-    }
+public class ZkUnitTestBase extends ZkTestBase {
 
-    @Override
-    public String getClusterName() {
-      return _clusterName;
+  protected void deleteCluster(String clusterName) {
+    String namespace = "/" + clusterName;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursively(namespace);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java 
b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
new file mode 100644
index 0000000..7d98119
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -0,0 +1,691 @@
+package org.apache.helix.common;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
+import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.pipeline.Stage;
+import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ConfigScope;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.OnlineOfflineSMD;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.builder.ConfigScopeBuilder;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.helix.util.ZKClientPool;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.ITestContext;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeSuite;
+
+public class ZkTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(ZkTestBase.class);
+
+  // Suite-wide ZooKeeper fixtures; assigned in beforeSuite().
+  protected static ZkServer _zkServer;
+  protected static ZkClient _gZkClient;
+  protected static ClusterSetup _gSetupTool;
+  protected static BaseDataAccessor<ZNRecord> _baseAccessor;
+
+  // Address the test ZooKeeper server is started on and the shared client connects to.
+  public static final String ZK_ADDR = "localhost:2183";
+  protected static final String CLUSTER_PREFIX = "CLUSTER";
+  protected static final String CONTROLLER_CLUSTER_PREFIX = "CONTROLLER_CLUSTER";
+  protected final String CONTROLLER_PREFIX = "controller";
+  protected final String PARTICIPANT_PREFIX = "localhost";
+
+
+  /**
+   * Suite set-up: quiets java.util.logging, sets the system properties the tests rely on, starts
+   * the ZooKeeper server at {@link #ZK_ADDR} and creates the shared client, setup tool and base
+   * accessor.
+   */
+  @BeforeSuite
+  public void beforeSuite() throws Exception {
+    // TODO: use logging.properties file to config java.util.logging.Logger levels
+    java.util.logging.Logger.getLogger("").setLevel(Level.WARNING);
+
+    // Since the ZOOKEEPER-2693 fix, ZooKeeper commands have to be whitelisted explicitly before
+    // they can be executed.
+    System.setProperty("zookeeper.4lw.commands.whitelist", "*");
+    System.setProperty(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, "3000");
+
+    _zkServer = TestHelper.startZkServer(ZK_ADDR);
+    AssertJUnit.assertTrue(_zkServer != null);
+    ZKClientPool.reset();
+
+    _gZkClient = new ZkClient(ZK_ADDR);
+    _gZkClient.setZkSerializer(new ZNRecordSerializer());
+    _gSetupTool = new ClusterSetup(_gZkClient);
+    _baseAccessor = new ZkBaseDataAccessor<>(_gZkClient);
+  }
+
+  /**
+   * Suite tear-down: resets the {@link ZKClientPool}, closes the shared {@code _gZkClient} and
+   * then stops the ZooKeeper server held in {@code _zkServer}.
+   */
+  @AfterSuite
+  public void afterSuite() {
+    ZKClientPool.reset();
+    _gZkClient.close();
+    TestHelper.stopZkServer(_zkServer);
+  }
+
+  /**
+   * Prints a START line for the test method and records its start time in the test context
+   * under the "StartTime" attribute.
+   * @param testMethod the test method about to run
+   * @param testContext the TestNG context the start time is stored in
+   */
+  @BeforeMethod
+  public void beforeTest(Method testMethod, ITestContext testContext) {
+    long startTime = System.currentTimeMillis();
+    System.out.println("START " + testMethod.getName() + " at " + new Date(startTime));
+    // Store the very timestamp that was printed instead of sampling the clock a second time.
+    testContext.setAttribute("StartTime", startTime);
+  }
+
+  /**
+   * Prints an END line for the test method together with the time elapsed since the value
+   * stored under the "StartTime" attribute of the test context.
+   * @param testMethod the test method that just finished
+   * @param testContext the TestNG context the start time is read from
+   */
+  @AfterMethod
+  public void endTest(Method testMethod, ITestContext testContext) {
+    Long startTime = (Long) testContext.getAttribute("StartTime");
+    long endTime = System.currentTimeMillis();
+    // The attribute is missing when no start time was recorded for this context; report the
+    // duration as unknown rather than failing with a NullPointerException while unboxing.
+    String elapsed = startTime == null ? "unknown" : (endTime - startTime) + "ms.";
+    System.out.println(
+        "END " + testMethod.getName() + " at " + new Date(endTime) + ", took: " + elapsed);
+  }
+
+
+  /** Returns the simple (unqualified) name of the runtime class of this test instance. */
+  protected String getShortClassName() {
+    Class<?> testClass = getClass();
+    return testClass.getSimpleName();
+  }
+
+  /**
+   * Looks up the controller-leader property of a cluster.
+   * @param zkClient client used to read from ZooKeeper
+   * @param clusterName the cluster to look at
+   * @return the instance name of the leader, or null if no leader property is found
+   */
+  protected String getCurrentLeader(ZkClient zkClient, String clusterName) {
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
+    ZKHelixDataAccessor dataAccessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    LiveInstance leader = dataAccessor.getProperty(dataAccessor.keyBuilder().controllerLeader());
+    return leader == null ? null : leader.getInstanceName();
+  }
+
+  /**
+   * Stops the current controller leader of a cluster and waits for a different leader to show
+   * up, polling once per second for up to five seconds. Fails the test if there is no leader to
+   * stop, if the leader is missing from either map, or if no new leader is observed in time.
+   * @param zkClient client used to read the leader property
+   * @param clusterName the cluster whose leader is stopped
+   * @param threadMap threads keyed by instance name; the leader's entry is removed and
+   *          interrupted
+   * @param managerMap managers keyed by instance name; the leader's entry is removed and
+   *          disconnected
+   */
+  protected void stopCurrentLeader(ZkClient zkClient, String clusterName,
+      Map<String, Thread> threadMap, Map<String, HelixManager> managerMap) {
+    String leader = getCurrentLeader(zkClient, clusterName);
+    Assert.assertTrue(leader != null);
+    System.out.println("stop leader:" + leader + " in " + clusterName);
+
+    HelixManager manager = managerMap.remove(leader);
+    Assert.assertTrue(manager != null);
+    manager.disconnect();
+
+    Thread thread = threadMap.remove(leader);
+    Assert.assertTrue(thread != null);
+    thread.interrupt();
+
+    boolean isNewLeaderElected = false;
+    try {
+      for (int i = 0; i < 5; i++) {
+        Thread.sleep(1000);
+        String newLeader = getCurrentLeader(zkClient, clusterName);
+        // getCurrentLeader() returns null while no leader property exists (e.g. the election is
+        // still in progress); keep polling instead of failing with a NullPointerException.
+        if (newLeader != null && !newLeader.equals(leader)) {
+          isNewLeaderElected = true;
+          System.out.println("new leader elected: " + newLeader + " in " + clusterName);
+          break;
+        }
+      }
+    } catch (InterruptedException e) {
+      // Preserve the interrupt for the caller; the assertion below reports the failure.
+      Thread.currentThread().interrupt();
+      LOG.warn("Interrupted while waiting for a new leader in " + clusterName, e);
+    }
+    if (!isNewLeaderElected) {
+      System.out.println("fail to elect a new leader elected in " + clusterName);
+    }
+    AssertJUnit.assertTrue(isNewLeaderElected);
+  }
+
+  /**
+   * Sets the cluster-scoped config key "healthChange.enabled" to "true" using the shared
+   * {@code _gZkClient}.
+   * @param clusterName the cluster to configure
+   */
+  protected void enableHealthCheck(String clusterName) {
+    ConfigScopeBuilder scopeBuilder = new ConfigScopeBuilder();
+    ConfigScope clusterScope = scopeBuilder.forCluster(clusterName).build();
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    configAccessor.set(clusterScope, "healthChange.enabled", String.valueOf(true));
+  }
+
+  /**
+   * Reads the cluster config, sets its persist-best-possible-assignment flag and writes the
+   * config back.
+   * @param zkClient client used to access the config
+   * @param clusterName the cluster to configure
+   * @param enabled the value to set
+   */
+  protected void enablePersistBestPossibleAssignment(ZkClient zkClient, String clusterName,
+      Boolean enabled) {
+    ConfigAccessor accessor = new ConfigAccessor(zkClient);
+    ClusterConfig config = accessor.getClusterConfig(clusterName);
+    config.setPersistBestPossibleAssignment(enabled);
+    accessor.setClusterConfig(clusterName, config);
+  }
+
+  /**
+   * Reads the cluster config, sets its persist-intermediate-assignment flag and writes the
+   * config back.
+   * @param zkClient client used to access the config
+   * @param clusterName the cluster to configure
+   * @param enabled the value to set
+   */
+  protected void enablePersistIntermediateAssignment(ZkClient zkClient, String clusterName,
+      Boolean enabled) {
+    ConfigAccessor accessor = new ConfigAccessor(zkClient);
+    ClusterConfig config = accessor.getClusterConfig(clusterName);
+    config.setPersistIntermediateAssignment(enabled);
+    accessor.setClusterConfig(clusterName, config);
+  }
+
+  /**
+   * Reads the cluster config, sets its topology-aware flag and writes the config back.
+   * @param zkClient client used to access the config
+   * @param clusterName the cluster to configure
+   * @param enabled the value to set
+   */
+  protected void enableTopologyAwareRebalance(ZkClient zkClient, String clusterName,
+      Boolean enabled) {
+    ConfigAccessor accessor = new ConfigAccessor(zkClient);
+    ClusterConfig config = accessor.getClusterConfig(clusterName);
+    config.setTopologyAwareEnabled(enabled);
+    accessor.setClusterConfig(clusterName, config);
+  }
+
+  /**
+   * Reads the cluster config, sets its delay-rebalance flag and writes the config back.
+   * @param zkClient client used to access the config
+   * @param clusterName the cluster to configure
+   * @param enabled the value to set
+   */
+  protected void enableDelayRebalanceInCluster(ZkClient zkClient, String clusterName,
+      boolean enabled) {
+    ConfigAccessor accessor = new ConfigAccessor(zkClient);
+    ClusterConfig config = accessor.getClusterConfig(clusterName);
+    config.setDelayRebalaceEnabled(enabled);
+    accessor.setClusterConfig(clusterName, config);
+  }
+
+  /**
+   * Reads the config of one instance, sets its delay-rebalance flag and writes the config back.
+   * @param zkClient client used to access the config
+   * @param clusterName the cluster the instance belongs to
+   * @param instanceName the instance to configure
+   * @param enabled the value to set
+   */
+  protected void enableDelayRebalanceInInstance(ZkClient zkClient, String clusterName,
+      String instanceName, boolean enabled) {
+    ConfigAccessor accessor = new ConfigAccessor(zkClient);
+    InstanceConfig config = accessor.getInstanceConfig(clusterName, instanceName);
+    config.setDelayRebalanceEnabled(enabled);
+    accessor.setInstanceConfig(clusterName, instanceName, config);
+  }
+
+  /**
+   * Reads the cluster config, sets its rebalance delay time and writes the config back.
+   * @param zkClient client used to access the config
+   * @param clusterName the cluster to configure
+   * @param delay the delay value handed to {@code ClusterConfig.setRebalanceDelayTime}
+   */
+  protected void setDelayTimeInCluster(ZkClient zkClient, String clusterName, long delay) {
+    ConfigAccessor accessor = new ConfigAccessor(zkClient);
+    ClusterConfig config = accessor.getClusterConfig(clusterName);
+    config.setRebalanceDelayTime(delay);
+    accessor.setClusterConfig(clusterName, config);
+  }
+
+  /**
+   * Convenience overload that uses {@link AutoRebalanceStrategy} as the rebalance strategy and
+   * otherwise forwards all arguments to the eight-argument variant.
+   */
+  protected IdealState createResourceWithDelayedRebalance(String clusterName, String db,
+      String stateModel, int numPartition, int replica, int minActiveReplica, long delay) {
+    String defaultStrategy = AutoRebalanceStrategy.class.getName();
+    return createResourceWithDelayedRebalance(clusterName, db, stateModel, numPartition, replica,
+        minActiveReplica, delay, defaultStrategy);
+  }
+
+  /**
+   * Makes sure a resource exists (adding it in FULL_AUTO mode when missing), configures it for
+   * the {@link DelayedAutoRebalancer}, rebalances it and returns the resulting ideal state.
+   * @param clusterName the cluster holding the resource
+   * @param db the resource name
+   * @param stateModel state model used when the resource has to be added
+   * @param numPartition partition count used when the resource has to be added
+   * @param replica replica count passed to the rebalance call
+   * @param minActiveReplica minimum number of active replicas to set on the ideal state
+   * @param delay rebalance delay; only applied when greater than zero
+   * @param rebalanceStrategy strategy class name used when the resource has to be added
+   * @return the ideal state read back after the rebalance
+   */
+  protected IdealState createResourceWithDelayedRebalance(String clusterName, String db,
+      String stateModel, int numPartition, int replica, int minActiveReplica, long delay,
+      String rebalanceStrategy) {
+    HelixAdmin admin = _gSetupTool.getClusterManagementTool();
+
+    // Add the resource only when no ideal state is stored for it yet.
+    if (admin.getResourceIdealState(clusterName, db) == null) {
+      _gSetupTool.addResourceToCluster(clusterName, db, numPartition, stateModel,
+          IdealState.RebalanceMode.FULL_AUTO.toString(), rebalanceStrategy);
+    }
+
+    IdealState resourceState = admin.getResourceIdealState(clusterName, db);
+    resourceState.setMinActiveReplicas(minActiveReplica);
+    if (!resourceState.isDelayRebalanceEnabled()) {
+      resourceState.setDelayRebalanceEnabled(true);
+    }
+    if (delay > 0) {
+      resourceState.setRebalanceDelay(delay);
+    }
+    resourceState.setRebalancerClassName(DelayedAutoRebalancer.class.getName());
+    admin.setResourceIdealState(clusterName, db, resourceState);
+    _gSetupTool.rebalanceStorageCluster(clusterName, db, replica);
+
+    // Hand back what is stored after the rebalance, not the locally modified copy.
+    return admin.getResourceIdealState(clusterName, db);
+  }
+
+  /**
+   * Builds an ideal state for a resource group and fills in an initial partition-to-instance
+   * mapping: partition i gets {@code replica} consecutive instances starting at index i
+   * (wrapping around the instance list), each in the ONLINE state.
+   * @param resourceGroupName resource group name, also used as partition name prefix
+   * @param instanceGroupTag instance group tag passed to the setup tool
+   * @param instanceNames instances to assign partitions to
+   * @param numPartition number of partitions to map
+   * @param replica number of instances assigned to each partition
+   * @param rebalanceMode rebalance mode passed to the setup tool
+   * @param stateModelDef state model definition passed to the setup tool
+   * @return the populated ideal state
+   */
+  protected IdealState createIdealState(String resourceGroupName, String instanceGroupTag,
+      List<String> instanceNames, int numPartition, int replica, String rebalanceMode,
+      String stateModelDef) {
+    IdealState idealState = _gSetupTool.createIdealStateForResourceGroup(resourceGroupName,
+        instanceGroupTag, numPartition, replica, rebalanceMode, stateModelDef);
+
+    int numNode = instanceNames.size();
+    assert (numNode >= replica);
+    String onlineState = OnlineOfflineSMD.States.ONLINE.toString();
+
+    // setup initial partition->instance mapping.
+    for (int partition = 0; partition < numPartition; partition++) {
+      String partitionName = resourceGroupName + "_" + partition;
+      for (int r = 0; r < replica; r++) {
+        String instance = instanceNames.get((partition + r) % numNode);
+        idealState.setPartitionState(partitionName, instance, onlineState);
+      }
+    }
+
+    return idealState;
+  }
+
+  /**
+   * Adds a SEMI_AUTO resource to a cluster, rebalances it and then overrides the preference
+   * list of every partition with the given one.
+   * @param clusterSetup setup tool used for all reads and writes of this call
+   * @param clusterName the cluster to add the resource to
+   * @param dbName the resource name
+   * @param preferenceList preference list applied to every partition
+   * @param stateModelDef state model definition of the resource
+   * @param numPartition number of partitions
+   * @param replica replica count passed to the rebalance call
+   */
+  protected void createDBInSemiAuto(ClusterSetup clusterSetup, String clusterName, String dbName,
+      List<String> preferenceList, String stateModelDef, int numPartition, int replica) {
+    clusterSetup.addResourceToCluster(clusterName, dbName, numPartition, stateModelDef,
+        IdealState.RebalanceMode.SEMI_AUTO.toString());
+    clusterSetup.rebalanceStorageCluster(clusterName, dbName, replica);
+
+    // Read and write through the setup tool the caller passed in, so both operations hit the
+    // same store even when that tool is not the shared _gSetupTool.
+    HelixAdmin admin = clusterSetup.getClusterManagementTool();
+    IdealState is = admin.getResourceIdealState(clusterName, dbName);
+    for (String p : is.getPartitionSet()) {
+      is.setPreferenceList(p, preferenceList);
+    }
+    admin.setResourceIdealState(clusterName, dbName, is);
+  }
+
+  /**
+   * Asserts that every partition of the ideal state has, in the external view, a non-empty
+   * assignment containing a top-state replica and at least {@code minActiveReplica} replicas in
+   * active states. Active states are the keys of the state count map that the resource's
+   * built-in state model produces for the given node and replica counts.
+   * @param is ideal state providing partitions, replica count and state model reference
+   * @param ev external view whose per-partition map fields are checked
+   * @param minActiveReplica minimum number of active replicas required per partition
+   * @param numNodes number of nodes used to compute the state count map
+   */
+  protected void validateMinActiveAndTopStateReplica(IdealState is, ExternalView ev,
+      int minActiveReplica, int numNodes) {
+    StateModelDefinition stateModelDef =
+        BuiltInStateModelDefinitions.valueOf(is.getStateModelDefRef()).getStateModelDefinition();
+    String topState = stateModelDef.getStatesPriorityList().get(0);
+    int replica = Integer.parseInt(is.getReplicas());
+
+    Map<String, Integer> stateCount = stateModelDef.getStateCountMap(numNodes, replica);
+    Set<String> activeStates = stateCount.keySet();
+
+    for (String partition : is.getPartitionSet()) {
+      Map<String, String> assignmentMap = ev.getRecord().getMapField(partition);
+      Assert.assertNotNull(assignmentMap,
+          is.getResourceName() + "'s best possible assignment is null for partition " + partition);
+      Assert.assertTrue(!assignmentMap.isEmpty(),
+          is.getResourceName() + "'s partition " + partition + " has no best possible map in IS.");
+
+      boolean hasTopState = false;
+      int activeReplica = 0;
+      for (String state : assignmentMap.values()) {
+        if (topState.equalsIgnoreCase(state)) {
+          hasTopState = true;
+        }
+        if (activeStates.contains(state)) {
+          activeReplica++;
+        }
+      }
+
+      Assert.assertTrue(hasTopState, String.format("%s missing %s replica", partition, topState));
+      Assert.assertTrue(activeReplica >= minActiveReplica, String
+          .format("%s has less active replica %d then required %d", partition, activeReplica,
+              minActiveReplica));
+    }
+  }
+
+  /**
+   * Registers the manager on the event under the helixmanager attribute and then runs the stage
+   * through {@link #runStage(ClusterEvent, Stage)}, which performs the init / preProcess /
+   * process-or-execute / postProcess sequence.
+   * @param manager manager to attach to the event
+   * @param event the event the stage works on
+   * @param stage the stage to run
+   */
+  protected void runStage(HelixManager manager, ClusterEvent event, Stage stage) throws Exception {
+    event.addAttribute(AttributeName.helixmanager.name(), manager);
+    // Same stage life cycle as the two-argument overload; delegate instead of duplicating it.
+    runStage(event, stage);
+  }
+
+  /**
+   * Asserts that both the instance-config node and the instance node of an instance exist, or
+   * that both are absent.
+   * @param zkClient client used for the existence checks
+   * @param clusterName the cluster the instance belongs to
+   * @param instance the instance name
+   * @param wantExists expected existence of both nodes
+   */
+  public void verifyInstance(ZkClient zkClient, String clusterName, String instance,
+      boolean wantExists) {
+    String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName) + "/" + instance;
+    String instancePath = PropertyPathBuilder.instance(clusterName, instance);
+    AssertJUnit.assertEquals(wantExists, zkClient.exists(instanceConfigPath));
+    AssertJUnit.assertEquals(wantExists, zkClient.exists(instancePath));
+  }
+
+  /**
+   * Asserts that the ideal-state node of a resource exists, or is absent.
+   * @param zkClient client used for the existence check
+   * @param clusterName the cluster the resource belongs to
+   * @param resource the resource name
+   * @param wantExists expected existence of the node
+   */
+  public void verifyResource(ZkClient zkClient, String clusterName, String resource,
+      boolean wantExists) {
+    String idealStatePath = PropertyPathBuilder.idealState(clusterName, resource);
+    boolean exists = zkClient.exists(idealStatePath);
+    AssertJUnit.assertEquals(wantExists, exists);
+  }
+
+  /**
+   * Asserts the enabled flag stored in the config of an instance.
+   * @param zkClient client used to read the instance config
+   * @param clusterName the cluster the instance belongs to
+   * @param instance the instance name
+   * @param wantEnabled expected value of the enabled flag
+   */
+  public void verifyEnabled(ZkClient zkClient, String clusterName, String instance,
+      boolean wantEnabled) {
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
+    ZKHelixDataAccessor dataAccessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    PropertyKey configKey = dataAccessor.keyBuilder().instanceConfig(instance);
+
+    InstanceConfig instanceConfig = dataAccessor.getProperty(configKey);
+    AssertJUnit.assertEquals(wantEnabled, instanceConfig.getInstanceEnabled());
+  }
+
+  /**
+   * Asserts the replication factor of every partition of a resource: the preference list size
+   * in SEMI_AUTO mode, the instance-state map size in CUSTOMIZED mode. Other modes are not
+   * checked.
+   * @param zkClient client used to read the ideal state
+   * @param clusterName the cluster the resource belongs to
+   * @param resource the resource name
+   * @param repl expected number of replicas per partition
+   */
+  public void verifyReplication(ZkClient zkClient, String clusterName, String resource, int repl) {
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
+    ZKHelixDataAccessor dataAccessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resource);
+
+    IdealState idealState = dataAccessor.getProperty(idealStateKey);
+    IdealState.RebalanceMode mode = idealState.getRebalanceMode();
+    boolean semiAuto = mode == IdealState.RebalanceMode.SEMI_AUTO;
+    boolean customized = mode == IdealState.RebalanceMode.CUSTOMIZED;
+    for (String partitionName : idealState.getPartitionSet()) {
+      if (semiAuto) {
+        AssertJUnit.assertEquals(repl, idealState.getPreferenceList(partitionName).size());
+      } else if (customized) {
+        AssertJUnit.assertEquals(repl, idealState.getInstanceStateMap(partitionName).size());
+      }
+    }
+  }
+
+  /**
+   * Opens a second ZooKeeper handle with the session id and password of the connection's
+   * current handle, closes it immediately, sleeps ten seconds and logs the session id the
+   * connection holds afterwards.
+   * NOTE(review): this is presumably the usual duplicate-handle way of forcing the server to
+   * expire the original session - confirm against ZooKeeper's session semantics.
+   * @param zkConnection the connection whose session should be expired
+   */
+  protected void simulateSessionExpiry(ZkConnection zkConnection) throws IOException,
+      InterruptedException {
+    ZooKeeper currentHandle = zkConnection.getZookeeper();
+    LOG.info("Old sessionId = " + currentHandle.getSessionId());
+
+    Watcher loggingWatcher = new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        LOG.info("In New connection, process event:" + event);
+      }
+    };
+
+    ZooKeeper duplicateHandle = new ZooKeeper(zkConnection.getServers(),
+        currentHandle.getSessionTimeout(), loggingWatcher, currentHandle.getSessionId(),
+        currentHandle.getSessionPasswd());
+    LOG.info("New sessionId = " + duplicateHandle.getSessionId());
+    duplicateHandle.close();
+
+    // Wait before reading the handle again so the connection has time to react.
+    Thread.sleep(10000);
+    currentHandle = zkConnection.getZookeeper();
+    LOG.info("After session expiry sessionId = " + currentHandle.getSessionId());
+  }
+
+  /**
+   * Client-based variant of the session-expiry helper. Subscribes a logging-only state listener
+   * to the client, opens a second ZooKeeper handle with the session id and password of the
+   * client's current handle, closes it immediately, sleeps ten seconds and logs the session id
+   * the client holds afterwards. The listener is removed again before returning.
+   * @param zkClient the client whose session should be expired
+   * @throws IOException if the second ZooKeeper handle cannot be created
+   * @throws InterruptedException if interrupted while closing the handle or while waiting
+   */
+  protected void simulateSessionExpiry(ZkClient zkClient)
+      throws IOException, InterruptedException {
+    IZkStateListener listener = new IZkStateListener() {
+      @Override
+      public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
+        LOG.info("In Old connection, state changed:" + state);
+      }
+
+      @Override
+      public void handleNewSession() throws Exception {
+        LOG.info("In Old connection, new session");
+      }
+
+      @Override
+      public void handleSessionEstablishmentError(Throwable var1) throws Exception {
+      }
+    };
+    zkClient.subscribeStateChanges(listener);
+    try {
+      ZkConnection connection = ((ZkConnection) zkClient.getConnection());
+      ZooKeeper oldZookeeper = connection.getZookeeper();
+      LOG.info("Old sessionId = " + oldZookeeper.getSessionId());
+
+      Watcher watcher = new Watcher() {
+        @Override
+        public void process(WatchedEvent event) {
+          LOG.info("In New connection, process event:" + event);
+        }
+      };
+
+      ZooKeeper newZookeeper =
+          new ZooKeeper(connection.getServers(), oldZookeeper.getSessionTimeout(), watcher,
+              oldZookeeper.getSessionId(), oldZookeeper.getSessionPasswd());
+      LOG.info("New sessionId = " + newZookeeper.getSessionId());
+      newZookeeper.close();
+      Thread.sleep(10000);
+      // Re-read the connection: the client may have replaced it while re-establishing a session.
+      connection = (ZkConnection) zkClient.getConnection();
+      oldZookeeper = connection.getZookeeper();
+      LOG.info("After session expiry sessionId = " + oldZookeeper.getSessionId());
+    } finally {
+      // The listener only logs; remove it so repeated calls on a long-lived client do not keep
+      // accumulating listeners.
+      zkClient.unsubscribeStateChanges(listener);
+    }
+  }
+
+  /**
+   * Writes the generated MasterSlave, LeaderStandby and OnlineOffline state model definitions
+   * into the given cluster using the shared {@code _gZkClient}.
+   * @param clusterName the cluster to write the definitions to
+   */
+  protected void setupStateModel(String clusterName) {
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    ZKHelixDataAccessor dataAccessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    Builder keys = dataAccessor.keyBuilder();
+
+    ZNRecord masterSlaveRecord = StateModelConfigGenerator.generateConfigForMasterSlave();
+    StateModelDefinition masterSlave = new StateModelDefinition(masterSlaveRecord);
+    dataAccessor.setProperty(keys.stateModelDef(masterSlave.getId()), masterSlave);
+
+    ZNRecord leaderStandbyRecord = StateModelConfigGenerator.generateConfigForLeaderStandby();
+    StateModelDefinition leaderStandby = new StateModelDefinition(leaderStandbyRecord);
+    dataAccessor.setProperty(keys.stateModelDef(leaderStandby.getId()), leaderStandby);
+
+    ZNRecord onlineOfflineRecord = StateModelConfigGenerator.generateConfigForOnlineOffline();
+    StateModelDefinition onlineOffline = new StateModelDefinition(onlineOfflineRecord);
+    dataAccessor.setProperty(keys.stateModelDef(onlineOffline.getId()), onlineOffline);
+  }
+
+  /**
+   * Creates a message of the given type and id and fills in its from/to states, resource name
+   * and target name.
+   * @param type message type
+   * @param msgId message id
+   * @param fromState state the transition starts from
+   * @param toState state the transition leads to
+   * @param resourceName value stored in the RESOURCE_NAME simple field
+   * @param tgtName target name of the message
+   * @return the populated message
+   */
+  protected Message createMessage(Message.MessageType type, String msgId, String fromState,
+      String toState, String resourceName, String tgtName) {
+    Message message = new Message(type.toString(), msgId);
+    message.setFromState(fromState);
+    message.setToState(toState);
+
+    String resourceField = Message.Attributes.RESOURCE_NAME.toString();
+    message.getRecord().setSimpleField(resourceField, resourceName);
+
+    message.setTgtName(tgtName);
+    return message;
+  }
+
+  /**
+   * Creates and stores a SEMI_AUTO, MasterSlave ideal state for each resource. The list field
+   * of partition p holds {@code replicas} entries "localhost_" + node, taking the nodes at
+   * indices p, p+1, ... of the given array (wrapping around).
+   * @param clusterName the cluster to write the ideal states to
+   * @param nodes node identifiers appended to "localhost_" to form instance names
+   * @param resources names of the resources to create
+   * @param partitions number of partitions per resource
+   * @param replicas number of replicas per partition
+   * @return the ideal states in the order of {@code resources}
+   */
+  protected List<IdealState> setupIdealState(String clusterName, int[] nodes, String[] resources,
+      int partitions, int replicas) {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    List<IdealState> idealStates = new ArrayList<IdealState>();
+    for (String resourceName : resources) {
+      IdealState idealState = new IdealState(resourceName);
+      for (int p = 0; p < partitions; p++) {
+        List<String> value = new ArrayList<String>();
+        for (int r = 0; r < replicas; r++) {
+          int n = nodes[(p + r) % nodes.length];
+          value.add("localhost_" + n);
+        }
+        idealState.getRecord().setListField(resourceName + "_" + p, value);
+      }
+
+      idealState.setReplicas(Integer.toString(replicas));
+      idealState.setStateModelDefRef("MasterSlave");
+      idealState.setRebalanceMode(IdealState.RebalanceMode.SEMI_AUTO);
+      idealState.setNumPartitions(partitions);
+      idealStates.add(idealState);
+
+      accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
+    }
+    return idealStates;
+  }
+
+  /**
+   * Writes a live-instance entry "localhost_" + id for every given id, with session id
+   * "session_" + id and Helix version "0.0.0".
+   * @param clusterName the cluster to write the live instances to
+   * @param liveInstances ids used to build the instance names and session ids
+   */
+  protected void setupLiveInstances(String clusterName, int[] liveInstances) {
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    ZKHelixDataAccessor dataAccessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    Builder keys = dataAccessor.keyBuilder();
+
+    for (int id : liveInstances) {
+      String instanceName = "localhost_" + id;
+      LiveInstance liveInstance = new LiveInstance(instanceName);
+      liveInstance.setSessionId("session_" + id);
+      liveInstance.setHelixVersion("0.0.0");
+      dataAccessor.setProperty(keys.liveInstance(instanceName), liveInstance);
+    }
+  }
+
+  /**
+   * Adds an enabled instance "localhost_" + port with host "localhost" to the cluster for every
+   * given port.
+   * @param clusterName the cluster to add the instances to
+   * @param instances ports used to build the instance names
+   */
+  protected void setupInstances(String clusterName, int[] instances) {
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    for (int port : instances) {
+      InstanceConfig config = new InstanceConfig("localhost_" + port);
+      config.setHostName("localhost");
+      config.setPort(Integer.toString(port));
+      config.setInstanceEnabled(true);
+      admin.addInstance(clusterName, config);
+    }
+  }
+
+  /**
+   * Runs a pipeline on an event: {@code handle} followed by {@code finish}. Exceptions are
+   * logged and not propagated to the caller.
+   * @param event the event to process
+   * @param pipeline the pipeline to run
+   */
+  protected void runPipeline(ClusterEvent event, Pipeline pipeline) {
+    try {
+      pipeline.handle(event);
+      pipeline.finish();
+    } catch (Exception pipelineFailure) {
+      // Log only; callers inspect the event afterwards instead of handling an exception.
+      String message = "Exception while executing pipeline:" + pipeline
+          + ". Will not continue to next pipeline";
+      LOG.error(message, pipelineFailure);
+    }
+  }
+
+  /**
+   * Runs one stage on an event: init with a fresh {@link StageContext}, preProcess, the main
+   * step, then postProcess.
+   * @param event the event the stage works on
+   * @param stage the stage to run
+   */
+  protected void runStage(ClusterEvent event, Stage stage) throws Exception {
+    stage.init(new StageContext());
+    stage.preProcess();
+
+    // AbstractAsyncBaseStage will run asynchronously, and its main logic is implemented in the
+    // execute() function call, so that is invoked directly for such stages.
+    // TODO (harry): duplicated code in ZkIntegrationTestBase, consider moving runStage()
+    // to a shared library
+    if (!(stage instanceof AbstractAsyncBaseStage)) {
+      stage.process(event);
+    } else {
+      ((AbstractAsyncBaseStage) stage).execute(event);
+    }
+    stage.postProcess();
+  }
+
+  /**
+   * Polls every 50 ms, for up to 5 seconds, until a Helix property exists (or no longer exists)
+   * as requested.
+   * @param clazz the HelixProperty subclass
+   * @param accessor connected HelixDataAccessor
+   * @param key the property key to look up
+   * @param shouldExist true if the property should exist, false otherwise
+   * @return the property as last read: the property if found, or null if it does not exist
+   */
+  protected <T extends HelixProperty> T pollForProperty(Class<T> clazz, HelixDataAccessor accessor,
+      PropertyKey key, boolean shouldExist) throws InterruptedException {
+    final int timeoutMs = 5000;
+    final int intervalMs = 50;
+    T property = accessor.getProperty(key);
+    // Keep polling while the observed existence disagrees with the expected one.
+    for (int waitedMs = 0; shouldExist == (property == null) && waitedMs < timeoutMs;
+        waitedMs += intervalMs) {
+      Thread.sleep(intervalMs);
+      property = accessor.getProperty(key);
+    }
+    return property;
+  }
+
+  /**
+   * Ensures that external view and current state are empty for one resource of a cluster.
+   */
+  protected static class EmptyZkVerifier implements ClusterStateVerifier.ZkVerifier {
+    private final String _clusterName;
+    private final String _resourceName;
+    private final ZkClient _zkClient;
+
+    /**
+     * Instantiate the verifier
+     * @param clusterName the cluster to verify
+     * @param resourceName the resource to verify
+     */
+    public EmptyZkVerifier(String clusterName, String resourceName) {
+      _clusterName = clusterName;
+      _resourceName = resourceName;
+      _zkClient = ZKClientPool.getZkClient(ZK_ADDR);
+    }
+
+    /**
+     * Checks the external view of the resource and the resource's current state under every
+     * session of every live participant.
+     * @return true if no partition has a non-empty state map in either place, false otherwise
+     */
+    @Override
+    public boolean verify() {
+      BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+      HelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, baseAccessor);
+      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+      ExternalView externalView = accessor.getProperty(keyBuilder.externalView(_resourceName));
+
+      // verify external view empty
+      if (externalView != null) {
+        for (String partition : externalView.getPartitionSet()) {
+          Map<String, String> stateMap = externalView.getStateMap(partition);
+          if (stateMap != null && !stateMap.isEmpty()) {
+            LOG.error("External view not empty for " + partition);
+            return false;
+          }
+        }
+      }
+
+      // verify current state empty
+      List<String> liveParticipants = accessor.getChildNames(keyBuilder.liveInstances());
+      for (String participant : liveParticipants) {
+        List<String> sessionIds = accessor.getChildNames(keyBuilder.sessions(participant));
+        for (String sessionId : sessionIds) {
+          CurrentState currentState =
+              accessor.getProperty(keyBuilder.currentState(participant, sessionId, _resourceName));
+          // A session without any current-state entry for the resource counts as empty, just
+          // like a missing external view above; skip it instead of dereferencing null.
+          if (currentState == null) {
+            continue;
+          }
+          Map<String, String> partitionStateMap = currentState.getPartitionStateMap();
+          if (partitionStateMap != null && !partitionStateMap.isEmpty()) {
+            LOG.error("Current state not empty for " + participant);
+            return false;
+          }
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public ZkClient getZkClient() {
+      return _zkClient;
+    }
+
+    @Override
+    public String getClusterName() {
+      return _clusterName;
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
index 9054a1d..ab62676 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
@@ -119,8 +119,8 @@ public class TestMessageThrottleStage extends 
ZkUnitTestBase {
     Assert.assertEquals(msgThrottleOutput.getMessages("TestDB", new 
Partition("TestDB_0")).size(),
         1);
 
+    deleteCluster(clusterName);
     System.out.println("END " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
-
   }
 
   @Test()
@@ -310,8 +310,8 @@ public class TestMessageThrottleStage extends 
ZkUnitTestBase {
     Assert.assertTrue(throttleMessages.contains(msg3));
     Assert.assertTrue(throttleMessages.contains(msg4));
 
+    deleteCluster(clusterName);
     System.out.println("END " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
-
   }
 
   private boolean containsConstraint(Set<ConstraintItem> constraints, 
ConstraintItem constraint) {

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index d38907d..90363c1 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -118,8 +118,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     messages = msgSelOutput.getMessages(resourceName, new 
Partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 0, "Should NOT output 1 message: 
SLAVE-MASTER for node1");
 
+    deleteCluster(clusterName);
     System.out.println("END " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
-
   }
 
   @Test
@@ -227,8 +227,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
     Assert.assertTrue(messages.isEmpty());
 
+    deleteCluster(clusterName);
     System.out.println("END " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
-
   }
 
   @Test
@@ -323,6 +323,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     Assert.assertEquals(message.getToState(), "DROPPED");
     Assert.assertEquals(message.getTgtName(), "localhost_0");
 
+    deleteCluster(clusterName);
     System.out.println("END " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
   }
 
@@ -398,8 +399,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     messages = msgSelOutput.getMessages(resourceName, new 
Partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 0, "Should NOT output 1 message: 
SLAVE-MASTER for node0");
 
+    deleteCluster(clusterName);
     System.out.println("END " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
-
   }
 
   @Test
@@ -464,6 +465,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     Assert.assertEquals(message.getToState(), "SLAVE");
     Assert.assertEquals(message.getTgtName(), "localhost_1");
 
+    deleteCluster(clusterName);
     System.out.println("END " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java
 
b/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java
index 5701211..00509b3 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java
@@ -25,7 +25,7 @@ import java.util.Date;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
-import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -41,7 +41,7 @@ import org.testng.annotations.Test;
  * which helps us write integration tests easily
  */
 
-public class SinglePartitionLeaderStandByTest extends ZkIntegrationTestBase {
+public class SinglePartitionLeaderStandByTest extends ZkTestBase {
   @Test
   public void test()
       throws Exception {
@@ -99,6 +99,7 @@ public class SinglePartitionLeaderStandByTest extends 
ZkIntegrationTestBase {
     for (int i = 0; i < n; i++) {
       participants[i].syncStop();
     }
+    TestHelper.dropCluster(clusterName, _gZkClient);
     System.out.println("END " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java 
b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
index 8be880d..6aca358 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
@@ -21,7 +21,7 @@ package org.apache.helix.integration;
 
 import java.util.Date;
 
-import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterDistributedController;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.tools.ClusterSetup;
@@ -33,7 +33,7 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-public class TestAddClusterV2 extends ZkIntegrationTestBase {
+public class TestAddClusterV2 extends ZkTestBase {
   private static Logger LOG = LoggerFactory.getLogger(TestAddClusterV2.class);
 
   protected static final int CLUSTER_NR = 10;
@@ -54,18 +54,6 @@ public class TestAddClusterV2 extends ZkIntegrationTestBase {
   public void beforeClass() throws Exception {
     System.out.println("START " + CLASS_NAME + " at " + new 
Date(System.currentTimeMillis()));
 
-    String namespace = "/" + CONTROLLER_CLUSTER;
-    if (_gZkClient.exists(namespace)) {
-      _gZkClient.deleteRecursively(namespace);
-    }
-
-    for (int i = 0; i < CLUSTER_NR; i++) {
-      namespace = "/" + CLUSTER_PREFIX + "_" + CLASS_NAME + "_" + i;
-      if (_gZkClient.exists(namespace)) {
-        _gZkClient.deleteRecursively(namespace);
-      }
-    }
-
     _setupTool = new ClusterSetup(ZK_ADDR);
 
     // setup CONTROLLER_CLUSTER
@@ -136,6 +124,13 @@ public class TestAddClusterV2 extends 
ZkIntegrationTestBase {
     for (int i = 0; i < NODE_NR; i++) {
       _participants[i].syncStop();
     }
+
+    // delete clusters
+    for (int i = 0; i < CLUSTER_NR; i++) {
+      String clusterName = CLUSTER_PREFIX + "_" + CLASS_NAME + "_" + i;
+      _setupTool.getClusterManagementTool().dropCluster(clusterName);
+    }
+
     System.out.println("END " + CLASS_NAME + " at " + new 
Date(System.currentTimeMillis()));
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
index 7e83be2..1d5273a 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
@@ -25,7 +25,7 @@ import java.util.List;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkTestHelper;
-import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.ClusterDistributedController;
 import org.apache.helix.integration.manager.MockParticipantManager;
@@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
+public class TestAddNodeAfterControllerStart extends ZkTestBase {
   private static Logger LOG = 
LoggerFactory.getLogger(TestAddNodeAfterControllerStart.class);
   final String className = getShortClassName();
 
@@ -88,6 +88,7 @@ public class TestAddNodeAfterControllerStart extends 
ZkIntegrationTestBase {
     for (int i = 0; i < nodeNr; i++) {
       participants[i].syncStop();
     }
+    _gSetupTool.deleteCluster(clusterName);
 
     System.out.println("END " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
   }
@@ -164,6 +165,7 @@ public class TestAddNodeAfterControllerStart extends 
ZkIntegrationTestBase {
     for (int i = 0; i < nodeNr; i++) {
       participants[i].syncStop();
     }
+    _gSetupTool.deleteCluster(clusterName);
 
     System.out.println("END " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
index b05b9fb..6209815 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
@@ -25,7 +25,7 @@ import java.util.List;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -39,7 +39,7 @@ import 
org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-public class TestAddStateModelFactoryAfterConnect extends 
ZkIntegrationTestBase {
+public class TestAddStateModelFactoryAfterConnect extends ZkTestBase {
   @Test
   public void testBasic() throws Exception {
     // Logger.getRootLogger().setLevel(Level.INFO);
@@ -130,6 +130,7 @@ public class TestAddStateModelFactoryAfterConnect extends 
ZkIntegrationTestBase
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();
     }
+    _gSetupTool.deleteCluster(clusterName);
 
     System.out.println("END " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
 

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
index 946eb1c..cd53552 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
@@ -68,17 +68,12 @@ public class TestAlertingRebalancerFailure extends 
ZkStandAloneCMTestBase {
   public void beforeClass() throws Exception {
     System.out.println("START " + CLASS_NAME + " at " + new 
Date(System.currentTimeMillis()));
 
-    String namespace = "/" + CLUSTER_NAME;
-    if (_gZkClient.exists(namespace)) {
-      _gZkClient.deleteRecursively(namespace);
-    }
-    _setupTool = new ClusterSetup(ZK_ADDR);
     // setup storage cluster
-    _setupTool.addCluster(CLUSTER_NAME, true);
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
 
     for (int i = 0; i < NODE_NR; i++) {
       String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
     }
 
     // start controller
@@ -108,19 +103,19 @@ public class TestAlertingRebalancerFailure extends 
ZkStandAloneCMTestBase {
 
   @Test (enabled = false)
   public void testParticipantUnavailable() {
-    _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
         BuiltInStateModelDefinitions.MasterSlave.name(), 
RebalanceMode.FULL_AUTO.name());
-    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
     HelixClusterVerifier verifier =
         new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
             .setResources(new 
HashSet<>(Collections.singleton(testDb))).build();
     Assert.assertTrue(verifier.verify());
 
     // disable then enable the resource to ensure no rebalancing error is generated during this process
-    _setupTool.dropResourceFromCluster(CLUSTER_NAME, testDb);
-    _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
+    _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, testDb);
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
         BuiltInStateModelDefinitions.MasterSlave.name(), 
RebalanceMode.FULL_AUTO.name());
-    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
     Assert.assertTrue(verifier.verify());
 
     // Verify there is no rebalance error logged
@@ -137,7 +132,7 @@ public class TestAlertingRebalancerFailure extends 
ZkStandAloneCMTestBase {
     checkRebalanceFailureGauge(true);
 
     // clean up
-    _setupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
+    _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
     for (int i = 0; i < NODE_NR; i++) {
       _participants[i] =
           new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, 
_participants[i].getInstanceName());
@@ -147,21 +142,21 @@ public class TestAlertingRebalancerFailure extends 
ZkStandAloneCMTestBase {
 
   @Test (enabled = false)
   public void testTagSetIncorrect() {
-    _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
         BuiltInStateModelDefinitions.MasterSlave.name(), 
RebalanceMode.FULL_AUTO.name());
     // set expected instance tag
     IdealState is =
-        
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, 
testDb);
+        
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, 
testDb);
     is.setInstanceGroupTag("RandomTag");
-    _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, 
testDb, is);
-    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, 
testDb, is);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
 
     // Verify there is rebalance error logged
     Assert.assertNotNull(pollForError(accessor, errorNodeKey));
     checkRebalanceFailureGauge(true);
 
     // clean up
-    _setupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
+    _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
   }
 
   @Test (enabled = false)
@@ -188,10 +183,10 @@ public class TestAlertingRebalancerFailure extends 
ZkStandAloneCMTestBase {
     // Error may be recorded unexpectedly when a resource from other tests is not cleaned up.
     accessor.removeProperty(errorNodeKey);
 
-    _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
         BuiltInStateModelDefinitions.MasterSlave.name(), 
RebalanceMode.FULL_AUTO.name(),
         CrushRebalanceStrategy.class.getName());
-    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, replicas);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, replicas);
     HelixClusterVerifier verifier =
         new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
             .setResources(new 
HashSet<>(Collections.singleton(testDb))).build();
@@ -212,13 +207,13 @@ public class TestAlertingRebalancerFailure extends 
ZkStandAloneCMTestBase {
     for (int i = replicas; i < NODE_NR; i++) {
       setDomainId(_participants[i].getInstanceName(), configAccessor);
     }
-    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, replicas);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, replicas);
     Thread.sleep(1000);
     // Verify that rebalance error state is removed
     checkRebalanceFailureGauge(false);
 
     // clean up
-    _setupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
+    _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
     clusterConfig.setTopologyAwareEnabled(false);
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java 
b/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java
index 0c61b27..2fa8c9f 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java
@@ -46,8 +46,8 @@ public class TestBasicSpectator extends 
ZkStandAloneCMTestBase implements
     relayHelixManager.connect();
     relayHelixManager.addExternalViewChangeListener(this);
 
-    _setupTool.addResourceToCluster(CLUSTER_NAME, "NextDB", 64, STATE_MODEL);
-    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "NextDB", 3);
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, "NextDB", 64, STATE_MODEL);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, "NextDB", 3);
 
     boolean result =
         ClusterStateVerifier.verifyByPolling(new 
ClusterStateVerifier.BestPossAndExtViewZkVerifier(

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageHandling.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageHandling.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageHandling.java
index 3feb2e0..d7c3a2d 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageHandling.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageHandling.java
@@ -54,15 +54,15 @@ public class TestBatchMessageHandling extends 
ZkStandAloneCMTestBase {
     IdealState idealState = new 
FullAutoModeISBuilder(dbName).setStateModel("OnlineOffline")
         
.setStateModelFactoryName("TestFactory").setNumPartitions(10).setNumReplica(1).build();
     idealState.setBatchMessageMode(true);
-    _setupTool.getClusterManagementTool().addResource(CLUSTER_NAME, dbName, 
idealState);
-    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, 1);
+    _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, dbName, 
idealState);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, 1);
 
     Thread.sleep(1000L);
 
     int numOfOnlines = 0;
     int numOfErrors = 0;
     ExternalView externalView =
-        
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, 
dbName);
+        
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, 
dbName);
     for (String partition : externalView.getPartitionSet()) {
       if (externalView.getStateMap(partition).values().contains("ONLINE")) {
         numOfOnlines++;

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
index 1b764d4..7c90e5f 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
@@ -30,7 +30,7 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.NotificationContext.Type;
-import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -45,7 +45,7 @@ import org.apache.helix.tools.DefaultIdealStateCalculator;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-public class TestBucketizedResource extends ZkIntegrationTestBase {
+public class TestBucketizedResource extends ZkTestBase {
 
   private void setupCluster(String clusterName, List<String> instanceNames, 
String dbName,
       int replica, int partitions, int bucketSize) {

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestCMWithFailParticipant.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestCMWithFailParticipant.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestCMWithFailParticipant.java
index 3c3f4ac..7dd477a 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestCMWithFailParticipant.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestCMWithFailParticipant.java
@@ -21,10 +21,10 @@ package org.apache.helix.integration;
 
 import java.util.Date;
 
-import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.common.ZkTestBase;
 import org.testng.annotations.Test;
 
-public class TestCMWithFailParticipant extends ZkIntegrationTestBase {
+public class TestCMWithFailParticipant extends ZkTestBase {
   // ZkClient _zkClient;
   //
   // @BeforeClass ()

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java
index 05b3e99..e01afc9 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java
@@ -24,7 +24,7 @@ import java.util.Date;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -33,7 +33,7 @@ import 
org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-public class TestCarryOverBadCurState extends ZkIntegrationTestBase {
+public class TestCarryOverBadCurState extends ZkTestBase {
   @Test
   public void testCarryOverBadCurState() throws Exception {
     System.out.println("START testCarryOverBadCurState at " + new 
Date(System.currentTimeMillis()));
@@ -83,7 +83,7 @@ public class TestCarryOverBadCurState extends 
ZkIntegrationTestBase {
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();
     }
+    _gSetupTool.deleteCluster(clusterName);
     System.out.println("END testCarryOverBadCurState at " + new 
Date(System.currentTimeMillis()));
-
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java
index e7a7ea0..61aadde 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java
@@ -98,8 +98,8 @@ public class TestCleanupExternalView extends ZkUnitTestBase {
     accessor.removeProperty(keyBuilder.currentState("localhost_12918", 
liveInstance.getSessionId(),
         "TestDB0"));
     liveInstance = 
accessor.getProperty(keyBuilder.liveInstance("localhost_12919"));
-    accessor.removeProperty(keyBuilder.currentState("localhost_12919", 
liveInstance.getSessionId(),
-        "TestDB0"));
+    accessor.removeProperty(
+        keyBuilder.currentState("localhost_12919", 
liveInstance.getSessionId(), "TestDB0"));
 
     // re-enable controller shall remove orphan external-view
     // System.out.println("re-enabling controller");
@@ -123,6 +123,7 @@ public class TestCleanupExternalView extends ZkUnitTestBase {
     for (int i = 0; i < n; i++) {
       participants[i].syncStop();
     }
+    TestHelper.dropCluster(clusterName, _gZkClient);
 
     System.out.println("END " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java
index 21a9ac2..9d5ad3e 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java
@@ -38,20 +38,14 @@ public class TestClusterStartsup extends 
ZkStandAloneCMTestBase {
   void setupCluster() throws HelixException {
     System.out.println("START " + CLASS_NAME + " at " + new 
Date(System.currentTimeMillis()));
 
-    String namespace = "/" + CLUSTER_NAME;
-    if (_gZkClient.exists(namespace)) {
-      _gZkClient.deleteRecursively(namespace);
-    }
-    _setupTool = new ClusterSetup(ZK_ADDR);
-
     // setup storage cluster
-    _setupTool.addCluster(CLUSTER_NAME, true);
-    _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, 20, STATE_MODEL);
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, 20, STATE_MODEL);
     for (int i = 0; i < NODE_NR; i++) {
       String storageNodeName = "localhost_" + (START_PORT + i);
-      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
     }
-    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 3);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 3);
   }
 
   @Override
@@ -60,11 +54,6 @@ public class TestClusterStartsup extends 
ZkStandAloneCMTestBase {
 
   }
 
-  @Override
-  @AfterClass()
-  public void afterClass() {
-  }
-
   @Test()
   public void testParticipantStartUp() throws Exception {
     setupCluster();
@@ -135,5 +124,8 @@ public class TestClusterStartsup extends 
ZkStandAloneCMTestBase {
       AssertJUnit.assertFalse(manager.isConnected());
     }
 
+    if (manager != null) {
+      manager.disconnect();
+    }
   }
 }

Reply via email to