This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 0c6187ebd7fd8bb2889b15f36b19f41dec300785
Author: Junkai Xue <junkai....@gmail.com>
AuthorDate: Mon Jul 22 13:27:43 2024 -0700

    Build HelixGateway integration test base (#2842)
    
    This test base contains all the setups including:
    1. Mock participants that run in a hybrid setup with gateway services.
    2. Controller with start/stop.
    3. Resource creation.
    4. Only Online/Offline state model.
    5. Gateway services with start/stop (left empty for now, until the gateway 
service is implemented).
    
    Also add a dummy test that extends the test base to make sure all setups 
are ready, except for the gateway services.
---
 helix-gateway/src/test/conf/testng.xml             |   2 +-
 .../java/org/apache/helix/gateway/DummyTest.java   |  21 +
 .../helix/gateway/base/HelixGatewayTestBase.java   | 141 ++++
 .../helix/gateway/base/ZookeeperTestBase.java      | 814 ++++++++++++++++++++
 .../base/manager/ClusterControllerManager.java     |  49 ++
 .../helix/gateway/base/manager/ClusterManager.java | 135 ++++
 .../base/manager/MockParticipantManager.java       | 118 +++
 .../base/statemodel/MockOFModelFactory.java        |  54 ++
 .../gateway/base/statemodel/MockOFStateModel.java  |  65 ++
 .../gateway/base/statemodel/MockTransition.java    |  41 ++
 .../apache/helix/gateway/base/util/TestHelper.java | 817 +++++++++++++++++++++
 11 files changed, 2256 insertions(+), 1 deletion(-)

diff --git a/helix-gateway/src/test/conf/testng.xml 
b/helix-gateway/src/test/conf/testng.xml
index 19446d49b..f77eab885 100644
--- a/helix-gateway/src/test/conf/testng.xml
+++ b/helix-gateway/src/test/conf/testng.xml
@@ -21,7 +21,7 @@
 <suite name="Suite" parallel="false">
   <test name="Test" preserve-order="true">
     <packages>
-      <package name="org.apache.helix.helix.gateway.*"/>
+      <package name="org.apache.helix.gateway.*"/>
     </packages>
   </test>
 </suite>
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/DummyTest.java 
b/helix-gateway/src/test/java/org/apache/helix/gateway/DummyTest.java
new file mode 100644
index 000000000..7511cb3a3
--- /dev/null
+++ b/helix-gateway/src/test/java/org/apache/helix/gateway/DummyTest.java
@@ -0,0 +1,21 @@
+package org.apache.helix.gateway;
+
+import org.apache.helix.gateway.base.HelixGatewayTestBase;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Smoke test that only exercises the shared setup provided by {@link HelixGatewayTestBase}.
+ */
+public class DummyTest extends HelixGatewayTestBase {
+
+  /**
+   * Raises the participant count to five before delegating to the base-class setup.
+   */
+  @BeforeClass
+  @Override
+  public void beforeClass() {
+    _numParticipants = 5;
+    super.beforeClass();
+  }
+
+  /**
+   * Creates a resource with 4 partitions and 2 replicas and expects the cluster verifier
+   * to report success.
+   */
+  @Test
+  public void testSetups() {
+    createResource("TestDB_1", 4, 2);
+    Assert.assertTrue(_clusterVerifier.verify());
+  }
+}
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/HelixGatewayTestBase.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/HelixGatewayTestBase.java
new file mode 100644
index 000000000..06cf45cd1
--- /dev/null
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/HelixGatewayTestBase.java
@@ -0,0 +1,141 @@
+package org.apache.helix.gateway.base;
+
+import java.util.List;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.gateway.base.manager.ClusterControllerManager;
+import org.apache.helix.gateway.base.manager.MockParticipantManager;
+import org.apache.helix.gateway.base.util.TestHelper;
+import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.collections.Lists;
+
+/**
+ * Base class for Helix gateway integration tests. It creates a cluster, starts one controller
+ * and {@code _numParticipants} mock participants, and offers helpers to create resources.
+ * The gateway service hooks are empty placeholders for now.
+ */
+public class HelixGatewayTestBase extends ZookeeperTestBase {
+  protected static final int START_PORT = 12918;
+  // Change the cluster name for each test class.
+  protected String _clusterName = "TEST_CLUSTER";
+  protected int _numParticipants = 3;
+  protected int _numGatewayInstances = 3;
+  protected ClusterControllerManager _controller;
+  protected List<MockParticipantManager> _participants;
+  protected ConfigAccessor _configAccessor;
+  protected BestPossibleExternalViewVerifier _clusterVerifier;
+
+  /**
+   * Creates the cluster, then starts the controller, the participants and the gateway service.
+   * Note that this override does not call {@code super.beforeClass()}, so the JMX cleanup and
+   * GC pause of {@link ZookeeperTestBase#beforeClass()} do not run for subclasses.
+   */
+  @BeforeClass
+  @Override
+  public void beforeClass() {
+    _participants = Lists.newArrayList();
+    _configAccessor = new ConfigAccessor(ZK_ADDR);
+    _gSetupTool.getClusterManagementTool().addCluster(_clusterName, true);
+    controllerManagement(true);
+    startParticipants();
+    startGatewayService();
+  }
+
+  /**
+   * Stops the controller, stops and drops the participants, stops the gateway service, drops
+   * the cluster and finally releases the config accessor created in {@link #beforeClass()}.
+   */
+  @AfterClass
+  public void afterClass() {
+    controllerManagement(false);
+    stopParticipants(true);
+    stopGatewayService();
+    _gSetupTool.getClusterManagementTool().dropCluster(_clusterName);
+    // The accessor was built from a ZK address, so it holds a ZK client of its own that
+    // must be closed; it is still needed by stopParticipants(), hence closed last.
+    _configAccessor.close();
+  }
+
+  /**
+   * Start or stop the controller. Starting also builds the cluster verifier; in both cases
+   * the persist-best-possible-assignment cluster flag is set to the value of {@code start}.
+   * @param start true to start the controller, false to stop the controller
+   */
+  private void controllerManagement(boolean start) {
+    String controllerName = CONTROLLER_PREFIX + "_0";
+
+    if (start) {
+      _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, controllerName);
+      _controller.syncStart();
+
+      _clusterVerifier = new BestPossibleExternalViewVerifier.Builder(_clusterName)
+          .setZkClient(_gZkClient)
+          .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
+          .build();
+    } else {
+      _controller.syncStop();
+    }
+
+    enablePersistBestPossibleAssignment(_gZkClient, _clusterName, start);
+  }
+
+  /**
+   * Create and start the number of participants defined by _numParticipants. Instance names
+   * are built from PARTICIPANT_PREFIX and consecutive ports starting at START_PORT.
+   */
+  private void startParticipants() {
+    for (int i = 0; i < _numParticipants; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _gSetupTool.addInstanceToCluster(_clusterName, storageNodeName);
+
+      // start dummy participants
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, _clusterName, storageNodeName);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+  }
+
+  /**
+   * Stop all participants, optionally drop them from the cluster, and clear the
+   * participant list.
+   *
+   * @param dropParticipants true to also drop the participants, false to only stop them
+   */
+  private void stopParticipants(boolean dropParticipants) {
+    for (MockParticipantManager participant : _participants) {
+      participant.syncStop();
+      if (dropParticipants) {
+        _gSetupTool.getClusterManagementTool().dropInstance(_clusterName,
+            _configAccessor.getInstanceConfig(_clusterName, participant.getInstanceName()));
+      }
+    }
+    _participants.clear();
+  }
+
+  /**
+   * Create a resource with the given number of partitions and replicas
+   * WARNING: 1) assume only support OnlineOffline state model
+   *          2) assume only support FULL_AUTO rebalance mode
+   *
+   * Default rebalance strategy is CrushEdRebalanceStrategy
+   *
+   * @param resourceName    name of the resource
+   * @param numPartitions   number of partitions
+   * @param numReplicas     number of replicas
+   */
+  protected void createResource(String resourceName, int numPartitions, int numReplicas) {
+    createResource(resourceName, numPartitions, numReplicas,
+        "org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy");
+  }
+
+  /**
+   * Create a resource with the given number of partitions, replicas and rebalance strategy.
+   * The resource is added with the OnlineOffline state model in FULL_AUTO mode and then
+   * rebalanced.
+   *
+   * @param resourceName      name of the resource
+   * @param numPartitions     number of partitions
+   * @param numReplicas       number of replicas
+   * @param rebalanceStrategy rebalance strategy
+   */
+  protected void createResource(String resourceName, int numPartitions, int numReplicas,
+      String rebalanceStrategy) {
+    _gSetupTool.getClusterManagementTool().addResource(_clusterName, resourceName,
+        numPartitions, "OnlineOffline", "FULL_AUTO", rebalanceStrategy);
+    _gSetupTool.getClusterManagementTool().rebalance(_clusterName, resourceName, numReplicas);
+  }
+
+  /**
+   * Start the gateway service with the given number of gateway instances
+   * defined by _numGatewayInstances
+   */
+  protected void startGatewayService() {
+    // Start the gateway service
+  }
+
+  /**
+   * Stop the gateway service
+   */
+  protected void stopGatewayService() {
+    // Stop the gateway service
+  }
+}
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/ZookeeperTestBase.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/ZookeeperTestBase.java
new file mode 100644
index 000000000..8d87bc4f3
--- /dev/null
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/ZookeeperTestBase.java
@@ -0,0 +1,814 @@
+package org.apache.helix.gateway.base;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+
+import com.google.common.base.Preconditions;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.api.config.HelixConfigProperty;
+import org.apache.helix.constants.InstanceConstants;
+import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
+import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.pipeline.Stage;
+import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.gateway.base.util.TestHelper;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.OnlineOfflineSMD;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
+import org.apache.helix.zookeeper.zkclient.ZkServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.ITestContext;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeSuite;
+
+public abstract class ZookeeperTestBase {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ZookeeperTestBase.class);
+  private static final String MULTI_ZK_PROPERTY_KEY = "multiZk";
+  private static final String NUM_ZK_PROPERTY_KEY = "numZk";
+
+  protected static ZkServer _zkServer;
+  protected static HelixZkClient _gZkClient;
+  protected static ClusterSetup _gSetupTool;
+  protected static BaseDataAccessor<ZNRecord> _baseAccessor;
+  protected static MBeanServerConnection _server = 
ManagementFactory.getPlatformMBeanServer();
+
+  private final Map<String, Map<String, HelixZkClient>> _liveInstanceOwners = 
new HashMap<>();
+
+  private static final String ZK_PREFIX = "localhost:";
+  private static final int ZK_START_PORT = 2283;
+  public static final String ZK_ADDR = ZK_PREFIX + ZK_START_PORT;
+  protected static final String CLUSTER_PREFIX = "CLUSTER";
+  protected static final String CONTROLLER_CLUSTER_PREFIX = 
"CONTROLLER_CLUSTER";
+  protected final String CONTROLLER_PREFIX = "controller";
+  protected final String PARTICIPANT_PREFIX = "localhost";
+  private static final long MANUAL_GC_PAUSE = 4000L;
+
+  /*
+   * Multiple ZK references
+   */
+  // The following maps hold ZK connect string as keys
+  protected static final Map<String, ZkServer> _zkServerMap = new HashMap<>();
+  protected static final Map<String, HelixZkClient> _helixZkClientMap = new 
HashMap<>();
+  protected static final Map<String, ClusterSetup> _clusterSetupMap = new 
HashMap<>();
+  protected static final Map<String, BaseDataAccessor> _baseDataAccessorMap = 
new HashMap<>();
+
+  /**
+   * Prints the total physical memory reported by the operating system MXBean, followed by the
+   * JVM's total and free memory in MB, to standard output.
+   */
+  public static void reportPhysicalMemory() {
+    com.sun.management.OperatingSystemMXBean osBean =
+        (com.sun.management.OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
+    System.out.println("************ SYSTEM Physical Memory:" + osBean.getTotalPhysicalMemorySize());
+
+    final long bytesPerMb = 1024 * 1024;
+    Runtime jvm = Runtime.getRuntime();
+    long freeMb = jvm.freeMemory() / bytesPerMb;
+    long totalMb = jvm.totalMemory() / bytesPerMb;
+    System.out.println("************ total memory:" + totalMb + " free memory:" + freeMb);
+  }
+
+  /**
+   * Suite-wide setup: lowers java.util.logging verbosity, sets the ZooKeeper and Helix system
+   * properties, starts the in-memory ZooKeeper server(s), publishes the single-ZK references
+   * and unregisters all JMX MBeans.
+   *
+   * @throws Exception if the JMX cleanup fails
+   */
+  @BeforeSuite
+  public void beforeSuite() throws Exception {
+    // TODO: use logging.properties file to config java.util.logging.Logger levels
+    java.util.logging.Logger topJavaLogger = java.util.logging.Logger.getLogger("");
+    topJavaLogger.setLevel(Level.WARNING);
+
+    // Due to the ZOOKEEPER-2693 fix, we need to specify a whitelist to execute zk commands
+    System.setProperty("zookeeper.4lw.commands.whitelist", "*");
+    System.setProperty(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, "3000");
+
+    // Start in-memory ZooKeepers
+    // If multi-ZooKeeper is enabled, start more ZKs. Otherwise, just set up one ZK
+    int numZkToStart = 1;
+    String multiZkConfig = System.getProperty(MULTI_ZK_PROPERTY_KEY);
+    if (Boolean.parseBoolean(multiZkConfig)) {
+      String numZkFromConfig = System.getProperty(NUM_ZK_PROPERTY_KEY);
+      if (numZkFromConfig != null) {
+        try {
+          // Never start fewer than one ZK, even if the configured number is smaller.
+          numZkToStart = Math.max(Integer.parseInt(numZkFromConfig), numZkToStart);
+        } catch (NumberFormatException e) {
+          Assert.fail("Failed to parse the number of ZKs from config!", e);
+        }
+      } else {
+        Assert.fail("multiZk config is set but numZk config is missing!");
+      }
+    }
+
+    // Start "numZkToStart" ZooKeepers
+    for (int i = 0; i < numZkToStart; i++) {
+      startZooKeeper(i);
+    }
+
+    // Set the references for backward-compatibility with a single ZK environment
+    _zkServer = _zkServerMap.get(ZK_ADDR);
+    _gZkClient = _helixZkClientMap.get(ZK_ADDR);
+    _gSetupTool = _clusterSetupMap.get(ZK_ADDR);
+    _baseAccessor = _baseDataAccessorMap.get(ZK_ADDR);
+
+    // Clean up all JMX objects
+    cleanupJMXObjects();
+  }
+
+  /**
+   * Starts an in-memory ZooKeeper for testing and registers the server, ZK client, cluster
+   * setup tool and base data accessor under its address. Entries that already exist for the
+   * address are left untouched.
+   * @param i index to be added to the ZK port to avoid conflicts
+   */
+  private static synchronized void startZooKeeper(int i) {
+    String zkAddress = ZK_PREFIX + (ZK_START_PORT + i);
+    _zkServerMap.computeIfAbsent(zkAddress, ZookeeperTestBase::createZookeeperServer);
+    _helixZkClientMap.computeIfAbsent(zkAddress, ZookeeperTestBase::createZkClient);
+    // The setup tool and the accessor share the ZK client registered just above.
+    _clusterSetupMap.computeIfAbsent(zkAddress,
+        key -> new ClusterSetup(_helixZkClientMap.get(key)));
+    _baseDataAccessorMap.computeIfAbsent(zkAddress,
+        key -> new ZkBaseDataAccessor<ZNRecord>(_helixZkClientMap.get(key)));
+  }
+
+  /**
+   * Starts a ZooKeeper server at the given address.
+   *
+   * @param zkAddress address of the server to start
+   * @return the started server, never null
+   * @throws IllegalArgumentException if starting fails or no server is returned
+   */
+  private static ZkServer createZookeeperServer(String zkAddress) {
+    try {
+      ZkServer started = TestHelper.startZkServer(zkAddress);
+      return Preconditions.checkNotNull(started);
+    } catch (Exception startFailure) {
+      throw new IllegalArgumentException("Failed to start zookeeper server at " + zkAddress,
+          startFailure);
+    }
+  }
+
+  /**
+   * Builds a dedicated ZK client for the given address that uses a {@link ZNRecordSerializer}.
+   *
+   * @param zkAddress address to connect to
+   * @return the newly built client
+   */
+  private static HelixZkClient createZkClient(String zkAddress) {
+    HelixZkClient.ZkClientConfig serializerConfig = new HelixZkClient.ZkClientConfig();
+    serializerConfig.setZkSerializer(new ZNRecordSerializer());
+    HelixZkClient.ZkConnectionConfig connectionConfig =
+        new HelixZkClient.ZkConnectionConfig(zkAddress);
+    return DedicatedZkClientFactory.getInstance().buildZkClient(connectionConfig, serializerConfig);
+  }
+
+  /**
+   * Suite-wide teardown: unregisters all JMX MBeans, then closes every base data accessor,
+   * cluster setup tool and ZK client and stops every ZooKeeper server held in the static maps.
+   *
+   * @throws IOException if the JMX cleanup fails
+   */
+  @AfterSuite
+  public void afterSuite() throws IOException {
+    // Clean up all JMX objects
+    cleanupJMXObjects();
+
+    // Locks the same monitor as the static synchronized startZooKeeper(int).
+    synchronized (ZookeeperTestBase.class) {
+      // Close all ZK resources
+      _baseDataAccessorMap.values().forEach(BaseDataAccessor::close);
+      _clusterSetupMap.values().forEach(ClusterSetup::close);
+      _helixZkClientMap.values().forEach(HelixZkClient::close);
+      _zkServerMap.values().forEach(TestHelper::stopZkServer);
+    }
+  }
+
+  /**
+   * Per-class setup: unregisters the JMX MBeans, requests a garbage collection and then sleeps
+   * for MANUAL_GC_PAUSE milliseconds to give each test class some time to settle. This is a
+   * best-effort attempt to stabilize tests, not a complete solution.
+   */
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    cleanupJMXObjects();
+    System.gc();
+    Thread.sleep(MANUAL_GC_PAUSE);
+  }
+
+  @BeforeMethod
+  public void beforeTest(Method testMethod, ITestContext testContext) {
+    testContext.setAttribute("StartTime", System.currentTimeMillis());
+  }
+
+  /**
+   * Unregisters every MBean currently known to the MBean server. Failures to unregister an
+   * individual MBean are ignored on purpose.
+   *
+   * @throws IOException if querying the MBean names fails
+   */
+  protected void cleanupJMXObjects() throws IOException {
+    Set<ObjectName> registeredNames = _server.queryNames(null, null);
+    for (ObjectName name : registeredNames) {
+      try {
+        _server.unregisterMBean(name);
+      } catch (Exception ignored) {
+        // OK
+      }
+    }
+  }
+
+  /**
+   * @return the simple (unqualified) name of the runtime class of this test
+   */
+  protected String getShortClassName() {
+    Class<?> runtimeClass = getClass();
+    return runtimeClass.getSimpleName();
+  }
+
+  /**
+   * Looks up the current controller leader of a cluster.
+   *
+   * @param zkClient    ZK client used to read the cluster data
+   * @param clusterName name of the cluster
+   * @return the instance name of the leader, or null if no leader is recorded
+   */
+  protected String getCurrentLeader(HelixZkClient zkClient, String clusterName) {
+    ZKHelixDataAccessor dataAccessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(zkClient));
+    PropertyKey leaderKey = dataAccessor.keyBuilder().controllerLeader();
+
+    LiveInstance leader = dataAccessor.getProperty(leaderKey);
+    return leader == null ? null : leader.getInstanceName();
+  }
+
+  /**
+   * Reads the cluster config, sets its persist-best-possible-assignment flag and writes the
+   * config back.
+   *
+   * @param zkClient    ZK client used to access the config
+   * @param clusterName name of the cluster
+   * @param enabled     new value of the flag
+   */
+  protected void enablePersistBestPossibleAssignment(HelixZkClient zkClient, String clusterName,
+      Boolean enabled) {
+    ConfigAccessor accessor = new ConfigAccessor(zkClient);
+    ClusterConfig config = accessor.getClusterConfig(clusterName);
+    config.setPersistBestPossibleAssignment(enabled);
+    accessor.setClusterConfig(clusterName, config);
+  }
+
+  /**
+   * Reads the cluster config, sets its persist-intermediate-assignment flag and writes the
+   * config back.
+   *
+   * @param zkClient    ZK client used to access the config
+   * @param clusterName name of the cluster
+   * @param enabled     new value of the flag
+   */
+  protected void enablePersistIntermediateAssignment(HelixZkClient zkClient, String clusterName,
+      Boolean enabled) {
+    ConfigAccessor accessor = new ConfigAccessor(zkClient);
+    ClusterConfig config = accessor.getClusterConfig(clusterName);
+    config.setPersistIntermediateAssignment(enabled);
+    accessor.setClusterConfig(clusterName, config);
+  }
+
+  /**
+   * Reads the cluster config, sets its topology-aware flag and writes the config back.
+   *
+   * @param zkClient    ZK client used to access the config
+   * @param clusterName name of the cluster
+   * @param enabled     new value of the flag
+   */
+  protected void enableTopologyAwareRebalance(HelixZkClient zkClient, String clusterName,
+      Boolean enabled) {
+    ConfigAccessor accessor = new ConfigAccessor(zkClient);
+    ClusterConfig config = accessor.getClusterConfig(clusterName);
+    config.setTopologyAwareEnabled(enabled);
+    accessor.setClusterConfig(clusterName, config);
+  }
+
+  /**
+   * Reads the cluster config, sets its delay-rebalance flag and writes the config back.
+   *
+   * @param zkClient    ZK client used to access the config
+   * @param clusterName name of the cluster
+   * @param enabled     new value of the flag
+   */
+  protected void enableDelayRebalanceInCluster(HelixZkClient zkClient, String clusterName,
+      boolean enabled) {
+    ConfigAccessor accessor = new ConfigAccessor(zkClient);
+    ClusterConfig config = accessor.getClusterConfig(clusterName);
+    config.setDelayRebalaceEnabled(enabled);
+    accessor.setClusterConfig(clusterName, config);
+  }
+
+  /**
+   * Reads the config of one instance, sets its delay-rebalance flag and writes the config back.
+   *
+   * @param zkClient     ZK client used to access the config
+   * @param clusterName  name of the cluster
+   * @param instanceName name of the instance to update
+   * @param enabled      new value of the flag
+   */
+  protected void enableDelayRebalanceInInstance(HelixZkClient zkClient, String clusterName,
+      String instanceName, boolean enabled) {
+    ConfigAccessor accessor = new ConfigAccessor(zkClient);
+    InstanceConfig config = accessor.getInstanceConfig(clusterName, instanceName);
+    config.setDelayRebalanceEnabled(enabled);
+    accessor.setInstanceConfig(clusterName, instanceName, config);
+  }
+
+  /**
+   * Reads the cluster config, sets both its delay-rebalance flag and its rebalance delay time,
+   * and writes the config back.
+   *
+   * @param zkClient    ZK client used to access the config
+   * @param clusterName name of the cluster
+   * @param enabled     new value of the delay-rebalance flag
+   * @param delay       new rebalance delay time
+   */
+  protected void enableDelayRebalanceInCluster(HelixZkClient zkClient, String clusterName,
+      boolean enabled, long delay) {
+    ConfigAccessor accessor = new ConfigAccessor(zkClient);
+    ClusterConfig config = accessor.getClusterConfig(clusterName);
+    config.setDelayRebalaceEnabled(enabled);
+    config.setRebalanceDelayTime(delay);
+    accessor.setClusterConfig(clusterName, config);
+  }
+
+  /**
+   * Turns P2P messaging on or off at cluster level. Enabling sets the flag on the cluster
+   * config; disabling removes the P2P_MESSAGE_ENABLED simple field from it. The config is
+   * written back in both cases.
+   *
+   * @param clusterName    name of the cluster
+   * @param configAccessor accessor used to read and write the cluster config
+   * @param enable         true to enable P2P messages, false to remove the setting
+   */
+  protected void enableP2PInCluster(String clusterName, ConfigAccessor configAccessor,
+      boolean enable) {
+    ClusterConfig config = configAccessor.getClusterConfig(clusterName);
+    if (enable) {
+      config.enableP2PMessage(true);
+    } else {
+      config.getRecord().getSimpleFields().remove(HelixConfigProperty.P2P_MESSAGE_ENABLED.name());
+    }
+    configAccessor.setClusterConfig(clusterName, config);
+  }
+
+  /**
+   * Turns P2P messaging on or off for one resource. Enabling writes a freshly built resource
+   * config with the flag set; disabling removes the P2P_MESSAGE_ENABLED simple field from the
+   * existing resource config, and does nothing if no resource config exists.
+   *
+   * @param clusterName    name of the cluster
+   * @param configAccessor accessor used to read and write the resource config
+   * @param dbName         name of the resource
+   * @param enable         true to enable P2P messages, false to remove the setting
+   */
+  protected void enableP2PInResource(String clusterName, ConfigAccessor configAccessor,
+      String dbName, boolean enable) {
+    if (enable) {
+      ResourceConfig enabledConfig =
+          new ResourceConfig.Builder(dbName).setP2PMessageEnabled(true).build();
+      configAccessor.setResourceConfig(clusterName, dbName, enabledConfig);
+      return;
+    }
+
+    ResourceConfig existing = configAccessor.getResourceConfig(clusterName, dbName);
+    if (existing == null) {
+      return;
+    }
+    existing.getRecord().getSimpleFields().remove(HelixConfigProperty.P2P_MESSAGE_ENABLED.name());
+    configAccessor.setResourceConfig(clusterName, dbName, existing);
+  }
+
+  /**
+   * Reads the cluster config, sets its rebalance delay time and writes the config back.
+   *
+   * @param zkClient    ZK client used to access the config
+   * @param clusterName name of the cluster
+   * @param delay       new rebalance delay time
+   */
+  protected void setDelayTimeInCluster(HelixZkClient zkClient, String clusterName, long delay) {
+    ConfigAccessor accessor = new ConfigAccessor(zkClient);
+    ClusterConfig config = accessor.getClusterConfig(clusterName);
+    config.setRebalanceDelayTime(delay);
+    accessor.setClusterConfig(clusterName, config);
+  }
+
+  /**
+   * Reads the cluster config, sets its last on-demand rebalance timestamp and writes the
+   * config back.
+   *
+   * @param zkClient         ZK client used to access the config
+   * @param clusterName      name of the cluster
+   * @param lastOnDemandTime new timestamp value
+   */
+  protected void setLastOnDemandRebalanceTimeInCluster(HelixZkClient zkClient,
+      String clusterName, long lastOnDemandTime) {
+    ConfigAccessor accessor = new ConfigAccessor(zkClient);
+    ClusterConfig config = accessor.getClusterConfig(clusterName);
+    config.setLastOnDemandRebalanceTimestamp(lastOnDemandTime);
+    accessor.setClusterConfig(clusterName, config);
+  }
+
+  /**
+   * Creates a delayed-rebalance resource using {@link AutoRebalanceStrategy} as the rebalance
+   * strategy; see the overload taking an explicit strategy.
+   *
+   * @return the ideal state of the resource after rebalancing
+   */
+  protected IdealState createResourceWithDelayedRebalance(String clusterName, String db,
+      String stateModel, int numPartition, int replica, int minActiveReplica, long delay) {
+    String defaultStrategy = AutoRebalanceStrategy.class.getName();
+    return createResourceWithDelayedRebalance(clusterName, db, stateModel, numPartition, replica,
+        minActiveReplica, delay, defaultStrategy);
+  }
+
+  /**
+   * Creates a resource that uses {@link DelayedAutoRebalancer} as rebalancer class together
+   * with the given rebalance strategy.
+   *
+   * @return the ideal state of the resource after rebalancing
+   */
+  protected IdealState createResourceWithDelayedRebalance(String clusterName, String db,
+      String stateModel, int numPartition, int replica, int minActiveReplica, long delay,
+      String rebalanceStrategy) {
+    String rebalancerClass = DelayedAutoRebalancer.class.getName();
+    return createResource(clusterName, db, stateModel, numPartition, replica, minActiveReplica,
+        delay, rebalancerClass, rebalanceStrategy);
+  }
+
+  /**
+   * Creates a resource that uses {@link WagedRebalancer} as rebalancer class, without a
+   * rebalance strategy and with a delay of -1 (so no rebalance delay is set).
+   *
+   * @return the ideal state of the resource after rebalancing
+   */
+  protected IdealState createResourceWithWagedRebalance(String clusterName, String db,
+      String stateModel, int numPartition, int replica, int minActiveReplica) {
+    String rebalancerClass = WagedRebalancer.class.getName();
+    return createResource(clusterName, db, stateModel, numPartition, replica, minActiveReplica,
+        -1, rebalancerClass, null);
+  }
+
+  /**
+   * Adds the resource in FULL_AUTO mode if it has no ideal state yet, then updates the ideal
+   * state (minimum active replicas, delay rebalance enabled, optional delay, rebalancer class),
+   * rebalances the resource and returns the ideal state read back afterwards.
+   *
+   * @param delay rebalance delay; only applied when greater than zero
+   * @return the ideal state of the resource after rebalancing
+   */
+  private IdealState createResource(String clusterName, String db, String stateModel,
+      int numPartition, int replica, int minActiveReplica, long delay, String rebalancerClassName,
+      String rebalanceStrategy) {
+    HelixAdmin admin = _gSetupTool.getClusterManagementTool();
+    if (admin.getResourceIdealState(clusterName, db) == null) {
+      _gSetupTool.addResourceToCluster(clusterName, db, numPartition, stateModel,
+          IdealState.RebalanceMode.FULL_AUTO.toString(), rebalanceStrategy);
+    }
+
+    // Re-read so a resource that was just added is picked up as well.
+    IdealState current = admin.getResourceIdealState(clusterName, db);
+    current.setMinActiveReplicas(minActiveReplica);
+    if (!current.isDelayRebalanceEnabled()) {
+      current.setDelayRebalanceEnabled(true);
+    }
+    if (delay > 0) {
+      current.setRebalanceDelay(delay);
+    }
+    current.setRebalancerClassName(rebalancerClassName);
+    admin.setResourceIdealState(clusterName, db, current);
+
+    _gSetupTool.rebalanceStorageCluster(clusterName, db, replica);
+    return admin.getResourceIdealState(clusterName, db);
+  }
+
+  /**
+   * Builds an ideal state for a resource group and fills in an initial partition-to-instance
+   * mapping in which every assigned replica is ONLINE.
+   *
+   * @param instanceNames instances to place replicas on; asserted to hold at least
+   *                      {@code replica} entries
+   * @return the populated ideal state
+   */
+  protected IdealState createIdealState(String resourceGroupName, String instanceGroupTag,
+      List<String> instanceNames, int numPartition, int replica, String rebalanceMode,
+      String stateModelDef) {
+    IdealState idealState = _gSetupTool.createIdealStateForResourceGroup(resourceGroupName,
+        instanceGroupTag, numPartition, replica, rebalanceMode, stateModelDef);
+
+    int nodeCount = instanceNames.size();
+    assert (nodeCount >= replica);
+    String onlineState = OnlineOfflineSMD.States.ONLINE.toString();
+
+    // Partition p puts its replicas on consecutive nodes starting at index p, wrapping
+    // around the end of the instance list.
+    for (int p = 0; p < numPartition; p++) {
+      String partitionName = resourceGroupName + "_" + p;
+      for (int r = 0; r < replica; r++) {
+        String instanceName = instanceNames.get((p + r) % nodeCount);
+        idealState.setPartitionState(partitionName, instanceName, onlineState);
+      }
+    }
+
+    return idealState;
+  }
+
+  /**
+   * Adds a SEMI_AUTO resource to the cluster, rebalances it and then assigns the same
+   * preference list to every partition of its ideal state.
+   *
+   * @param clusterSetup   setup tool used for all cluster operations of this call
+   * @param clusterName    name of the cluster
+   * @param dbName         name of the resource
+   * @param preferenceList preference list applied to every partition
+   * @param stateModelDef  state model definition of the resource
+   * @param numPartition   number of partitions
+   * @param replica        number of replicas
+   */
+  protected void createDBInSemiAuto(ClusterSetup clusterSetup, String clusterName, String dbName,
+      List<String> preferenceList, String stateModelDef, int numPartition, int replica) {
+    clusterSetup.addResourceToCluster(clusterName, dbName, numPartition, stateModelDef,
+        IdealState.RebalanceMode.SEMI_AUTO.toString());
+    clusterSetup.rebalanceStorageCluster(clusterName, dbName, replica);
+
+    // Read through the caller's setup tool rather than the global _gSetupTool, so the ideal
+    // state comes from the same ZK the resource was just created in.
+    IdealState is = clusterSetup.getClusterManagementTool().getResourceIdealState(clusterName, dbName);
+    for (String p : is.getPartitionSet()) {
+      is.setPreferenceList(p, preferenceList);
+    }
+    clusterSetup.getClusterManagementTool().setResourceIdealState(clusterName, dbName, is);
+  }
+
+  /**
+   * Validates, for every partition of the ideal state, that the external view holds a
+   * non-empty assignment containing a top-state replica and at least
+   * {@code minActiveReplica} replicas in an active state. The active states are the keys of
+   * the state model's state-count map for the given node and replica counts.
+   *
+   * @param is               ideal state of the resource; its state model must be a built-in one
+   * @param ev               external view to validate
+   * @param minActiveReplica minimum number of active replicas required per partition
+   * @param numNodes         number of nodes used to compute the state-count map
+   */
+  protected void validateMinActiveAndTopStateReplica(IdealState is, ExternalView ev,
+      int minActiveReplica, int numNodes) {
+    StateModelDefinition stateModelDef =
+        BuiltInStateModelDefinitions.valueOf(is.getStateModelDefRef()).getStateModelDefinition();
+    String topState = stateModelDef.getStatesPriorityList().get(0);
+    int replica = Integer.parseInt(is.getReplicas());
+
+    Map<String, Integer> stateCount = stateModelDef.getStateCountMap(numNodes, replica);
+    Set<String> activeStates = stateCount.keySet();
+
+    for (String partition : is.getPartitionSet()) {
+      Map<String, String> assignmentMap = ev.getRecord().getMapField(partition);
+      Assert.assertNotNull(assignmentMap,
+          is.getResourceName() + "'s best possible assignment is null for partition " + partition);
+      Assert.assertFalse(assignmentMap.isEmpty(),
+          is.getResourceName() + "'s partition " + partition + " has no best possible map in IS.");
+
+      boolean hasTopState = false;
+      int activeReplica = 0;
+      for (String state : assignmentMap.values()) {
+        if (topState.equalsIgnoreCase(state)) {
+          hasTopState = true;
+        }
+        if (activeStates.contains(state)) {
+          activeReplica++;
+        }
+      }
+
+      Assert.assertTrue(hasTopState, String.format("%s missing %s replica", partition, topState));
+      Assert.assertTrue(activeReplica >= minActiveReplica,
+          String.format("%s has less active replica %d then required %d", partition, activeReplica,
+              minActiveReplica));
+    }
+  }
+
+  /**
+   * Runs a single pipeline stage against the given event: registers the manager on the event,
+   * initializes the stage with a fresh context, then calls preProcess, the stage's main
+   * method and postProcess in that order.
+   *
+   * @param manager manager stored on the event under the helixmanager attribute
+   * @param event   cluster event handed to the stage
+   * @param stage   stage to run
+   * @throws Exception if any stage call fails
+   */
+  protected void runStage(HelixManager manager, ClusterEvent event, Stage stage) throws Exception {
+    event.addAttribute(AttributeName.helixmanager.name(), manager);
+    stage.init(new StageContext());
+    stage.preProcess();
+
+    // An AbstractAsyncBaseStage runs asynchronously and its main logic is implemented in
+    // execute(), so that is called directly; every other stage goes through process().
+    if (!(stage instanceof AbstractAsyncBaseStage)) {
+      stage.process(event);
+    } else {
+      ((AbstractAsyncBaseStage) stage).execute(event);
+    }
+    stage.postProcess();
+  }
+
+  /**
+   * Asserts that both the instance-config path and the instance path of the given instance
+   * exist (or do not exist) in ZooKeeper.
+   *
+   * @param zkClient    ZK client used for the existence checks
+   * @param clusterName name of the cluster
+   * @param instance    name of the instance
+   * @param wantExists  expected existence of both paths
+   */
+  public void verifyInstance(HelixZkClient zkClient, String clusterName, String instance,
+      boolean wantExists) {
+    String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName) + "/" + instance;
+    String instancePath = PropertyPathBuilder.instance(clusterName, instance);
+    // TestNG's assertEquals takes (actual, expected).
+    Assert.assertEquals(zkClient.exists(instanceConfigPath), wantExists);
+    Assert.assertEquals(zkClient.exists(instancePath), wantExists);
+  }
+
+  /**
+   * Asserts that the ideal-state path of the given resource exists (or does not exist) in
+   * ZooKeeper.
+   *
+   * @param zkClient    ZK client used for the existence check
+   * @param clusterName name of the cluster
+   * @param resource    name of the resource
+   * @param wantExists  expected existence of the path
+   */
+  public void verifyResource(HelixZkClient zkClient, String clusterName, String resource,
+      boolean wantExists) {
+    String resourcePath = PropertyPathBuilder.idealState(clusterName, resource);
+    // TestNG's assertEquals takes (actual, expected).
+    Assert.assertEquals(zkClient.exists(resourcePath), wantExists);
+  }
+
+  /**
+   * Asserts that the instance config of the given instance exists and that its enabled flag
+   * has the expected value.
+   *
+   * @param zkClient    ZK client used to read the instance config
+   * @param clusterName name of the cluster
+   * @param instance    name of the instance
+   * @param wantEnabled expected value of the enabled flag
+   */
+  public void verifyEnabled(HelixZkClient zkClient, String clusterName, String instance,
+      boolean wantEnabled) {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instance));
+    // Fail with a readable message instead of a NullPointerException when the config is absent.
+    Assert.assertNotNull(config, "No instance config found for instance " + instance);
+    // TestNG's assertEquals takes (actual, expected).
+    Assert.assertEquals(config.getInstanceEnabled(), wantEnabled);
+  }
+
+  /**
+   * Asserts the replica count of every partition of a resource: for SEMI_AUTO resources the
+   * size of the preference list is checked, for CUSTOMIZED resources the size of the
+   * instance-state map. Other rebalance modes are not checked.
+   *
+   * @param zkClient    ZK client used to read the ideal state
+   * @param clusterName name of the cluster
+   * @param resource    name of the resource
+   * @param repl        expected number of replicas per partition
+   */
+  public void verifyReplication(HelixZkClient zkClient, String clusterName, String resource,
+      int repl) {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resource));
+    IdealState.RebalanceMode mode = idealState.getRebalanceMode();
+    for (String partitionName : idealState.getPartitionSet()) {
+      // TestNG's assertEquals takes (actual, expected).
+      if (mode == IdealState.RebalanceMode.SEMI_AUTO) {
+        Assert.assertEquals(idealState.getPreferenceList(partitionName).size(), repl);
+      } else if (mode == IdealState.RebalanceMode.CUSTOMIZED) {
+        Assert.assertEquals(idealState.getInstanceStateMap(partitionName).size(), repl);
+      }
+    }
+  }
+
+  /**
+   * Writes the MasterSlave, LeaderStandby and OnlineOffline state model definitions into the
+   * given cluster, in that order, through the shared test ZK client.
+   *
+   * @param clusterName cluster that receives the state model definitions
+   */
+  protected void setupStateModel(String clusterName) {
+    ZKHelixDataAccessor dataAccessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
+    PropertyKey.Builder keys = dataAccessor.keyBuilder();
+
+    StateModelDefinition masterSlaveDef =
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
+    dataAccessor.setProperty(keys.stateModelDef(masterSlaveDef.getId()), masterSlaveDef);
+
+    StateModelDefinition leaderStandbyDef =
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForLeaderStandby());
+    dataAccessor.setProperty(keys.stateModelDef(leaderStandbyDef.getId()), leaderStandbyDef);
+
+    StateModelDefinition onlineOfflineDef =
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForOnlineOffline());
+    dataAccessor.setProperty(keys.stateModelDef(onlineOfflineDef.getId()), onlineOfflineDef);
+  }
+
+  /**
+   * Builds a Helix message describing a state transition for a resource.
+   *
+   * @param type message type; its {@code toString()} is used as the message type string
+   * @param msgId id of the new message
+   * @param fromState state the transition starts from
+   * @param toState state the transition ends in
+   * @param resourceName stored in the RESOURCE_NAME simple field of the message record
+   * @param tgtName target name set on the message
+   * @return the populated message
+   */
+  protected Message createMessage(Message.MessageType type, String msgId, String fromState,
+      String toState, String resourceName, String tgtName) {
+    Message message = new Message(type.toString(), msgId);
+    message.setFromState(fromState);
+    message.setToState(toState);
+
+    String resourceField = Message.Attributes.RESOURCE_NAME.toString();
+    message.getRecord().setSimpleField(resourceField, resourceName);
+
+    message.setTgtName(tgtName);
+    return message;
+  }
+
+  /**
+   * Creates and persists a SEMI_AUTO, MasterSlave ideal state for each of the given resources.
+   * Replica {@code r} of partition {@code p} is placed on
+   * {@code "localhost_" + nodes[(p + r) % nodes.length]}, and the partition is named
+   * {@code resourceName + "_" + p}.
+   *
+   * @param clusterName cluster that receives the ideal states
+   * @param nodes port numbers of the instances; must be non-empty when both
+   *        {@code partitions} and {@code replicas} are positive (the placement uses a modulo)
+   * @param resources names of the resources to create
+   * @param partitions number of partitions per resource
+   * @param replicas number of replicas per partition
+   * @return the ideal states that were written, in the order of {@code resources}
+   */
+  protected List<IdealState> setupIdealState(String clusterName, int[] nodes, String[] resources,
+      int partitions, int replicas) {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    List<IdealState> idealStates = new ArrayList<>();
+
+    for (String resourceName : resources) {
+      IdealState idealState = new IdealState(resourceName);
+      for (int p = 0; p < partitions; p++) {
+        List<String> preferenceList = new ArrayList<>();
+        for (int r = 0; r < replicas; r++) {
+          // Round-robin placement, shifted by the partition index.
+          int n = nodes[(p + r) % nodes.length];
+          preferenceList.add("localhost_" + n);
+        }
+        idealState.getRecord().setListField(resourceName + "_" + p, preferenceList);
+      }
+
+      idealState.setReplicas(Integer.toString(replicas));
+      idealState.setStateModelDefRef("MasterSlave");
+      idealState.setRebalanceMode(IdealState.RebalanceMode.SEMI_AUTO);
+      idealState.setNumPartitions(partitions);
+      idealStates.add(idealState);
+
+      accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
+    }
+    return idealStates;
+  }
+
+  /**
+   * Closes every ZK client that was created to own a live-instance node and forgets all of
+   * them. Runs after each test class.
+   */
+  @AfterClass
+  public void cleanupLiveInstanceOwners() throws InterruptedException {
+    String testClassName = this.getShortClassName();
+    System.out.println("AfterClass: " + testClassName + " called.");
+    for (Map<String, HelixZkClient> ownersOfCluster : _liveInstanceOwners.values()) {
+      for (HelixZkClient owner : ownersOfCluster.values()) {
+        owner.close();
+      }
+      ownersOfCluster.clear();
+    }
+    _liveInstanceOwners.clear();
+  }
+
+  /**
+   * Creates a live-instance node for each given port, each one written through its own
+   * dedicated ZK client. The clients are remembered in {@code _liveInstanceOwners}, keyed by
+   * cluster and instance name, and are reused on repeated calls.
+   *
+   * @param clusterName cluster that receives the live instances
+   * @param liveInstances port numbers; the instance name is {@code "localhost_" + port}
+   * @return the live instances as read back from ZooKeeper, in input order
+   */
+  protected List<LiveInstance> setupLiveInstances(String clusterName, int[] liveInstances) {
+    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+    clientConfig.setZkSerializer(new ZNRecordSerializer());
+
+    List<LiveInstance> result = new ArrayList<>();
+
+    for (int i = 0; i < liveInstances.length; i++) {
+      String instance = "localhost_" + liveInstances[i];
+      Map<String, HelixZkClient> clientMap =
+          _liveInstanceOwners.computeIfAbsent(clusterName, cluster -> new HashMap<>());
+      // computeIfAbsent builds the dedicated client only when none is registered yet.
+      // putIfAbsent would build one on every call and leak it, unclosed, whenever the
+      // instance already had an owner.
+      HelixZkClient client = clientMap.computeIfAbsent(instance,
+          name -> DedicatedZkClientFactory.getInstance()
+              .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR), clientConfig));
+
+      ZKHelixDataAccessor accessor =
+          new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(client));
+      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+      LiveInstance liveInstance = new LiveInstance(instance);
+      // Keep setting the session id in the deprecated field to ensure the same behavior as a
+      // real participant. Note the participant is doing so for backward compatibility.
+      liveInstance.setSessionId(Long.toHexString(client.getSessionId()));
+      // Please refer to the version requirement here:
+      // helix-core/src/main/resources/cluster-manager-version.properties
+      // Ensuring version compatibility can avoid the warning message during test.
+      liveInstance.setHelixVersion("0.4");
+      accessor.setProperty(keyBuilder.liveInstance(instance), liveInstance);
+      result.add(accessor.getProperty(keyBuilder.liveInstance(instance)));
+    }
+    return result;
+  }
+
+  /**
+   * Removes every live-instance node of the cluster and closes the dedicated ZK client that
+   * was registered for it, if any. The cluster's entry in {@code _liveInstanceOwners} is
+   * dropped once no client remains.
+   *
+   * @param clusterName cluster whose live instances are deleted
+   */
+  protected void deleteLiveInstances(String clusterName) {
+    // One accessor is enough: it is bound to the shared client and never changes in the loop.
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    Map<String, HelixZkClient> clientMap =
+        _liveInstanceOwners.getOrDefault(clusterName, Collections.emptyMap());
+
+    for (String liveInstance : accessor.getChildNames(keyBuilder.liveInstances())) {
+      accessor.removeProperty(keyBuilder.liveInstance(liveInstance));
+
+      HelixZkClient client = clientMap.remove(liveInstance);
+      if (client != null) {
+        client.close();
+      }
+    }
+
+    if (clientMap.isEmpty()) {
+      _liveInstanceOwners.remove(clusterName);
+    }
+  }
+
+  /**
+   * Registers one instance per port in the cluster. Each instance is named
+   * {@code "localhost_" + port}, uses host {@code localhost} and the port itself, and has its
+   * instance operation set to ENABLE.
+   *
+   * @param clusterName cluster that receives the instances
+   * @param instances port numbers of the instances to add
+   */
+  protected void setupInstances(String clusterName, int[] instances) {
+    HelixAdmin helixAdmin = new ZKHelixAdmin(_gZkClient);
+    for (int port : instances) {
+      InstanceConfig config = new InstanceConfig("localhost_" + port);
+      config.setHostName("localhost");
+      config.setPort(Integer.toString(port));
+      config.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
+      helixAdmin.addInstance(clusterName, config);
+    }
+  }
+
+  /**
+   * Runs {@code handle} and then {@code finish} on the pipeline for the given event.
+   *
+   * @param event event passed to the pipeline
+   * @param pipeline pipeline to execute
+   * @param shouldThrowException when true a pipeline failure is rethrown, otherwise it is
+   *        only logged
+   * @throws Exception the pipeline failure, if {@code shouldThrowException} is true
+   */
+  protected void runPipeline(ClusterEvent event, Pipeline pipeline, boolean shouldThrowException)
+      throws Exception {
+    try {
+      pipeline.handle(event);
+      pipeline.finish();
+    } catch (Exception e) {
+      if (!shouldThrowException) {
+        LOG.error("Exception while executing pipeline: {}. Will not continue to next pipeline",
+            pipeline, e);
+        return;
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Drives a single stage through init, preProcess, its main step and postProcess.
+   *
+   * @param event event handed to the stage
+   * @param stage stage to run
+   * @throws Exception whatever the stage throws
+   */
+  protected void runStage(ClusterEvent event, Stage stage) throws Exception {
+    stage.init(new StageContext());
+    stage.preProcess();
+
+    // An AbstractAsyncBaseStage keeps its main logic in execute(), so call that directly
+    // instead of process().
+    // TODO (harry): duplicated code in ZkIntegrationTestBase, consider moving runStage()
+    // to a shared library
+    if (!(stage instanceof AbstractAsyncBaseStage)) {
+      stage.process(event);
+    } else {
+      ((AbstractAsyncBaseStage) stage).execute(event);
+    }
+    stage.postProcess();
+  }
+
+  /**
+   * Drops the given cluster by delegating to {@code TestHelper.dropCluster} with the shared
+   * ZK client and setup tool.
+   *
+   * @param clusterName cluster to delete
+   */
+  protected void deleteCluster(String clusterName) {
+    TestHelper.dropCluster(clusterName, _gZkClient, _gSetupTool);
+  }
+
+  /**
+   * Polls a Helix property every 50 ms, for up to 5 seconds, until its presence matches
+   * {@code shouldExist}.
+   * @param clazz the HelixProperty subclass; not read by the method body
+   * @param accessor connected HelixDataAccessor
+   * @param key the property key to look up
+   * @param shouldExist true to wait for the property to appear, false to wait for it to vanish
+   * @return the last value read: the property if it exists, or null if it does not
+   * @throws InterruptedException if the thread is interrupted while sleeping between polls
+   */
+  protected <T extends HelixProperty> T pollForProperty(Class<T> clazz, HelixDataAccessor accessor,
+      PropertyKey key, boolean shouldExist) throws InterruptedException {
+    final int POLL_TIMEOUT = 5000;
+    final int POLL_INTERVAL = 50;
+    T current = accessor.getProperty(key);
+    // Keep polling while the observed presence differs from the wanted one.
+    for (int waited = 0; (current != null) != shouldExist && waited < POLL_TIMEOUT;
+        waited += POLL_INTERVAL) {
+      Thread.sleep(POLL_INTERVAL);
+      current = accessor.getProperty(key);
+    }
+    return current;
+  }
+
+  /**
+   * Ensures that external view and current state are empty
+   */
+  protected static class EmptyZkVerifier implements 
ClusterStateVerifier.ZkVerifier {
+    private final String _clusterName;
+    private final String _resourceName;
+    private final HelixZkClient _zkClient;
+
+    /**
+     * Instantiate the verifier
+     * @param clusterName the cluster to verify
+     * @param resourceName the resource to verify
+     */
+    public EmptyZkVerifier(String clusterName, String resourceName) {
+      _clusterName = clusterName;
+      _resourceName = resourceName;
+
+      _zkClient = DedicatedZkClientFactory.getInstance()
+          .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
+      _zkClient.setZkSerializer(new ZNRecordSerializer());
+    }
+
+    @Override
+    public boolean verify() {
+      BaseDataAccessor<ZNRecord> baseAccessor = new 
ZkBaseDataAccessor<ZNRecord>(_zkClient);
+      HelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, 
baseAccessor);
+      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+      ExternalView externalView = 
accessor.getProperty(keyBuilder.externalView(_resourceName));
+
+      // verify external view empty
+      if (externalView != null) {
+        for (String partition : externalView.getPartitionSet()) {
+          Map<String, String> stateMap = externalView.getStateMap(partition);
+          if (stateMap != null && !stateMap.isEmpty()) {
+            LOG.error("External view not empty for " + partition);
+            return false;
+          }
+        }
+      }
+
+      // verify current state empty
+      List<String> liveParticipants = 
accessor.getChildNames(keyBuilder.liveInstances());
+      for (String participant : liveParticipants) {
+        List<String> sessionIds = 
accessor.getChildNames(keyBuilder.sessions(participant));
+        for (String sessionId : sessionIds) {
+          CurrentState currentState =
+              accessor.getProperty(keyBuilder.currentState(participant, 
sessionId, _resourceName));
+          Map<String, String> partitionStateMap = 
currentState.getPartitionStateMap();
+          if (partitionStateMap != null && !partitionStateMap.isEmpty()) {
+            LOG.error("Current state not empty for " + participant);
+            return false;
+          }
+        }
+
+        List<String> taskSessionIds =
+            
accessor.getChildNames(keyBuilder.taskCurrentStateSessions(participant));
+        for (String sessionId : taskSessionIds) {
+          CurrentState taskCurrentState = accessor
+              .getProperty(keyBuilder.taskCurrentState(participant, sessionId, 
_resourceName));
+          Map<String, String> taskPartitionStateMap = 
taskCurrentState.getPartitionStateMap();
+          if (taskPartitionStateMap != null && 
!taskPartitionStateMap.isEmpty()) {
+            LOG.error("Task current state not empty for " + participant);
+            return false;
+          }
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public ZkClient getZkClient() {
+      return (ZkClient) _zkClient;
+    }
+
+    @Override
+    public String getClusterName() {
+      return _clusterName;
+    }
+  }
+
+}
diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/ClusterControllerManager.java b/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/ClusterControllerManager.java
new file mode 100644
index 000000000..4fecde88f
--- /dev/null
+++ b/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/ClusterControllerManager.java
@@ -0,0 +1,49 @@
+package org.apache.helix.gateway.base.manager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManagerProperty;
+import org.apache.helix.InstanceType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The standalone cluster controller class
+ */
+public class ClusterControllerManager extends ClusterManager {
+  private static final Logger LOG = LoggerFactory.getLogger(ClusterControllerManager.class);
+
+  /**
+   * Creates a controller named {@code "controller"}.
+   *
+   * @param zkAddr ZooKeeper address to connect to
+   * @param clusterName cluster to control
+   */
+  public ClusterControllerManager(String zkAddr, String clusterName) {
+    this(zkAddr, clusterName, "controller");
+  }
+
+  /**
+   * Creates a controller with an explicit name.
+   *
+   * @param zkAddr ZooKeeper address to connect to
+   * @param clusterName cluster to control
+   * @param controllerName instance name of the controller
+   */
+  public ClusterControllerManager(String zkAddr, String clusterName, String controllerName) {
+    super(zkAddr, clusterName, controllerName, InstanceType.CONTROLLER);
+  }
+
+  /**
+   * Creates a controller named {@code "controller"} from a manager property; no ZooKeeper
+   * address and no state listener are passed to the parent.
+   *
+   * @param clusterName cluster to control
+   * @param helixManagerProperty manager property handed to the parent constructor
+   */
+  public ClusterControllerManager(String clusterName, HelixManagerProperty helixManagerProperty) {
+    super(clusterName, "controller", InstanceType.CONTROLLER, null, null, helixManagerProperty);
+  }
+
+  @Override
+  public void finalize() {
+    super.finalize();
+  }
+}
diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/ClusterManager.java b/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/ClusterManager.java
new file mode 100644
index 000000000..3858513e5
--- /dev/null
+++ b/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/ClusterManager.java
@@ -0,0 +1,135 @@
+package org.apache.helix.gateway.base.manager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.helix.HelixManagerProperty;
+import org.apache.helix.InstanceType;
+import org.apache.helix.manager.zk.CallbackHandler;
+import org.apache.helix.manager.zk.HelixManagerStateListener;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link ZKHelixManager} that connects and disconnects on its own watcher thread, with
+ * blocking {@link #syncStart()} / {@link #syncStop()} entry points built on latches.
+ */
+public class ClusterManager extends ZKHelixManager implements Runnable {
+  private static final Logger LOG = LoggerFactory.getLogger(ClusterManager.class);
+  private static final int DISCONNECT_WAIT_TIME_MS = 3000;
+
+  // Source of the unique suffix used in watcher thread names.
+  private static final AtomicLong UID = new AtomicLong(10000);
+  private final long _uid;
+
+  private final String _clusterName;
+  private final String _instanceName;
+  private final InstanceType _type;
+
+  // Released by run() once connect() has returned (or failed).
+  protected CountDownLatch _startCountDown = new CountDownLatch(1);
+  // Released by syncStop() to let run() proceed to disconnect().
+  protected CountDownLatch _stopCountDown = new CountDownLatch(1);
+  // Released by run() after disconnect() has returned.
+  protected CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
+
+  protected boolean _started = false;
+
+  // Null until syncStart() has been called.
+  protected Thread _watcher;
+
+  protected ClusterManager(String zkAddr, String clusterName, String instanceName,
+      InstanceType type) {
+    super(clusterName, instanceName, type, zkAddr);
+    _clusterName = clusterName;
+    _instanceName = instanceName;
+    _type = type;
+    _uid = UID.getAndIncrement();
+  }
+
+  protected ClusterManager(String clusterName, String instanceName, InstanceType instanceType,
+      String zkAddress, HelixManagerStateListener stateListener,
+      HelixManagerProperty helixManagerProperty) {
+    super(clusterName, instanceName, instanceType, zkAddress, stateListener, helixManagerProperty);
+    _clusterName = clusterName;
+    _instanceName = instanceName;
+    _type = instanceType;
+    _uid = UID.getAndIncrement();
+  }
+
+  /**
+   * Signals {@link #run()} to stop and blocks until it has disconnected. The wait is only
+   * released by {@link #run()}, so calling this when {@link #run()} was never started blocks
+   * indefinitely.
+   */
+  public void syncStop() {
+    _stopCountDown.countDown();
+    try {
+      _waitStopFinishCountDown.await();
+      _started = false;
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted waiting for finish", e);
+      // Restore the flag so the caller can still observe the interruption.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Starts the watcher thread and blocks until {@link #run()} has finished its connect attempt.
+   * This should not be called more than once because HelixManager.connect() should not be
+   * called more than once.
+   *
+   * @throws RuntimeException if the manager is already marked as started
+   */
+  public void syncStart() {
+    if (_started) {
+      throw new RuntimeException(
+          "Helix Controller already started. Do not call syncStart() more than once.");
+    } else {
+      _started = true;
+    }
+
+    _watcher = new Thread(this);
+    _watcher.setName(String
+        .format("ClusterManager_Watcher_%s_%s_%s_%d", _clusterName, _instanceName, _type.name(), _uid));
+    // Capturing a stack trace is costly; only do it when the line is actually going to be logged.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ClusterManager_watcher_{}_{}_{}_{} started, stacktrace {}", _clusterName,
+          _instanceName, _type.name(), _uid, Thread.currentThread().getStackTrace());
+    }
+    _watcher.start();
+
+    try {
+      _startCountDown.await();
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted waiting for start", e);
+      // Restore the flag so the caller can still observe the interruption.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Connects, waits for the stop signal, then disconnects. The start latch is also released in
+   * the finally block so that {@link #syncStart()} returns even when connect() fails.
+   */
+  @Override
+  public void run() {
+    try {
+      connect();
+      _startCountDown.countDown();
+      _stopCountDown.await();
+    } catch (Exception e) {
+      LOG.error("exception running controller-manager", e);
+    } finally {
+      _startCountDown.countDown();
+      disconnect();
+      _waitStopFinishCountDown.countDown();
+    }
+  }
+
+  /**
+   * Interrupts the watcher thread, if one was created, waits up to
+   * {@code DISCONNECT_WAIT_TIME_MS} for it to end, and warns when the manager is still
+   * connected afterwards.
+   */
+  @Override
+  public void finalize() {
+    // _watcher stays null when syncStart() was never called; guard against that.
+    Thread watcher = _watcher;
+    try {
+      if (watcher != null) {
+        watcher.interrupt();
+        watcher.join(DISCONNECT_WAIT_TIME_MS);
+      }
+    } catch (InterruptedException e) {
+      LOG.error("ClusterManager watcher cleanup in the finalize method was interrupted.", e);
+      Thread.currentThread().interrupt();
+    } finally {
+      if (isConnected()) {
+        LOG.warn(
+            "The HelixManager ({}-{}-{}) is still connected after {} ms wait. This is a potential resource leakage!",
+            _clusterName, _instanceName, _type.name(), DISCONNECT_WAIT_TIME_MS);
+      }
+    }
+  }
+}
+
diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/MockParticipantManager.java b/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/MockParticipantManager.java
new file mode 100644
index 000000000..34047c525
--- /dev/null
+++ b/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/MockParticipantManager.java
@@ -0,0 +1,118 @@
+package org.apache.helix.gateway.base.manager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.helix.HelixCloudProperty;
+import org.apache.helix.HelixManagerProperty;
+import org.apache.helix.HelixPropertyFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.gateway.base.statemodel.MockOFModelFactory;
+import org.apache.helix.gateway.base.statemodel.MockTransition;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.participant.StateMachineEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A participant-type {@link ClusterManager} that registers a {@link MockOFModelFactory} for
+ * the OnlineOffline state model before connecting.
+ */
+public class MockParticipantManager extends ClusterManager {
+  private static final Logger LOG = LoggerFactory.getLogger(MockParticipantManager.class);
+
+  protected int _transDelay = 10;
+
+  protected MockOFModelFactory _ofModelFactory;
+  protected HelixCloudProperty _helixCloudProperty;
+
+  /** Creates a participant with a transition delay of 10 and no cloud property. */
+  public MockParticipantManager(String zkAddr, String clusterName, String instanceName) {
+    this(zkAddr, clusterName, instanceName, 10);
+  }
+
+  /** Creates a participant with the given transition delay and no cloud property. */
+  public MockParticipantManager(String zkAddr, String clusterName, String instanceName,
+      int transDelay) {
+    this(zkAddr, clusterName, instanceName, transDelay, null);
+  }
+
+  /**
+   * Creates a participant whose manager property is looked up from
+   * {@link HelixPropertyFactory} for the given ZooKeeper address and cluster.
+   */
+  public MockParticipantManager(String zkAddr, String clusterName, String instanceName,
+      int transDelay, HelixCloudProperty helixCloudProperty) {
+    this(zkAddr, clusterName, instanceName, transDelay, helixCloudProperty,
+        HelixPropertyFactory.getInstance().getHelixManagerProperty(zkAddr, clusterName));
+  }
+
+  /** Creates a participant from an explicit ZooKeeper address and manager property. */
+  public MockParticipantManager(String zkAddr, String clusterName, String instanceName,
+      int transDelay, HelixCloudProperty helixCloudProperty,
+      HelixManagerProperty helixManagerProperty) {
+    super(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr, null, helixManagerProperty);
+    _transDelay = transDelay;
+    _ofModelFactory = new MockOFModelFactory(null);
+    _helixCloudProperty = helixCloudProperty;
+  }
+
+  /** Creates a participant from a manager property only; no ZooKeeper address is passed on. */
+  public MockParticipantManager(String clusterName, String instanceName,
+      HelixManagerProperty helixManagerProperty, int transDelay,
+      HelixCloudProperty helixCloudProperty) {
+    super(clusterName, instanceName, InstanceType.PARTICIPANT, null, null, helixManagerProperty);
+    _transDelay = transDelay;
+    _ofModelFactory = new MockOFModelFactory();
+    _helixCloudProperty = helixCloudProperty;
+  }
+
+  /** Forwards the transition hook to the state model factory of this participant. */
+  public void setTransition(MockTransition transition) {
+    _ofModelFactory.setTrasition(transition);
+  }
+
+  /**
+   * Stops the participant through syncStop() and then replaces the start, stop and
+   * stop-finished latches with fresh ones.
+   */
+  public void reset() {
+    syncStop();
+    _startCountDown = new CountDownLatch(1);
+    _stopCountDown = new CountDownLatch(1);
+    _waitStopFinishCountDown = new CountDownLatch(1);
+  }
+
+  /**
+   * Registers the OnlineOffline factory, connects, and waits for the stop signal before
+   * disconnecting. The start latch is released again in the finally block so that a waiting
+   * starter is unblocked even when registration or connect fails.
+   */
+  @Override
+  public void run() {
+    try {
+      StateMachineEngine engine = getStateMachineEngine();
+      String stateModelName = BuiltInStateModelDefinitions.OnlineOffline.name();
+      engine.registerStateModelFactory(stateModelName, _ofModelFactory);
+      connect();
+      _startCountDown.countDown();
+
+      _stopCountDown.await();
+    } catch (InterruptedException e) {
+      LOG.info("participant: " + getInstanceName() + ", " + Thread.currentThread().getName()
+          + " is interrupted");
+    } catch (Exception e) {
+      LOG.error("exception running participant-manager", e);
+    } finally {
+      _startCountDown.countDown();
+
+      disconnect();
+      _waitStopFinishCountDown.countDown();
+    }
+  }
+
+  @Override
+  public void finalize() {
+    super.finalize();
+  }
+}
diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockOFModelFactory.java b/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockOFModelFactory.java
new file mode 100644
index 000000000..cf656b29d
--- /dev/null
+++ b/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockOFModelFactory.java
@@ -0,0 +1,54 @@
+package org.apache.helix.gateway.base.statemodel;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+/**
+ * Factory of {@link MockOFStateModel} instances that all share one optional
+ * {@link MockTransition} hook.
+ */
+public class MockOFModelFactory extends StateModelFactory<MockOFStateModel> {
+  private MockTransition _transition;
+
+  /** Creates a factory whose state models have no transition hook. */
+  public MockOFModelFactory() {
+    this(null);
+  }
+
+  /**
+   * @param transition hook handed to every state model created by this factory; may be null
+   */
+  public MockOFModelFactory(MockTransition transition) {
+    _transition = transition;
+  }
+
+  /**
+   * Sets the transition hook for future state models and pushes it to every state model this
+   * factory currently exposes through its resource and partition sets.
+   *
+   * @param transition the new hook; may be null
+   */
+  public void setTransition(MockTransition transition) {
+    _transition = transition;
+
+    // set existing transition
+    for (String resource : getResourceSet()) {
+      for (String partition : getPartitionSet(resource)) {
+        MockOFStateModel stateModel = getStateModel(resource, partition);
+        stateModel.setTransition(transition);
+      }
+    }
+  }
+
+  /**
+   * Misspelled legacy name of {@link #setTransition(MockTransition)}, kept so that existing
+   * callers continue to compile.
+   */
+  public void setTrasition(MockTransition transition) {
+    setTransition(transition);
+  }
+
+  @Override
+  public MockOFStateModel createNewStateModel(String resourceName, String partitionKey) {
+    return new MockOFStateModel(_transition);
+  }
+}
diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockOFStateModel.java b/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockOFStateModel.java
new file mode 100644
index 000000000..0e0350e1f
--- /dev/null
+++ b/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockOFStateModel.java
@@ -0,0 +1,65 @@
+package org.apache.helix.gateway.base.statemodel;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mock state model (initial state OFFLINE) that logs every transition and delegates it to an
+ * optional {@link MockTransition} hook.
+ */
+@StateModelInfo(initialState = "OFFLINE", states = {
+    "ONLINE", "ERROR"
+})
+public class MockOFStateModel extends StateModel {
+  private static final Logger LOG = LoggerFactory.getLogger(MockOFStateModel.class);
+
+  protected MockTransition _transition;
+
+  /**
+   * @param transition hook invoked on every transition and on reset; may be null
+   */
+  public MockOFStateModel(MockTransition transition) {
+    _transition = transition;
+  }
+
+  /** Replaces the transition hook; null disables it. */
+  public void setTransition(MockTransition transition) {
+    _transition = transition;
+  }
+
+  /**
+   * Handles every state transition: logs it and forwards it to the hook, if one is set.
+   *
+   * @param message the transition message
+   * @param context the notification context passed through to the hook
+   * @throws InterruptedException if the hook throws it
+   */
+  @Transition(to = "*", from = "*")
+  public void generalTransitionHandle(Message message, NotificationContext context)
+      throws InterruptedException {
+    LOG.info("Resource {} partition {} becomes {} from {}", message.getResourceName(),
+        message.getPartitionName(), message.getToState(), message.getFromState());
+    if (_transition != null) {
+      _transition.doTransition(message, context);
+    }
+  }
+
+  @Override
+  public void reset() {
+    // The message previously named MockMSStateModel, which is not this class.
+    LOG.info("Default MockOFStateModel.reset() invoked");
+    if (_transition != null) {
+      _transition.doReset();
+    }
+  }
+}
diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockTransition.java b/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockTransition.java
new file mode 100644
index 000000000..d61ba841d
--- /dev/null
+++ b/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockTransition.java
@@ -0,0 +1,41 @@
+package org.apache.helix.gateway.base.statemodel;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default transition hook for the mock state models: both callbacks only log. Tests subclass
+ * it to inject custom behavior.
+ */
+public class MockTransition {
+  private static final Logger LOG = LoggerFactory.getLogger(MockTransition.class);
+
+  /**
+   * Called by state model transition functions.
+   *
+   * @param message the transition message
+   * @param context the notification context of the transition
+   * @throws InterruptedException declared for overriding hooks; not thrown by this default
+   */
+  public void doTransition(Message message, NotificationContext context)
+      throws InterruptedException {
+    LOG.info("default doTransition() invoked");
+  }
+
+  /** Called by the state model reset function. */
+  public void doReset() {
+    LOG.info("default doReset() invoked");
+  }
+
+}
diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java b/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java
new file mode 100644
index 000000000..681f39205
--- /dev/null
+++ b/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java
@@ -0,0 +1,817 @@
+package org.apache.helix.gateway.base.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey.Builder;
+import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.StateModelDefinition;
+import 
org.apache.helix.model.StateModelDefinition.StateModelDefinitionProperty;
+import org.apache.helix.store.zk.ZNode;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.util.ZKClientPool;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
+import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace;
+import org.apache.helix.zookeeper.zkclient.ZkClient;
+import org.apache.helix.zookeeper.zkclient.ZkServer;
+import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+
+public class TestHelper {
+  private static final Logger LOG = LoggerFactory.getLogger(TestHelper.class);
+  // Default wait duration in milliseconds (60 * 1000).
+  public static final long WAIT_DURATION = 60 * 1000L; // 60 seconds
+  // Default wait for rebalance processing; presumably milliseconds -- TODO confirm against callers.
+  public static final int DEFAULT_REBALANCE_PROCESSING_WAIT_TIME = 1500;
+  /**
+   * Returns an unused random port.
+   * <p>
+   * The port is found by binding a temporary socket to a system-chosen port and closing it
+   * again, so the port was free at the time of the call but is not reserved for the caller.
+   * @return the local port the temporary socket was bound to
+   * @throws IOException if the temporary socket cannot be opened or bound
+   */
+  public static int getRandomPort() throws IOException {
+    // try-with-resources releases the socket even when bind() throws
+    try (ServerSocket sock = new ServerSocket()) {
+      sock.bind(null);
+      return sock.getLocalPort();
+    }
+  }
+
+  /**
+   * Starts a ZooKeeper server at the given address with an empty list of root namespaces
+   * and with {@code overwrite} enabled.
+   * @param zkAddress server address; the text after the last ':' is used as the port
+   * @return the started server
+   * @throws Exception if the server cannot be set up
+   */
+  public static ZkServer startZkServer(final String zkAddress) throws Exception {
+    final List<String> noNamespaces = Collections.emptyList();
+    return startZkServer(zkAddress, noNamespaces, true);
+  }
+
+  /**
+   * Starts a ZooKeeper server at the given address for a single root namespace,
+   * with {@code overwrite} enabled.
+   * @param zkAddress server address; the text after the last ':' is used as the port
+   * @param rootNamespace the only root namespace handed to the server's namespace callback
+   * @return the started server
+   * @throws Exception if the server cannot be set up
+   */
+  public static ZkServer startZkServer(final String zkAddress, final String rootNamespace)
+      throws Exception {
+    // the list is only iterated downstream, so a single-element list is sufficient
+    final List<String> singleNamespace = Collections.singletonList(rootNamespace);
+    return startZkServer(zkAddress, singleNamespace, true);
+  }
+
+  /**
+   * Starts a ZooKeeper server at the given address for the given root namespaces,
+   * with {@code overwrite} enabled.
+   * @param zkAddress server address; the text after the last ':' is used as the port
+   * @param rootNamespaces root namespaces handed to the server's namespace callback; may be null
+   * @return the started server
+   * @throws Exception if the server cannot be set up
+   */
+  public static ZkServer startZkServer(final String zkAddress, final List<String> rootNamespaces)
+      throws Exception {
+    final boolean overwrite = true;
+    return TestHelper.startZkServer(zkAddress, rootNamespaces, overwrite);
+  }
+
+  static public ZkServer startZkServer(final String zkAddress, final 
List<String> rootNamespaces,
+      boolean overwrite) throws Exception {
+    System.out.println(
+        "Start zookeeper at " + zkAddress + " in thread " + 
Thread.currentThread().getName());
+
+    String zkDir = zkAddress.replace(':', '_');
+    final String logDir = "/tmp/" + zkDir + "/logs";
+    final String dataDir = "/tmp/" + zkDir + "/dataDir";
+    if (overwrite) {
+      FileUtils.deleteDirectory(new File(dataDir));
+      FileUtils.deleteDirectory(new File(logDir));
+    }
+    ZKClientPool.reset();
+
+    IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
+      @Override
+      public void createDefaultNameSpace(ZkClient zkClient) {
+        if (rootNamespaces == null) {
+          return;
+        }
+
+        for (String rootNamespace : rootNamespaces) {
+          try {
+            zkClient.deleteRecursive(rootNamespace);
+          } catch (Exception e) {
+            LOG.error("fail to deleteRecursive path:" + rootNamespace, e);
+          }
+        }
+      }
+    };
+
+    int port = Integer.parseInt(zkAddress.substring(zkAddress.lastIndexOf(':') 
+ 1));
+    ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
+    zkServer.start();
+
+    return zkServer;
+  }
+
+  /**
+   * Shuts down the given ZooKeeper server and prints a confirmation line.
+   * @param zkServer the server to stop; null is ignored
+   */
+  public static void stopZkServer(ZkServer zkServer) {
+    if (zkServer == null) {
+      return;
+    }
+    zkServer.shutdown();
+    final String report = "Shut down zookeeper at port " + zkServer.getPort() + " in thread "
+        + Thread.currentThread().getName();
+    System.out.println(report);
+  }
+
+  /**
+   * Adds a cluster with the given name through a {@code ZKHelixAdmin} built on the client.
+   * @param zkClient client used to construct the admin
+   * @param clusterName name of the cluster to add
+   */
+  public static void setupEmptyCluster(HelixZkClient zkClient, String clusterName) {
+    // second argument is passed as true; presumably "recreate if exists" -- TODO confirm
+    new ZKHelixAdmin(zkClient).addCluster(clusterName, true);
+  }
+
+  /**
+   * Converts the given elements to a set.
+   * @param s the elements; duplicates collapse into one entry
+   * @return a new {@link HashSet} containing the elements of {@code s}
+   */
+  @SafeVarargs // the array is only read, never stored or exposed
+  public static <T> Set<T> setOf(T... s) {
+    return new HashSet<T>(Arrays.asList(s));
+  }
+
+  /**
+   * Generic method for verification with a timeout. The named verifier is invoked once per
+   * second until it returns true or the attempts are used up, then the result is asserted.
+   * <p>
+   * NOTE(review): exceptions (unknown verifier name, reflection failures, exceptions thrown by
+   * the verifier) are only logged, as before, so they do not fail the calling test.
+   * @param verifierName name of a public static method on {@code TestHelper} returning a boolean
+   * @param timeout maximum time to keep retrying, in milliseconds
+   * @param args arguments passed to the verifier on every attempt
+   */
+  public static void verifyWithTimeout(String verifierName, long timeout, Object... args) {
+    final long sleepInterval = 1000; // in ms
+    final int loop = (int) (timeout / sleepInterval) + 1;
+    try {
+      // Look the verifier up once and report a missing one explicitly rather than via an NPE.
+      final Method verifier = TestHelper.getMethod(verifierName);
+      if (verifier == null) {
+        throw new IllegalArgumentException(
+            "No public method named " + verifierName + " found on TestHelper");
+      }
+
+      boolean result = false;
+      long waited = 0; // time actually slept, so the report is right when all attempts are used
+      for (int i = 0; i < loop; i++) {
+        Thread.sleep(sleepInterval);
+        waited += sleepInterval;
+        // verifier should be static method
+        result = (Boolean) verifier.invoke(null, args);
+
+        if (result) {
+          break;
+        }
+      }
+
+      System.err.println(verifierName + ": wait " + waited + "ms to verify " + " (" + result + ")");
+      LOG.debug("args:" + Arrays.toString(args));
+
+      if (!result) {
+        LOG.error(verifierName + " fails");
+        LOG.error("args:" + Arrays.toString(args));
+      }
+
+      Assert.assertTrue(result);
+    } catch (InterruptedException e) {
+      // keep the interrupt visible to the caller instead of silently clearing it
+      Thread.currentThread().interrupt();
+      LOG.error("Exception in verify: " + verifierName, e);
+    } catch (Exception e) {
+      LOG.error("Exception in verify: " + verifierName, e);
+    }
+  }
+
+  private static Method getMethod(String name) {
+    Method[] methods = TestHelper.class.getMethods();
+    for (Method method : methods) {
+      if (name.equals(method.getName())) {
+        return method;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Checks that, for every given instance and each of its sessions, the current state and the
+   * task current state of the resource have no map fields, and that the resource's external
+   * view has no map fields either. Missing (null) properties count as empty.
+   * @param clusterName cluster to inspect
+   * @param resourceName resource to inspect
+   * @param instanceNames instances whose sessions are checked
+   * @param zkAddr ZooKeeper address used to build a client, which is closed before returning
+   * @return true if nothing non-empty was found, false on the first non-empty property
+   */
+  public static boolean verifyEmptyCurStateAndExtView(String clusterName, String resourceName,
+      Set<String> instanceNames, String zkAddr) {
+    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
+    zkClient.setZkSerializer(new ZNRecordSerializer());
+
+    try {
+      ZKHelixDataAccessor accessor =
+          new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
+      Builder keys = accessor.keyBuilder();
+
+      for (String instance : instanceNames) {
+        for (String session : accessor.getChildNames(keys.sessions(instance))) {
+          CurrentState current =
+              accessor.getProperty(keys.currentState(instance, session, resourceName));
+          if (current != null && !current.getRecord().getMapFields().isEmpty()) {
+            return false;
+          }
+
+          CurrentState taskCurrent =
+              accessor.getProperty(keys.taskCurrentState(instance, session, resourceName));
+          if (taskCurrent != null && !taskCurrent.getRecord().getMapFields().isEmpty()) {
+            return false;
+          }
+        }
+
+        // the external view is re-read once per instance, as in the original flow
+        ExternalView view = accessor.getProperty(keys.externalView(resourceName));
+        if (view != null && !view.getRecord().getMapFields().isEmpty()) {
+          return false;
+        }
+      }
+
+      return true;
+    } finally {
+      zkClient.close();
+    }
+  }
+
+  /**
+   * Reports whether the given manager is disconnected.
+   * @param manager manager to query
+   * @return true when {@code manager.isConnected()} is false
+   */
+  public static boolean verifyNotConnected(HelixManager manager) {
+    final boolean connected = manager.isConnected();
+    return !connected;
+  }
+
+  /**
+   * Sets up a cluster using {@code RebalanceMode.SEMI_AUTO}; all other arguments are forwarded
+   * unchanged to the overload that takes an explicit rebalance mode.
+   * @throws Exception declared for callers; the delegate itself declares no checked exception
+   */
+  public static void setupCluster(String clusterName, String zkAddr, int startPort,
+      String participantNamePrefix, String resourceNamePrefix, int resourceNb, int partitionNb,
+      int nodesNb, int replica, String stateModelDef, boolean doRebalance) throws Exception {
+    final RebalanceMode defaultMode = RebalanceMode.SEMI_AUTO;
+    setupCluster(clusterName, zkAddr, startPort, participantNamePrefix, resourceNamePrefix,
+        resourceNb, partitionNb, nodesNb, replica, stateModelDef, defaultMode, doRebalance);
+  }
+
+  /**
+   * Creates a cluster with instances and resources, deleting an existing cluster of that name.
+   * @param clusterName cluster to (re)create
+   * @param zkAddr ZooKeeper address
+   * @param startPort port suffix of the first instance; later instances use consecutive ports
+   * @param participantNamePrefix instances are named {@code <prefix>_<port>}
+   * @param resourceNamePrefix resources are named {@code <prefix><index>}
+   * @param resourceNb number of resources to add
+   * @param partitionNb number of partitions per resource
+   * @param nodesNb number of instances to add
+   * @param replica replica count used when rebalancing
+   * @param stateModelDef state model definition name for every resource
+   * @param mode rebalance mode; FULL_AUTO selects the CrushEd strategy, others the default one
+   * @param doRebalance whether to rebalance each resource right after adding it
+   */
+  public static void setupCluster(String clusterName, String zkAddr, int startPort,
+      String participantNamePrefix, String resourceNamePrefix, int resourceNb, int partitionNb,
+      int nodesNb, int replica, String stateModelDef, RebalanceMode mode, boolean doRebalance) {
+    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
+    try {
+      final String clusterPath = "/" + clusterName;
+      if (zkClient.exists(clusterPath)) {
+        LOG.warn("Cluster already exists:" + clusterName + ". Deleting it");
+        zkClient.deleteRecursively(clusterPath);
+      }
+
+      ClusterSetup clusterSetup = new ClusterSetup(zkAddr);
+      clusterSetup.addCluster(clusterName, true);
+
+      for (int nodeIdx = 0; nodeIdx < nodesNb; nodeIdx++) {
+        String instanceName = participantNamePrefix + "_" + (startPort + nodeIdx);
+        clusterSetup.addInstanceToCluster(clusterName, instanceName);
+      }
+
+      for (int resIdx = 0; resIdx < resourceNb; resIdx++) {
+        String resourceName = resourceNamePrefix + resIdx;
+        String strategy;
+        if (mode == RebalanceMode.FULL_AUTO) {
+          strategy = CrushEdRebalanceStrategy.class.getName();
+        } else {
+          strategy = RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY;
+        }
+        clusterSetup.addResourceToCluster(clusterName, resourceName, partitionNb, stateModelDef,
+            mode.name(), strategy);
+        if (doRebalance) {
+          clusterSetup.rebalanceStorageCluster(clusterName, resourceName, replica);
+        }
+      }
+    } finally {
+      zkClient.close();
+    }
+  }
+
+  /**
+   * Drops the cluster using a {@code ClusterSetup} built on the given client.
+   * @param clusterName cluster to drop
+   * @param zkClient client used both for the existence check and for the setup tool
+   */
+  public static void dropCluster(String clusterName, RealmAwareZkClient zkClient) {
+    dropCluster(clusterName, zkClient, new ClusterSetup(zkClient));
+  }
+
+  public static void dropCluster(String clusterName, RealmAwareZkClient 
zkClient, ClusterSetup setup) {
+    String namespace = "/" + clusterName;
+    if (zkClient.exists(namespace)) {
+      try {
+        setup.deleteCluster(clusterName);
+      } catch (Exception ex) {
+        // Failed to delete, give some more time for connections to drop
+        try {
+          Thread.sleep(3000L);
+          setup.deleteCluster(clusterName);
+        } catch (Exception ignored) {
+          // OK - just ignore
+        }
+      }
+    }
+  }
+
+  /**
+   * Asserts that, in the external view, every listed instance holds the expected state for the
+   * given resource partition.
+   * @param clusterName cluster to inspect
+   * @param zkAddr ZooKeeper address used to build a client, which is closed before returning
+   * @param stateMap
+   *          : "ResourceName/partitionKey" -> setOf(instances)
+   * @param state
+   *          : MASTER|SLAVE|ERROR...
+   */
+  public static void verifyState(String clusterName, String zkAddr,
+      Map<String, Set<String>> stateMap, String state) {
+    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
+    zkClient.setZkSerializer(new ZNRecordSerializer());
+
+    try {
+      ZKHelixDataAccessor accessor =
+          new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
+      Builder keyBuilder = accessor.keyBuilder();
+
+      for (Map.Entry<String, Set<String>> entry : stateMap.entrySet()) {
+        Map<String, String> retMap = getResourceAndPartitionKey(entry.getKey());
+        String resGroup = retMap.get("RESOURCE");
+        String partitionKey = retMap.get("PARTITION");
+
+        ExternalView extView = accessor.getProperty(keyBuilder.externalView(resGroup));
+        for (String instance : entry.getValue()) {
+          // Fail with a readable assertion instead of an NPE when the external view or the
+          // partition entry is missing. Checked only when there is an instance to verify.
+          Assert.assertNotNull(extView, "externalView doesn't exist for " + resGroup);
+          Map<String, String> partitionStates = extView.getStateMap(partitionKey);
+          Assert.assertNotNull(partitionStates,
+              "externalView doesn't contain partition " + resGroup + "/" + partitionKey);
+
+          String actualState = partitionStates.get(instance);
+          Assert.assertNotNull(actualState,
+              "externalView doesn't contain state for " + resGroup + "/" + partitionKey + " on "
+                  + instance + " (expect " + state + ")");
+
+          Assert.assertEquals(actualState, state,
+              "externalView for " + resGroup + "/" + partitionKey + " on " + instance + " is "
+                  + actualState + " (expect " + state + ")");
+        }
+      }
+    } finally {
+      zkClient.close();
+    }
+  }
+
+  /**
+   * @param resourcePartition
+   *          : key is in form of "resource/partitionKey" or "resource_x"
+   * @return
+   */
+  private static Map<String, String> getResourceAndPartitionKey(String 
resourcePartition) {
+    String resourceName;
+    String partitionName;
+    int idx = resourcePartition.indexOf('/');
+    if (idx > -1) {
+      resourceName = resourcePartition.substring(0, idx);
+      partitionName = resourcePartition.substring(idx + 1);
+    } else {
+      idx = resourcePartition.lastIndexOf('_');
+      resourceName = resourcePartition.substring(0, idx);
+      partitionName = resourcePartition;
+    }
+
+    Map<String, String> retMap = new HashMap<String, String>();
+    retMap.put("RESOURCE", resourceName);
+    retMap.put("PARTITION", partitionName);
+    return retMap;
+  }
+
+  public static <T> Map<String, T> startThreadsConcurrently(final int 
nrThreads,
+      final Callable<T> method, final long timeout) {
+    final CountDownLatch startLatch = new CountDownLatch(1);
+    final CountDownLatch finishCounter = new CountDownLatch(nrThreads);
+    final Map<String, T> resultsMap = new ConcurrentHashMap<String, T>();
+    final List<Thread> threadList = new ArrayList<Thread>();
+
+    for (int i = 0; i < nrThreads; i++) {
+      Thread thread = new Thread() {
+        @Override
+        public void run() {
+          try {
+            boolean isTimeout = !startLatch.await(timeout, TimeUnit.SECONDS);
+            if (isTimeout) {
+              LOG.error("Timeout while waiting for start latch");
+            }
+          } catch (InterruptedException ex) {
+            LOG.error("Interrupted while waiting for start latch");
+          }
+
+          try {
+            T result = method.call();
+            if (result != null) {
+              resultsMap.put("thread_" + this.getId(), result);
+            }
+            LOG.debug("result=" + result);
+          } catch (Exception e) {
+            LOG.error("Exeption in executing " + method.getClass().getName(), 
e);
+          }
+
+          finishCounter.countDown();
+        }
+      };
+      threadList.add(thread);
+      thread.start();
+    }
+    startLatch.countDown();
+
+    // wait for all thread to complete
+    try {
+      boolean isTimeout = !finishCounter.await(timeout, TimeUnit.SECONDS);
+      if (isTimeout) {
+        LOG.error("Timeout while waiting for finish latch. Interrupt all 
threads");
+        for (Thread thread : threadList) {
+          thread.interrupt();
+        }
+      }
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted while waiting for finish latch", e);
+    }
+
+    return resultsMap;
+  }
+
+  /**
+   * Builds a STATE_TRANSITION message from the given fields.
+   * The state model definition is always set to "MasterSlave".
+   * @param msgId message id
+   * @param fromState state the transition starts from
+   * @param toState state the transition ends in
+   * @param tgtName target name of the message
+   * @param resourceName resource the message refers to
+   * @param partitionName partition the message refers to
+   * @return the populated message
+   */
+  public static Message createMessage(String msgId, String fromState, String toState,
+      String tgtName, String resourceName, String partitionName) {
+    final Message transition = new Message(MessageType.STATE_TRANSITION, msgId);
+
+    // transition endpoints
+    transition.setFromState(fromState);
+    transition.setToState(toState);
+
+    // addressing
+    transition.setTgtName(tgtName);
+    transition.setResourceName(resourceName);
+    transition.setPartitionName(partitionName);
+
+    transition.setStateModelDef("MasterSlave");
+    return transition;
+  }
+
+  /**
+   * Returns the method name found at index 2 of the current thread's stack trace, which is
+   * expected to be the direct caller of this helper.
+   * @return the method name of that stack frame
+   */
+  public static String getTestMethodName() {
+    return Thread.currentThread().getStackTrace()[2].getMethodName();
+  }
+
+  /**
+   * Returns the class name, without its package, found at index 2 of the current thread's
+   * stack trace, which is expected to be the direct caller of this helper.
+   * @return the text after the last '.' of that frame's class name
+   */
+  public static String getTestClassName() {
+    final String qualifiedName = Thread.currentThread().getStackTrace()[2].getClassName();
+    final int lastDot = qualifiedName.lastIndexOf('.');
+    return qualifiedName.substring(lastDot + 1);
+  }
+
+  public static <T> Map<String, T> startThreadsConcurrently(final 
List<Callable<T>> methods,
+      final long timeout) {
+    final int nrThreads = methods.size();
+    final CountDownLatch startLatch = new CountDownLatch(1);
+    final CountDownLatch finishCounter = new CountDownLatch(nrThreads);
+    final Map<String, T> resultsMap = new ConcurrentHashMap<String, T>();
+    final List<Thread> threadList = new ArrayList<Thread>();
+
+    for (int i = 0; i < nrThreads; i++) {
+      final Callable<T> method = methods.get(i);
+
+      Thread thread = new Thread() {
+        @Override
+        public void run() {
+          try {
+            boolean isTimeout = !startLatch.await(timeout, TimeUnit.SECONDS);
+            if (isTimeout) {
+              LOG.error("Timeout while waiting for start latch");
+            }
+          } catch (InterruptedException ex) {
+            LOG.error("Interrupted while waiting for start latch");
+          }
+
+          try {
+            T result = method.call();
+            if (result != null) {
+              resultsMap.put("thread_" + this.getId(), result);
+            }
+            LOG.debug("result=" + result);
+          } catch (Exception e) {
+            LOG.error("Exeption in executing " + method.getClass().getName(), 
e);
+          }
+
+          finishCounter.countDown();
+        }
+      };
+      threadList.add(thread);
+      thread.start();
+    }
+    startLatch.countDown();
+
+    // wait for all thread to complete
+    try {
+      boolean isTimeout = !finishCounter.await(timeout, TimeUnit.SECONDS);
+      if (isTimeout) {
+        LOG.error("Timeout while waiting for finish latch. Interrupt all 
threads");
+        for (Thread thread : threadList) {
+          thread.interrupt();
+        }
+      }
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted while waiting for finish latch", e);
+    }
+
+    return resultsMap;
+  }
+
+  /**
+   * Prints every cache entry to standard output, ordered by path, between a START and an END
+   * marker line. Each entry shows the node data, its sorted child names and its stat.
+   * @param cache path -> node map to print
+   */
+  public static void printCache(Map<String, ZNode> cache) {
+    System.out.println("START:Print cache");
+    final Map<String, ZNode> sortedByPath = new TreeMap<String, ZNode>(cache);
+
+    for (Map.Entry<String, ZNode> entry : sortedByPath.entrySet()) {
+      final ZNode node = entry.getValue();
+      final TreeSet<String> sortedChildren = new TreeSet<String>();
+      sortedChildren.addAll(node.getChildSet());
+      // print() is used without a line break; only the null-stat case appends one explicitly
+      System.out.print(
+          entry.getKey() + "=" + node.getData() + ", " + sortedChildren + ", "
+              + (node.getStat() == null ? "null\n" : node.getStat()));
+    }
+    System.out.println("END:Print cache");
+  }
+
+  public static void readZkRecursive(String path, Map<String, ZNode> map, 
HelixZkClient zkclient) {
+    try {
+      Stat stat = new Stat();
+      ZNRecord record = zkclient.readData(path, stat);
+      List<String> childNames = zkclient.getChildren(path);
+      ZNode node = new ZNode(path, record, stat);
+      node.addChildren(childNames);
+      map.put(path, node);
+
+      for (String childName : childNames) {
+        String childPath = path + "/" + childName;
+        readZkRecursive(childPath, map, zkclient);
+      }
+    } catch (ZkNoNodeException e) {
+      // OK
+    }
+  }
+
+  /**
+   * Reads the node at {@code path} and all of its descendants through the data accessor and
+   * stores one {@code ZNode} per path in {@code map}. A node that does not exist is skipped.
+   * @param path path to start reading from
+   * @param map receives path -> node entries
+   * @param zkAccessor accessor used for reading data and child names (options value 0)
+   */
+  public static void readZkRecursive(String path, Map<String, ZNode> map,
+      BaseDataAccessor<ZNRecord> zkAccessor) {
+    try {
+      final Stat stat = new Stat();
+      final ZNRecord data = zkAccessor.get(path, stat, 0);
+      final List<String> children = zkAccessor.getChildNames(path, 0);
+
+      final ZNode node = new ZNode(path, data, stat);
+      node.addChildren(children);
+      map.put(path, node);
+
+      if (children == null) {
+        return;
+      }
+      for (String child : children) {
+        readZkRecursive(path + "/" + child, map, zkAccessor);
+      }
+    } catch (ZkNoNodeException e) {
+      // OK
+    }
+  }
+
+  /**
+   * Reads the given paths recursively both through the ZK client and through the accessor and
+   * compares the two snapshots, with no stat-exclusion list.
+   * @param paths root paths to read
+   * @param zkAccessor accessor whose view is treated as the cache
+   * @param zkclient client whose view is treated as the ZooKeeper content
+   * @param needVerifyStat forwarded to the comparison
+   * @return the result of the map-to-map comparison
+   */
+  public static boolean verifyZkCache(List<String> paths, BaseDataAccessor<ZNRecord> zkAccessor,
+      HelixZkClient zkclient, boolean needVerifyStat) {
+    // read everything
+    final Map<String, ZNode> onZk = new HashMap<String, ZNode>();
+    final Map<String, ZNode> viaAccessor = new HashMap<String, ZNode>();
+    for (String root : paths) {
+      readZkRecursive(root, onZk, zkclient);
+      readZkRecursive(root, viaAccessor, zkAccessor);
+    }
+
+    return verifyZkCache(paths, null, viaAccessor, onZk, needVerifyStat);
+  }
+
+  /**
+   * Compares the given cache with what the ZK client reads under the paths, with no
+   * stat-exclusion list.
+   * @param paths root paths to read from ZooKeeper
+   * @param cache path -> node map to verify
+   * @param zkclient client used to read the reference content
+   * @param needVerifyStat forwarded to the comparison
+   * @return the result of the delegated comparison
+   */
+  public static boolean verifyZkCache(List<String> paths, Map<String, ZNode> cache,
+      HelixZkClient zkclient, boolean needVerifyStat) {
+    final List<String> noStatExclusions = null;
+    return verifyZkCache(paths, noStatExclusions, cache, zkclient, needVerifyStat);
+  }
+
+  /**
+   * Reads everything under the given paths through the ZK client and compares it with the cache.
+   * @param paths root paths to read from ZooKeeper
+   * @param pathsExcludeForStat paths whose stat is not compared; forwarded unchanged
+   * @param cache path -> node map to verify
+   * @param zkclient client used to read the reference content
+   * @param needVerifyStat forwarded to the comparison
+   * @return the result of the map-to-map comparison
+   */
+  public static boolean verifyZkCache(List<String> paths, List<String> pathsExcludeForStat,
+      Map<String, ZNode> cache, HelixZkClient zkclient, boolean needVerifyStat) {
+    // read everything on zk under paths
+    final Map<String, ZNode> onZk = new HashMap<String, ZNode>();
+    for (String root : paths) {
+      readZkRecursive(root, onZk, zkclient);
+    }
+
+    return verifyZkCache(paths, pathsExcludeForStat, cache, onZk, needVerifyStat);
+  }
+
+  public static boolean verifyZkCache(List<String> paths, List<String> 
pathsExcludeForStat,
+      Map<String, ZNode> cache, Map<String, ZNode> zkMap, boolean 
needVerifyStat) {
+    // equal size
+    if (zkMap.size() != cache.size()) {
+      System.err
+          .println("size mismatch: cacheSize: " + cache.size() + ", zkMapSize: 
" + zkMap.size());
+      System.out.println("cache: (" + cache.size() + ")");
+      TestHelper.printCache(cache);
+
+      System.out.println("zkMap: (" + zkMap.size() + ")");
+      TestHelper.printCache(zkMap);
+
+      return false;
+    }
+
+    // everything in cache is also in map
+    for (String path : cache.keySet()) {
+      ZNode cacheNode = cache.get(path);
+      ZNode zkNode = zkMap.get(path);
+
+      if (zkNode == null) {
+        // in cache but not on zk
+        System.err.println("path: " + path + " in cache but not on zk: 
inCacheNode: " + cacheNode);
+        return false;
+      }
+
+      if ((zkNode.getData() == null && cacheNode.getData() != null) || 
(zkNode.getData() != null
+          && cacheNode.getData() == null) || (zkNode.getData() != null
+          && cacheNode.getData() != null && 
!zkNode.getData().equals(cacheNode.getData()))) {
+        // data not equal
+        System.err.println(
+            "data mismatch on path: " + path + ", inCache: " + 
cacheNode.getData() + ", onZk: "
+                + zkNode.getData());
+        return false;
+      }
+
+      if ((zkNode.getChildSet() == null && cacheNode.getChildSet() != null) || 
(
+          zkNode.getChildSet() != null && cacheNode.getChildSet() == null) || (
+          zkNode.getChildSet() != null && cacheNode.getChildSet() != null && 
!zkNode.getChildSet()
+              .equals(cacheNode.getChildSet()))) {
+        // childSet not equal
+        System.err.println(
+            "childSet mismatch on path: " + path + ", inCache: " + 
cacheNode.getChildSet()
+                + ", onZk: " + zkNode.getChildSet());
+        return false;
+      }
+
+      if (needVerifyStat && pathsExcludeForStat != null && 
!pathsExcludeForStat.contains(path)) {
+        if (cacheNode.getStat() == null || 
!zkNode.getStat().equals(cacheNode.getStat())) {
+          // stat not equal
+          System.err.println(
+              "Stat mismatch on path: " + path + ", inCache: " + 
cacheNode.getStat() + ", onZk: "
+                  + zkNode.getStat());
+          return false;
+        }
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Builds a state model definition named "Bootstrap" from a hand-written record.
+   * <p>
+   * The record holds: the initial state (IDLE), the state priority list, one
+   * {@code <state>.meta} map per state with its "count", one {@code <state>.next} map for every
+   * state except DROPPED, and the state transition priority list.
+   * @return a new {@code StateModelDefinition} wrapping that record
+   */
+  public static StateModelDefinition generateStateModelDefForBootstrap() {
+    ZNRecord record = new ZNRecord("Bootstrap");
+    
record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(), 
"IDLE");
+    // states, in priority-list order
+    List<String> statePriorityList = new ArrayList<String>();
+    statePriorityList.add("ONLINE");
+    statePriorityList.add("BOOTSTRAP");
+    statePriorityList.add("OFFLINE");
+    statePriorityList.add("IDLE");
+    statePriorityList.add("DROPPED");
+    statePriorityList.add("ERROR");
+    
record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
+        statePriorityList);
+    // "<state>.meta": ONLINE uses the all-replicas count constant, every other state uses "-1"
+    for (String state : statePriorityList) {
+      String key = state + ".meta";
+      Map<String, String> metadata = new HashMap<String, String>();
+      if (state.equals("ONLINE")) {
+        metadata.put("count", 
StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
+        record.setMapField(key, metadata);
+      } else if (state.equals("BOOTSTRAP")) {
+        metadata.put("count", "-1");
+        record.setMapField(key, metadata);
+      } else if (state.equals("OFFLINE")) {
+        metadata.put("count", "-1");
+        record.setMapField(key, metadata);
+      } else if (state.equals("IDLE")) {
+        metadata.put("count", "-1");
+        record.setMapField(key, metadata);
+      } else if (state.equals("DROPPED")) {
+        metadata.put("count", "-1");
+        record.setMapField(key, metadata);
+      } else if (state.equals("ERROR")) {
+        metadata.put("count", "-1");
+        record.setMapField(key, metadata);
+      }
+    }
+
+    // "<state>.next": presumably maps a target state to the next state on the way there
+    // (TODO confirm against StateModelDefinition); DROPPED gets no ".next" entry
+    for (String state : statePriorityList) {
+      String key = state + ".next";
+      if (state.equals("ONLINE")) {
+        Map<String, String> metadata = new HashMap<String, String>();
+        metadata.put("BOOTSTRAP", "OFFLINE");
+        metadata.put("OFFLINE", "OFFLINE");
+        metadata.put("DROPPED", "OFFLINE");
+        metadata.put("IDLE", "OFFLINE");
+        record.setMapField(key, metadata);
+      } else if (state.equals("BOOTSTRAP")) {
+        Map<String, String> metadata = new HashMap<String, String>();
+        metadata.put("ONLINE", "ONLINE");
+        metadata.put("OFFLINE", "OFFLINE");
+        metadata.put("DROPPED", "OFFLINE");
+        metadata.put("IDLE", "OFFLINE");
+        record.setMapField(key, metadata);
+      } else if (state.equals("OFFLINE")) {
+        Map<String, String> metadata = new HashMap<String, String>();
+        metadata.put("ONLINE", "BOOTSTRAP");
+        metadata.put("BOOTSTRAP", "BOOTSTRAP");
+        metadata.put("DROPPED", "IDLE");
+        metadata.put("IDLE", "IDLE");
+        record.setMapField(key, metadata);
+      } else if (state.equals("IDLE")) {
+        Map<String, String> metadata = new HashMap<String, String>();
+        metadata.put("ONLINE", "OFFLINE");
+        metadata.put("BOOTSTRAP", "OFFLINE");
+        metadata.put("OFFLINE", "OFFLINE");
+        metadata.put("DROPPED", "DROPPED");
+        record.setMapField(key, metadata);
+      } else if (state.equals("ERROR")) {
+        Map<String, String> metadata = new HashMap<String, String>();
+        metadata.put("IDLE", "IDLE");
+        record.setMapField(key, metadata);
+      }
+    }
+    // transitions ("FROM-TO"), in priority-list order
+    List<String> stateTransitionPriorityList = new ArrayList<String>();
+    stateTransitionPriorityList.add("ONLINE-OFFLINE");
+    stateTransitionPriorityList.add("BOOTSTRAP-ONLINE");
+    stateTransitionPriorityList.add("OFFLINE-BOOTSTRAP");
+    stateTransitionPriorityList.add("BOOTSTRAP-OFFLINE");
+    stateTransitionPriorityList.add("OFFLINE-IDLE");
+    stateTransitionPriorityList.add("IDLE-OFFLINE");
+    stateTransitionPriorityList.add("IDLE-DROPPED");
+    stateTransitionPriorityList.add("ERROR-IDLE");
+    
record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
+        stateTransitionPriorityList);
+    return new StateModelDefinition(record);
+  }
+
+  /**
+   * Renders a record as multi-line text: the id, then the simple fields (only if the map is
+   * non-null), the list fields and the map fields, each under its own heading.
+   * @param record record to render
+   * @return the textual representation
+   */
+  public static String znrecordToString(ZNRecord record) {
+    final StringBuilder text = new StringBuilder();
+    text.append(record.getId()).append("\n");
+
+    final Map<String, String> simpleFields = record.getSimpleFields();
+    if (simpleFields != null) {
+      text.append("simpleFields\n");
+      for (Map.Entry<String, String> field : simpleFields.entrySet()) {
+        text.append("  ").append(field.getKey()).append("\t: ").append(field.getValue())
+            .append("\n");
+      }
+    }
+
+    text.append("listFields\n");
+    for (Map.Entry<String, List<String>> field : record.getListFields().entrySet()) {
+      text.append("  ").append(field.getKey()).append("\t: ");
+      for (String item : field.getValue()) {
+        text.append(item).append(", ");
+      }
+      text.append("\n");
+    }
+
+    text.append("mapFields\n");
+    for (Map.Entry<String, Map<String, String>> field : record.getMapFields().entrySet()) {
+      text.append("  ").append(field.getKey()).append("\t: \n");
+      for (Map.Entry<String, String> inner : field.getValue().entrySet()) {
+        text.append("    ").append(inner.getKey()).append("\t: ").append(inner.getValue())
+            .append("\n");
+      }
+    }
+
+    return text.toString();
+  }
+
+  /**
+   * A condition that can be evaluated repeatedly.
+   */
+  public interface Verifier {
+    /**
+     * Evaluates the condition once.
+     * @return true if the condition holds
+     * @throws Exception if the evaluation itself fails
+     */
+    boolean verify() throws Exception;
+  }
+
+  public static boolean verify(Verifier verifier, long timeout) throws 
Exception {
+    long start = System.currentTimeMillis();
+    do {
+      boolean result = verifier.verify();
+      boolean isTimedout = (System.currentTimeMillis() - start) > timeout;
+      if (result || isTimedout) {
+        if (isTimedout && !result) {
+          LOG.error("verifier time out, consider try longer timeout, stack 
trace{}",
+              Arrays.asList(Thread.currentThread().getStackTrace()));
+        }
+        return result;
+      }
+      Thread.sleep(50);
+    } while (true);
+  }
+}


Reply via email to