Refactor and fix more integration tests.

Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/de38a7db
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/de38a7db
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/de38a7db

Branch: refs/heads/master
Commit: de38a7dbde05c22b12829389c674ff22bdffc289
Parents: 874f9e6
Author: Lei Xia <l...@linkedin.com>
Authored: Thu Sep 28 10:25:26 2017 -0700
Committer: Junkai Xue <j...@linkedin.com>
Committed: Mon Nov 6 17:08:54 2017 -0800

----------------------------------------------------------------------
 .../ZkHelixClusterVerifier.java                 |  37 +-
 .../TestCrushAutoRebalance.java                 | 316 +++++++++++++++++
 .../TestCrushAutoRebalanceNonRack.java          | 278 +++++++++++++++
 ...rushAutoRebalanceTopoplogyAwareDisabled.java |  93 +++++
 .../TestDelayedAutoRebalance.java               | 342 +++++++++++++++++++
 ...elayedAutoRebalanceWithDisabledInstance.java | 317 +++++++++++++++++
 .../TestDelayedAutoRebalanceWithRackaware.java  | 124 +++++++
 .../rebalancer/TestCrushAutoRebalance.java      | 316 -----------------
 .../TestCrushAutoRebalanceNonRack.java          | 278 ---------------
 ...rushAutoRebalanceTopoplogyAwareDisabled.java |  64 ----
 .../rebalancer/TestDelayedAutoRebalance.java    | 342 -------------------
 ...elayedAutoRebalanceWithDisabledInstance.java | 317 -----------------
 .../TestDelayedAutoRebalanceWithRackaware.java  |  76 -----
 13 files changed, 1497 insertions(+), 1403 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/de38a7db/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
index ad5cda2..472157f 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
@@ -19,6 +19,9 @@ package org.apache.helix.tools.ClusterVerifiers;
  * under the License.
  */
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.apache.helix.HelixDataAccessor;
@@ -47,6 +50,13 @@ public abstract class ZkHelixClusterVerifier
   protected final PropertyKey.Builder _keyBuilder;
   private CountDownLatch _countdown;
 
+  private ExecutorService _verifyTaskThreadPool =
+      Executors.newSingleThreadExecutor(new ThreadFactory() {
+        @Override public Thread newThread(Runnable r) {
+          return new Thread(r, "ZkHelixClusterVerifier-verify_thread");
+        }
+      });
+
   protected static class ClusterVerifyTrigger {
     final PropertyKey _triggerKey;
     final boolean _triggerOnDataChange;
@@ -190,7 +200,6 @@ public abstract class ZkHelixClusterVerifier
     try {
       success = verifyState();
       if (!success) {
-
         success = _countdown.await(timeout, TimeUnit.MILLISECONDS);
         if (!success) {
           // make a final try if timeout
@@ -203,6 +212,7 @@ public abstract class ZkHelixClusterVerifier
 
     // clean up
     _zkClient.unsubscribeAll();
+    _verifyTaskThreadPool.shutdownNow();
 
     return success;
   }
@@ -233,17 +243,28 @@ public abstract class ZkHelixClusterVerifier
    */
   protected abstract boolean verifyState() throws Exception;
 
+  /**
+   * Re-runs {@link #verifyState()} on the verifier's single-thread pool instead of inside the ZK
+   * listener callback, and counts down {@code _countdown} once the expected state is reached so
+   * the waiting verify call can return early.
+   */
+  class VerifyStateCallbackTask implements Runnable {
+    @Override
+    public void run() {
+      try {
+        if (verifyState()) {
+          _countdown.countDown();
+        }
+      } catch (Exception ex) {
+        if (ex instanceof InterruptedException) {
+          // shutdownNow() may interrupt an in-flight check; preserve the interrupt status.
+          Thread.currentThread().interrupt();
+        }
+        // Best-effort: a later callback (or the final try on timeout) re-runs the check, but keep
+        // the stack trace so unexpected failures remain diagnosable.
+        LOG.info("verifyState() throws exception", ex);
+      }
+    }
+  }
+
   @Override
   public void handleDataChange(String dataPath, Object data) throws Exception {
-    boolean success = verifyState();
-    if (success) {
-      _countdown.countDown();
-    }
+    // ZK data-change callback: schedule an asynchronous re-check of the cluster state rather than
+    // running verifyState() inline on the callback thread.
+    // NOTE(review): _verifyTaskThreadPool is shut down (shutdownNow()) right after unsubscribeAll()
+    // at the end of the timed verify method. A callback delivered after that, or during a second
+    // verify call on this same instance, makes submit() throw RejectedExecutionException. Confirm
+    // whether the pool should be created per verify call.
+    _verifyTaskThreadPool.submit(new VerifyStateCallbackTask());
   }
 
   @Override
   public void handleDataDeleted(String dataPath) throws Exception {
     _zkClient.unsubscribeDataChanges(dataPath, this);
+    // Stop watching the deleted path, and also schedule an asynchronous re-check so that a
+    // deletion can complete convergence just like a data change.
+    // NOTE(review): same RejectedExecutionException caveat as handleDataChange once the pool has
+    // been shut down.
+    _verifyTaskThreadPool.submit(new VerifyStateCallbackTask());
   }
 
   @Override
@@ -252,11 +273,7 @@ public abstract class ZkHelixClusterVerifier
       String childPath = String.format("%s/%s", parentPath, child);
       _zkClient.subscribeDataChanges(childPath, this);
     }
-
-    boolean success = verifyState();
-    if (success) {
-      _countdown.countDown();
-    }
+    _verifyTaskThreadPool.submit(new VerifyStateCallbackTask());
   }
 
   public ZkClient getZkClient() {

http://git-wip-us.apache.org/repos/asf/helix/blob/de38a7db/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java
new file mode 100644
index 0000000..967175f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java
@@ -0,0 +1,316 @@
+package org.apache.helix.integration.rebalancer.CrushRebalancers;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import 
org.apache.helix.controller.rebalancer.strategy.MultiRoundCrushRebalanceStrategy;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.tools.ClusterSetup;
+import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Integration tests for CRUSH-based FULL_AUTO rebalance strategies on a zone-aware cluster:
+ * replicas of each partition must land in distinct zones and, when the resource has an instance
+ * group tag, only on instances carrying that tag.
+ */
+public class TestCrushAutoRebalance extends ZkIntegrationTestBase {
+  final int NUM_NODE = 6;
+  protected static final int START_PORT = 12918;
+  protected static final int _PARTITIONS = 20;
+
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  protected ClusterControllerManager _controller;
+
+  protected ClusterSetup _setupTool = null;
+  List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>();
+  Map<String, String> _nodeToZoneMap = new HashMap<String, String>();
+  Map<String, String> _nodeToTagMap = new HashMap<String, String>();
+  List<String> _nodes = new ArrayList<String>();
+  Set<String> _allDBs = new HashSet<String>();
+  int _replica = 3;
+
+  String[] _testModels = { BuiltInStateModelDefinitions.OnlineOffline.name(),
+      BuiltInStateModelDefinitions.MasterSlave.name(),
+      BuiltInStateModelDefinitions.LeaderStandby.name()
+  };
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+    _setupTool = new ClusterSetup(_gZkClient);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+
+    // Spread the instances over 3 zones ("zone-0..2") and 2 tags ("tag-0..1").
+    for (int i = 0; i < NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+      String zone = "zone-" + i % 3;
+      String tag = "tag-" + i % 2;
+      _setupTool.getClusterManagementTool()
+          .setInstanceZoneId(CLUSTER_NAME, storageNodeName, zone);
+      _setupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, storageNodeName, tag);
+      _nodeToZoneMap.put(storageNodeName, zone);
+      _nodeToTagMap.put(storageNodeName, tag);
+      _nodes.add(storageNodeName);
+    }
+
+    // start dummy participants
+    for (String node : _nodes) {
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+    enableTopologyAwareRebalance(_gZkClient, CLUSTER_NAME, true);
+  }
+
+  @DataProvider(name = "rebalanceStrategies")
+  public static String[][] rebalanceStrategies() {
+    return new String[][] {
+        { "CrushRebalanceStrategy", CrushRebalanceStrategy.class.getName() },
+        { "MultiRoundCrushRebalanceStrategy", MultiRoundCrushRebalanceStrategy.class.getName() }
+    };
+  }
+
+  @Test(dataProvider = "rebalanceStrategies", enabled = true)
+  public void testZoneIsolation(String rebalanceStrategyName, String rebalanceStrategyClass)
+      throws Exception {
+    System.out.println("testZoneIsolation " + rebalanceStrategyName);
+    createResourcesForAllStateModels("Test-DB-" + rebalanceStrategyName + "-",
+        rebalanceStrategyClass);
+    verifyAndValidateAllDBs(_replica);
+  }
+
+  @Test(dataProvider = "rebalanceStrategies", enabled = true)
+  public void testZoneIsolationWithInstanceTag(String rebalanceStrategyName,
+      String rebalanceStrategyClass) throws Exception {
+    Set<String> tags = new HashSet<String>(_nodeToTagMap.values());
+    int i = 0;
+    for (String tag : tags) {
+      String db = "Test-DB-Tag-" + rebalanceStrategyName + "-" + i++;
+      _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS,
+          BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO + "",
+          rebalanceStrategyClass);
+      IdealState is =
+          _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      is.setInstanceGroupTag(tag);
+      _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, is);
+      _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+    verifyAndValidateAllDBs(_replica);
+  }
+
+  @Test(dependsOnMethods = { "testZoneIsolation", "testZoneIsolationWithInstanceTag" })
+  public void testLackEnoughLiveRacks() throws Exception {
+    System.out.println("testLackEnoughLiveRacks");
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+
+    // Stop (but keep registered) every participant of one zone; 2 of the 3 zones stay live.
+    String zone = _nodeToZoneMap.values().iterator().next();
+    stopParticipantsInZone(zone, false);
+
+    createResourcesForAllStateModels("Test-DB-CrushRebalanceStrategy-",
+        CrushRebalanceStrategy.class.getName());
+    // Only two zones remain, so only two zone-isolated replicas can be placed.
+    verifyAndValidateAllDBs(2);
+  }
+
+  @Test(dependsOnMethods = { "testLackEnoughLiveRacks" })
+  public void testLackEnoughRacks() throws Exception {
+    System.out.println("testLackEnoughRacks");
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+
+    // Stop, disable and drop every participant of one zone so the zone disappears entirely.
+    String zone = _nodeToZoneMap.values().iterator().next();
+    stopParticipantsInZone(zone, true);
+
+    createResourcesForAllStateModels("Test-DB-CrushRebalanceStrategy-",
+        CrushRebalanceStrategy.class.getName());
+    verifyAndValidateAllDBs(2);
+  }
+
+  @AfterMethod
+  public void afterMethod() throws Exception {
+    for (String db : _allDBs) {
+      _setupTool.dropResourceFromCluster(CLUSTER_NAME, db);
+    }
+    _allDBs.clear();
+    // waiting for all DB be dropped.
+    Thread.sleep(100);
+  }
+
+  /**
+   * Adds one FULL_AUTO resource per state model in {@link #_testModels}, named
+   * {@code dbNamePrefix + index}. Each resource is rebalanced to {@link #_replica} replicas and
+   * recorded in {@link #_allDBs} so that {@link #afterMethod()} drops it.
+   */
+  private void createResourcesForAllStateModels(String dbNamePrefix,
+      String rebalanceStrategyClass) {
+    int i = 0;
+    for (String stateModel : _testModels) {
+      String db = dbNamePrefix + i++;
+      _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, stateModel,
+          RebalanceMode.FULL_AUTO + "", rebalanceStrategyClass);
+      _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+  }
+
+  /**
+   * Asserts that every resource in {@link #_allDBs} passes BestPossibleExternalViewVerifier, then
+   * checks each resource's external view for zone/tag isolation with {@code expectedReplica}
+   * distinct zones per partition.
+   */
+  private void verifyAndValidateAllDBs(int expectedReplica) throws Exception {
+    // Give the controller a moment to pick up the newly added resources.
+    Thread.sleep(300);
+
+    HelixClusterVerifier clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+            .setResources(_allDBs).build();
+    Assert.assertTrue(clusterVerifier.verify());
+
+    for (String db : _allDBs) {
+      IdealState is =
+          _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      validateZoneAndTagIsolation(is, ev, expectedReplica);
+    }
+  }
+
+  /**
+   * Stops every participant located in {@code zone}. When {@code dropFromCluster} is true, each
+   * such instance is also disabled and then removed from the cluster.
+   */
+  private void stopParticipantsInZone(String zone, boolean dropFromCluster) throws Exception {
+    for (MockParticipantManager p : _participants) {
+      if (!_nodeToZoneMap.get(p.getInstanceName()).equals(zone)) {
+        continue;
+      }
+      p.syncStop();
+      if (dropFromCluster) {
+        _setupTool.getClusterManagementTool()
+            .enableInstance(CLUSTER_NAME, p.getInstanceName(), false);
+        Thread.sleep(50);
+        _setupTool.dropInstanceFromCluster(CLUSTER_NAME, p.getInstanceName());
+      }
+    }
+  }
+
+  /**
+   * Validates that the instances of each partition are in different zones and, if the resource
+   * has an instance group tag, that every assigned instance carries it.
+   */
+  private void validateZoneAndTagIsolation(IdealState is, ExternalView ev, int expectedReplica) {
+    Assert.assertNotNull(ev, "ExternalView is missing");
+    String tag = is.getInstanceGroupTag();
+    for (String partition : is.getPartitionSet()) {
+      Map<String, String> assignmentMap = ev.getRecord().getMapField(partition);
+      Assert.assertNotNull(assignmentMap,
+          "No external view assignment for partition " + partition);
+
+      // TODO: preference List is not persisted in IS, so the EV cannot be compared against it.
+      Set<String> assignedZones = new HashSet<String>();
+      for (String instance : assignmentMap.keySet()) {
+        assignedZones.add(_nodeToZoneMap.get(instance));
+        if (tag != null) {
+          InstanceConfig config =
+              _setupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instance);
+          Assert.assertTrue(config.containsTag(tag));
+        }
+      }
+      Assert.assertEquals(assignedZones.size(), expectedReplica,
+          "Unexpected number of distinct zones for partition " + partition);
+    }
+  }
+
+  @Test(enabled = false)
+  public void testAddZone() throws Exception {
+    // TODO: not implemented yet; disabled so it is not reported as a passing test.
+  }
+
+  @Test(enabled = false)
+  public void testAddNodes() throws Exception {
+    // TODO: not implemented yet; disabled so it is not reported as a passing test.
+  }
+
+  @Test(enabled = false)
+  public void testNodeFailure() throws Exception {
+    // TODO: not implemented yet; disabled so it is not reported as a passing test.
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    /**
+     * shutdown order: 1) disconnect the controller 2) disconnect participants
+     */
+    _controller.syncStop();
+    for (MockParticipantManager participant : _participants) {
+      participant.syncStop();
+    }
+    _setupTool.deleteCluster(CLUSTER_NAME);
+    System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/de38a7db/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalanceNonRack.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalanceNonRack.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalanceNonRack.java
new file mode 100644
index 0000000..c11ee87
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalanceNonRack.java
@@ -0,0 +1,278 @@
+package org.apache.helix.integration.rebalancer.CrushRebalancers;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Integration tests for CRUSH-based FULL_AUTO rebalancing on a cluster with a single-level
+ * topology ("/instance"), so every instance is its own fault zone. Replicas must land on
+ * distinct instances and, for tagged resources, only on instances carrying the tag.
+ */
+public class TestCrushAutoRebalanceNonRack extends ZkStandAloneCMTestBase {
+  final int NUM_NODE = 6;
+  protected static final int START_PORT = 12918;
+  protected static final int _PARTITIONS = 20;
+
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  protected ClusterControllerManager _controller;
+
+  protected ClusterSetup _setupTool = null;
+  List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>();
+  Map<String, String> _nodeToTagMap = new HashMap<String, String>();
+  List<String> _nodes = new ArrayList<String>();
+  Set<String> _allDBs = new HashSet<String>();
+  int _replica = 3;
+
+  private static String[] _testModels = { BuiltInStateModelDefinitions.OnlineOffline.name(),
+      BuiltInStateModelDefinitions.MasterSlave.name(),
+      BuiltInStateModelDefinitions.LeaderStandby.name()
+  };
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+    _setupTool = new ClusterSetup(_gZkClient);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+
+    // Single-level topology: the fault zone type is the instance itself.
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setTopology("/instance");
+    clusterConfig.setFaultZoneType("instance");
+    configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    for (int i = 0; i < NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+      _nodes.add(storageNodeName);
+      String tag = "tag-" + i % 2;
+      _setupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, storageNodeName, tag);
+      _nodeToTagMap.put(storageNodeName, tag);
+      InstanceConfig instanceConfig =
+          configAccessor.getInstanceConfig(CLUSTER_NAME, storageNodeName);
+      instanceConfig.setDomain("instance=" + storageNodeName);
+      configAccessor.setInstanceConfig(CLUSTER_NAME, storageNodeName, instanceConfig);
+    }
+
+    // start dummy participants
+    for (String node : _nodes) {
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+    // Topology-aware rebalance is intentionally not enabled here
+    // (enableTopologyAwareRebalance is not called).
+  }
+
+  @DataProvider(name = "rebalanceStrategies")
+  public static String[][] rebalanceStrategies() {
+    return new String[][] { { "CrushRebalanceStrategy", CrushRebalanceStrategy.class.getName() } };
+  }
+
+  @Test(dataProvider = "rebalanceStrategies", enabled = true)
+  public void test(String rebalanceStrategyName, String rebalanceStrategyClass)
+      throws Exception {
+    System.out.println("Test " + rebalanceStrategyName);
+    createResourcesForAllStateModels("Test-DB-" + rebalanceStrategyName + "-",
+        rebalanceStrategyClass);
+    Thread.sleep(300);
+
+    Assert.assertTrue(buildClusterVerifier().verify(5000));
+    validateAllDBs(_replica);
+  }
+
+  @Test(dataProvider = "rebalanceStrategies", enabled = true, dependsOnMethods = "test")
+  public void testWithInstanceTag(String rebalanceStrategyName, String rebalanceStrategyClass)
+      throws Exception {
+    Set<String> tags = new HashSet<String>(_nodeToTagMap.values());
+    // Numbering starts at 3, presumably so these names differ from the DBs created (and dropped)
+    // by test().
+    int i = 3;
+    for (String tag : tags) {
+      String db = "Test-DB-" + rebalanceStrategyName + "-" + i++;
+      _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS,
+          BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO + "",
+          rebalanceStrategyClass);
+      IdealState is =
+          _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      is.setInstanceGroupTag(tag);
+      _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, is);
+      _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+    Thread.sleep(300);
+
+    Assert.assertTrue(buildClusterVerifier().verify(5000));
+    validateAllDBs(_replica);
+  }
+
+  @Test(dataProvider = "rebalanceStrategies", enabled = true, dependsOnMethods = { "test",
+      "testWithInstanceTag" })
+  public void testLackEnoughLiveInstances(String rebalanceStrategyName,
+      String rebalanceStrategyClass) throws Exception {
+    System.out.println("TestLackEnoughLiveInstances " + rebalanceStrategyName);
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+
+    // shutdown participants, keep only two left
+    for (int i = 2; i < _participants.size(); i++) {
+      _participants.get(i).syncStop();
+    }
+
+    createResourcesForAllStateModels("Test-DB-" + rebalanceStrategyName + "-",
+        rebalanceStrategyClass);
+    Thread.sleep(300);
+
+    Assert.assertTrue(buildClusterVerifier().verify(5000));
+    // Only two live instances remain, so at most two replicas can be placed.
+    validateAllDBs(2);
+  }
+
+  @Test(dataProvider = "rebalanceStrategies", enabled = true, dependsOnMethods = { "test",
+      "testWithInstanceTag" })
+  public void testLackEnoughInstances(String rebalanceStrategyName,
+      String rebalanceStrategyClass) throws Exception {
+    System.out.println("TestLackEnoughInstances " + rebalanceStrategyName);
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+
+    // shutdown, disable and drop participants, keep only two left
+    for (int i = 2; i < _participants.size(); i++) {
+      MockParticipantManager p = _participants.get(i);
+      p.syncStop();
+      _setupTool.getClusterManagementTool()
+          .enableInstance(CLUSTER_NAME, p.getInstanceName(), false);
+      Thread.sleep(50);
+      _setupTool.dropInstanceFromCluster(CLUSTER_NAME, p.getInstanceName());
+    }
+
+    createResourcesForAllStateModels("Test-DB-" + rebalanceStrategyName + "-",
+        rebalanceStrategyClass);
+    Thread.sleep(300);
+
+    Assert.assertTrue(buildClusterVerifier().verify());
+    validateAllDBs(2);
+  }
+
+  /**
+   * Adds one FULL_AUTO resource per state model in {@link #_testModels}, named
+   * {@code dbNamePrefix + index}. Each resource is rebalanced to {@link #_replica} replicas and
+   * recorded in {@link #_allDBs} so that {@link #afterMethod()} drops it.
+   */
+  private void createResourcesForAllStateModels(String dbNamePrefix,
+      String rebalanceStrategyClass) {
+    int i = 0;
+    for (String stateModel : _testModels) {
+      String db = dbNamePrefix + i++;
+      _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, stateModel,
+          RebalanceMode.FULL_AUTO + "", rebalanceStrategyClass);
+      _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+  }
+
+  /** Builds a best-possible/external-view verifier covering every resource in _allDBs. */
+  private HelixClusterVerifier buildClusterVerifier() {
+    return new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+        .setResources(_allDBs).build();
+  }
+
+  /** Runs {@link #validateIsolation} against the current IdealState/ExternalView of each DB. */
+  private void validateAllDBs(int expectedReplica) {
+    for (String db : _allDBs) {
+      IdealState is =
+          _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      validateIsolation(is, ev, expectedReplica);
+    }
+  }
+
+  /**
+   * Validates that each partition has {@code expectedReplica} distinct instances in the external
+   * view and, if the resource has an instance group tag, that every one of them carries it.
+   */
+  private void validateIsolation(IdealState is, ExternalView ev, int expectedReplica) {
+    Assert.assertNotNull(ev, "ExternalView is missing");
+    String tag = is.getInstanceGroupTag();
+    for (String partition : is.getPartitionSet()) {
+      Map<String, String> assignmentMap = ev.getRecord().getMapField(partition);
+      Assert.assertNotNull(assignmentMap,
+          "No external view assignment for partition " + partition);
+      Set<String> instancesInEV = assignmentMap.keySet();
+      Assert.assertEquals(instancesInEV.size(), expectedReplica,
+          "Unexpected number of instances for partition " + partition);
+      if (tag == null) {
+        continue;
+      }
+      for (String instance : instancesInEV) {
+        InstanceConfig config =
+            _setupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instance);
+        Assert.assertTrue(config.containsTag(tag));
+      }
+    }
+  }
+
+  @AfterMethod
+  public void afterMethod() throws Exception {
+    for (String db : _allDBs) {
+      _setupTool.dropResourceFromCluster(CLUSTER_NAME, db);
+    }
+    _allDBs.clear();
+    // waiting for all DB be dropped.
+    Thread.sleep(200);
+  }
+
+  /**
+   * Stops the controller and every participant started by {@link #beforeClass()}, then deletes
+   * the test cluster (the same cleanup TestCrushAutoRebalance performs). The controller,
+   * participant and setup-tool fields are declared in this class, so base-class code cannot see
+   * them and cleanup has to happen here.
+   */
+  @AfterClass
+  public void afterClass() throws Exception {
+    // shutdown order: 1) disconnect the controller 2) disconnect participants
+    if (_controller != null) {
+      _controller.syncStop();
+    }
+    for (MockParticipantManager participant : _participants) {
+      participant.syncStop();
+    }
+    if (_setupTool != null) {
+      _setupTool.deleteCluster(CLUSTER_NAME);
+    }
+    System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/de38a7db/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalanceTopoplogyAwareDisabled.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalanceTopoplogyAwareDisabled.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalanceTopoplogyAwareDisabled.java
new file mode 100644
index 0000000..55245e7
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalanceTopoplogyAwareDisabled.java
@@ -0,0 +1,93 @@
+package org.apache.helix.integration.rebalancer.CrushRebalancers;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Re-runs the {@link TestCrushAutoRebalanceNonRack} scenarios against a cluster built by this
+ * class's own {@link #beforeClass()}. That setup tags the nodes alternately "tag-0"/"tag-1" and
+ * does not configure any topology or fault-zone settings.
+ */
+public class TestCrushAutoRebalanceTopoplogyAwareDisabled extends TestCrushAutoRebalanceNonRack {
+
+  /**
+   * Recreates the test cluster from scratch. It adds NUM_NODE tagged instances, starts a mock
+   * participant for each, starts one controller, and enables persisting the best-possible
+   * assignment.
+   */
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    String namespace = "/" + CLUSTER_NAME;
+    if (ZkIntegrationTestBase._gZkClient.exists(namespace)) {
+      ZkIntegrationTestBase._gZkClient.deleteRecursive(namespace);
+    }
+    _setupTool = new ClusterSetup(ZkIntegrationTestBase._gZkClient);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < NUM_NODE; i++) {
+      String storageNodeName =
+          PARTICIPANT_PREFIX + "_" + (TestCrushAutoRebalanceNonRack.START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+      _nodes.add(storageNodeName);
+      // Alternate nodes between two tags (presumably exercised by testWithInstanceTag).
+      String tag = "tag-" + i % 2;
+      _setupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, storageNodeName, tag);
+      _nodeToTagMap.put(storageNodeName, tag);
+    }
+
+    // start dummy participants
+    for (String node : _nodes) {
+      MockParticipantManager participant =
+          new MockParticipantManager(ZkIntegrationTestBase.ZK_ADDR, CLUSTER_NAME, node);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller =
+        new ClusterControllerManager(ZkIntegrationTestBase.ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    enablePersistBestPossibleAssignment(ZkIntegrationTestBase._gZkClient, CLUSTER_NAME, true);
+  }
+
+  // The test methods below only delegate to the parent. They are re-declared so this class can
+  // attach its own TestNG annotations (data provider and dependencies).
+
+  @Override
+  @Test(dataProvider = "rebalanceStrategies")
+  public void test(String rebalanceStrategyName, String rebalanceStrategyClass) throws Exception {
+    super.test(rebalanceStrategyName, rebalanceStrategyClass);
+  }
+
+  @Override
+  @Test(dataProvider = "rebalanceStrategies", dependsOnMethods = "test")
+  public void testWithInstanceTag(String rebalanceStrategyName, String rebalanceStrategyClass)
+      throws Exception {
+    super.testWithInstanceTag(rebalanceStrategyName, rebalanceStrategyClass);
+  }
+
+  @Override
+  @Test(dataProvider = "rebalanceStrategies", dependsOnMethods = { "test", "testWithInstanceTag" })
+  public void testLackEnoughLiveInstances(String rebalanceStrategyName,
+      String rebalanceStrategyClass) throws Exception {
+    super.testLackEnoughLiveInstances(rebalanceStrategyName, rebalanceStrategyClass);
+  }
+
+  @Override
+  @Test(dataProvider = "rebalanceStrategies", dependsOnMethods = { "test", "testWithInstanceTag" })
+  public void testLackEnoughInstances(String rebalanceStrategyName,
+      String rebalanceStrategyClass) throws Exception {
+    super.testLackEnoughInstances(rebalanceStrategyName, rebalanceStrategyClass);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/de38a7db/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
new file mode 100644
index 0000000..817c207
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
@@ -0,0 +1,342 @@
+package org.apache.helix.integration.rebalancer.DelayedAutoRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterSetup;
+import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.omg.PortableServer.THREAD_POLICY_ID;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
+  // NOTE: a subclass that redeclares NUM_NODE (e.g. TestDelayedAutoRebalanceWithRackaware) only
+  // hides this field; every method declared in this class keeps reading this value.
+  final int NUM_NODE = 5;
+  protected static final int START_PORT = 12918;
+  protected static final int _PARTITIONS = 5;
+
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  protected ClusterControllerManager _controller;
+
+  protected ClusterSetup _setupTool = null;
+  List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>();
+  int _replica = 3;
+  int _minActiveReplica = _replica - 1;
+  HelixClusterVerifier _clusterVerifier;
+  List<String> _testDBs = new ArrayList<String>();
+
+  /**
+   * Creates a fresh cluster with NUM_NODE mock participants and one controller. It also builds the
+   * best-possible/external-view verifier and enables persisting the best-possible assignment.
+   */
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+    _setupTool = new ClusterSetup(_gZkClient);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+
+      // start dummy participants
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    _clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+  }
+
+  // One test DB is created per state model listed here.
+  protected String[] TestStateModels = {
+      BuiltInStateModelDefinitions.MasterSlave.name(),
+      BuiltInStateModelDefinitions.OnlineOffline.name(),
+      BuiltInStateModelDefinitions.LeaderStandby.name()
+  };
+
+  /**
+   * The partition movement should be delayed (not happen immediately) after one single node goes
+   * offline. Delay is enabled by default; the delay time is set in the IdealState.
+   * @throws Exception
+   */
+  @Test
+  public void testDelayedPartitionMovement() throws Exception {
+    Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000);
+    validateDelayedMovements(externalViewsBefore);
+  }
+
+  /**
+   * Same as {@link #testDelayedPartitionMovement()} but with the delay configured at cluster level.
+   */
+  @Test(dependsOnMethods = {"testDelayedPartitionMovement"})
+  public void testDelayedPartitionMovementWithClusterConfigedDelay() throws Exception {
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000);
+    try {
+      Map<String, ExternalView> externalViewsBefore = createTestDBs(-1);
+      validateDelayedMovements(externalViewsBefore);
+    } finally {
+      // Always restore the cluster-level delay so a failure here cannot leak into later tests.
+      setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
+    }
+  }
+
+  /**
+   * Test when two nodes go offline, the minimal active replica should be maintained.
+   * @throws Exception
+   */
+  @Test(dependsOnMethods = {"testDelayedPartitionMovement"})
+  public void testMinimalActiveReplicaMaintain() throws Exception {
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000);
+    try {
+      Map<String, ExternalView> externalViewsBefore = createTestDBs(-1);
+      validateDelayedMovements(externalViewsBefore);
+
+      // bring down another node, the minimal active replica for each partition should be
+      // maintained.
+      _participants.get(3).syncStop();
+      Thread.sleep(500);
+      // Wait for convergence instead of relying on the sleep alone.
+      Assert.assertTrue(_clusterVerifier.verify());
+      validateMinActiveReplicaForAllDBs(_minActiveReplica);
+    } finally {
+      setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
+    }
+  }
+
+  /**
+   * The partition should be moved to other nodes after the delay time.
+   */
+  @Test (dependsOnMethods = {"testMinimalActiveReplicaMaintain"})
+  public void testPartitionMovementAfterDelayTime() throws Exception {
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+
+    long delay = 4000;
+    Map<String, ExternalView> externalViewsBefore = createTestDBs(delay);
+    validateDelayedMovements(externalViewsBefore);
+
+    Thread.sleep(delay + 200);
+    Assert.assertTrue(_clusterVerifier.verify());
+    // after delay time, it should maintain required number of replicas.
+    validateMinActiveReplicaForAllDBs(_replica);
+  }
+
+  /**
+   * Disabling delayed rebalance on one resource moves that resource's partitions immediately,
+   * while the other resources stay where they were.
+   */
+  @Test (dependsOnMethods = {"testMinimalActiveReplicaMaintain"})
+  public void testDisableDelayRebalanceInResource() throws Exception {
+    Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000);
+    validateDelayedMovements(externalViewsBefore);
+
+    // disable delay rebalance for one db, partition should be moved immediately
+    String testDb = _testDBs.get(0);
+    IdealState idealState =
+        _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
+    idealState.setDelayRebalanceEnabled(false);
+    _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, testDb, idealState);
+    Thread.sleep(1000);
+    // Wait for convergence before validating (matches the disabled-instance variant).
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    // once delay rebalance is disabled, it should maintain required number of replicas for that
+    // db. replica for other dbs should not be moved.
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      IdealState is =
+          _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+
+      if (db.equals(testDb)) {
+        validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE);
+      } else {
+        validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
+        validateNoPartitionMove(is, externalViewsBefore.get(db), ev,
+            _participants.get(0).getInstanceName(), false);
+      }
+    }
+  }
+
+  /**
+   * Disabling delayed rebalance for the whole cluster restores the full replica count.
+   */
+  @Test (dependsOnMethods = {"testDisableDelayRebalanceInResource"})
+  public void testDisableDelayRebalanceInCluster() throws Exception {
+    enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true);
+    try {
+      Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000);
+      validateDelayedMovements(externalViewsBefore);
+
+      // disable delay rebalance for the entire cluster.
+      enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, false);
+      // TODO: remove this once controller is listening on cluster config change.
+      RebalanceScheduler.invokeRebalance(_controller.getHelixDataAccessor(), _testDBs.get(0));
+      Thread.sleep(500);
+      Assert.assertTrue(_clusterVerifier.verify());
+      validateMinActiveReplicaForAllDBs(_replica);
+    } finally {
+      // Always re-enable cluster-level delay so later tests start from the default.
+      enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true);
+    }
+  }
+
+  /**
+   * Disabling delayed rebalance for one instance removes it from every preference list.
+   */
+  @Test (dependsOnMethods = {"testDisableDelayRebalanceInCluster"})
+  public void testDisableDelayRebalanceInInstance() throws Exception {
+    Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000);
+    validateDelayedMovements(externalViewsBefore);
+
+    String disabledInstanceName = _participants.get(0).getInstanceName();
+    enableDelayRebalanceInInstance(_gZkClient, CLUSTER_NAME, disabledInstanceName, false);
+    try {
+      Thread.sleep(1000);
+
+      for (String db : _testDBs) {
+        IdealState is =
+            _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+        Map<String, List<String>> preferenceLists = is.getPreferenceLists();
+        for (List<String> instances : preferenceLists.values()) {
+          Assert.assertFalse(instances.contains(disabledInstanceName), String.format(
+              "%s is still in a preference list of %s: %s", disabledInstanceName, db, instances));
+        }
+      }
+    } finally {
+      // Always restore the per-instance setting so later tests are not affected.
+      enableDelayRebalanceInInstance(_gZkClient, CLUSTER_NAME, disabledInstanceName, true);
+    }
+  }
+
+  /**
+   * Drops every DB created by the last test and clears {@link #_testDBs}.
+   */
+  @AfterMethod
+  public void afterTest() throws InterruptedException {
+    // delete all DBs create in last test
+    for (String db : _testDBs) {
+      _setupTool.dropResourceFromCluster(CLUSTER_NAME, db);
+    }
+    _testDBs.clear();
+    Thread.sleep(50);
+  }
+
+  /**
+   * Restarts any participant that a previous test disconnected, using a new manager with the same
+   * instance name.
+   */
+  @BeforeMethod
+  public void beforeTest() {
+    // restart any participant that has been disconnected from last test.
+    for (int i = 0; i < _participants.size(); i++) {
+      if (!_participants.get(i).isConnected()) {
+        _participants.set(i, new MockParticipantManager(ZK_ADDR, CLUSTER_NAME,
+            _participants.get(i).getInstanceName()));
+        _participants.get(i).syncStart();
+      }
+    }
+  }
+
+  /**
+   * Creates one delayed-rebalance DB per state model in {@link #TestStateModels}, waits for the
+   * cluster to converge, and returns a snapshot of each DB's ExternalView keyed by DB name.
+   *
+   * @param delayTime delay passed to createResourceWithDelayedRebalance; callers pass -1 when the
+   *     delay is configured at cluster level instead
+   */
+  protected Map<String, ExternalView> createTestDBs(long delayTime) throws InterruptedException {
+    Map<String, ExternalView> externalViews = new HashMap<String, ExternalView>();
+    int i = 0;
+    for (String stateModel : TestStateModels) {
+      String db = "Test-DB-" + i++;
+      createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
+          _minActiveReplica, delayTime);
+      _testDBs.add(db);
+    }
+    Thread.sleep(800);
+    Assert.assertTrue(_clusterVerifier.verify());
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      externalViews.put(db, ev);
+    }
+    return externalViews;
+  }
+
+  /**
+   * Asserts that no partition of the resource was reassigned to new instances between two
+   * ExternalView snapshots.
+   *
+   * @param offlineInstance the instance that was stopped or disabled between the two snapshots
+   * @param disabled true if offlineInstance was disabled: the instance sets must be equal, and if
+   *     the instance is still listed it must be OFFLINE. false if it went offline: it is expected
+   *     to have disappeared, and all other instances must be unchanged.
+   */
+  protected void validateNoPartitionMove(IdealState is, ExternalView evBefore, ExternalView evAfter,
+      String offlineInstance, boolean disabled) {
+    for (String partition : is.getPartitionSet()) {
+      Map<String, String> assignmentsBefore = evBefore.getRecord().getMapField(partition);
+      Map<String, String> assignmentsAfter = evAfter.getRecord().getMapField(partition);
+      // Fail with a readable assertion instead of an NPE when a partition is not in a view.
+      Assert.assertNotNull(assignmentsBefore,
+          partition + " is missing from the ExternalView before the change");
+      Assert.assertNotNull(assignmentsAfter,
+          partition + " is missing from the ExternalView after the change");
+
+      Set<String> instancesBefore = new HashSet<String>(assignmentsBefore.keySet());
+      Set<String> instancesAfter = new HashSet<String>(assignmentsAfter.keySet());
+
+      if (disabled) {
+        // the offline node is disabled
+        Assert.assertEquals(instancesBefore, instancesAfter, String.format(
+            "%s has been moved to new instances, before: %s, after: %s, disabled instance: %s",
+            partition, assignmentsBefore.toString(), assignmentsAfter.toString(),
+            offlineInstance));
+
+        if (instancesAfter.contains(offlineInstance)) {
+          Assert.assertEquals(assignmentsAfter.get(offlineInstance), "OFFLINE");
+        }
+      } else {
+        // the offline node actually went offline.
+        instancesBefore.remove(offlineInstance);
+        Assert.assertEquals(instancesBefore, instancesAfter, String.format(
+            "%s has been moved to new instances, before: %s, after: %s, offline instance: %s",
+            partition, assignmentsBefore.toString(), assignmentsAfter.toString(),
+            offlineInstance));
+      }
+    }
+  }
+
+  /**
+   * Stops participant 0 and asserts that every test DB keeps its minimum active replicas and
+   * that no partition moved to a new instance.
+   */
+  private void validateDelayedMovements(Map<String, ExternalView> externalViewsBefore)
+      throws InterruptedException {
+    // bring down one node, no partition should be moved.
+    _participants.get(0).syncStop();
+    Thread.sleep(500);
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      IdealState is =
+          _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
+      validateNoPartitionMove(is, externalViewsBefore.get(db), ev,
+          _participants.get(0).getInstanceName(), false);
+    }
+  }
+
+  /**
+   * Runs validateMinActiveAndTopStateReplica with the given replica count against the current
+   * IdealState and ExternalView of every DB in {@link #_testDBs}.
+   */
+  private void validateMinActiveReplicaForAllDBs(int minActiveReplica) {
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      IdealState is =
+          _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(is, ev, minActiveReplica, NUM_NODE);
+    }
+  }
+
+  /**
+   * Stops the controller, then every participant, then deletes the test cluster.
+   */
+  @AfterClass
+  public void afterClass() throws Exception {
+    /**
+     * shutdown order: 1) disconnect the controller 2) disconnect participants
+     */
+    _controller.syncStop();
+    for (MockParticipantManager participant : _participants) {
+      participant.syncStop();
+    }
+    _setupTool.deleteCluster(CLUSTER_NAME);
+    System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/de38a7db/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
new file mode 100644
index 0000000..330f962
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
@@ -0,0 +1,317 @@
+package org.apache.helix.integration.rebalancer.DelayedAutoRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/**
+ * Variant of {@link TestDelayedAutoRebalance} where nodes are disabled through the admin API
+ * instead of (or in addition to) being stopped.
+ */
+public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAutoRebalance {
+  private ConfigAccessor _configAccessor;
+
+  @Override
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+    _configAccessor = new ConfigAccessor(_gZkClient);
+  }
+
+  /**
+   * The partition movement should be delayed (not happen immediately) after one single node is
+   * disabled. Delay is enabled by default; the delay time is set in the IdealState.
+   * @throws Exception
+   */
+  @Test
+  @Override
+  public void testDelayedPartitionMovement() throws Exception {
+    Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000);
+
+    // Disable one node, no partition should be moved.
+    String instance = _participants.get(0).getInstanceName();
+    enableInstance(instance, false);
+
+    Thread.sleep(300);
+    Assert.assertTrue(_clusterVerifier.verify());
+    validateNoMovementWithDisabledInstance(externalViewsBefore, instance);
+  }
+
+  @Test(dependsOnMethods = {"testDelayedPartitionMovement"})
+  @Override
+  public void testDelayedPartitionMovementWithClusterConfigedDelay() throws Exception {
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000);
+    try {
+      Map<String, ExternalView> externalViewsBefore = createTestDBs(-1);
+
+      // Disable one node, no partition should be moved.
+      String instance = _participants.get(0).getInstanceName();
+      enableInstance(instance, false);
+
+      Thread.sleep(100);
+      Assert.assertTrue(_clusterVerifier.verify());
+      validateNoMovementWithDisabledInstance(externalViewsBefore, instance);
+    } finally {
+      // Always restore the cluster-level delay so a failure here cannot leak into later tests.
+      setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
+    }
+  }
+
+  /**
+   * Test when two nodes were disabled, the minimal active replica should be maintained.
+   * @throws Exception
+   */
+  @Test(dependsOnMethods = {"testDelayedPartitionMovement"})
+  @Override
+  public void testMinimalActiveReplicaMaintain() throws Exception {
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000);
+    try {
+      Map<String, ExternalView> externalViewsBefore = createTestDBs(-1);
+
+      // disable one node, no partition should be moved.
+      String firstInstance = _participants.get(0).getInstanceName();
+      enableInstance(firstInstance, false);
+      Thread.sleep(100);
+      Assert.assertTrue(_clusterVerifier.verify());
+      validateNoMovementWithDisabledInstance(externalViewsBefore, firstInstance);
+
+      // disable another node, the minimal active replica for each partition should be maintained.
+      enableInstance(_participants.get(3).getInstanceName(), false);
+      Thread.sleep(100);
+      Assert.assertTrue(_clusterVerifier.verify());
+      validateActiveReplicasForAllDBs(_minActiveReplica);
+    } finally {
+      setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
+    }
+  }
+
+  /**
+   * Test when one node is disabled while another node is offline, the minimal active replica
+   * should be maintained.
+   * @throws Exception
+   */
+  @Test(dependsOnMethods = {"testDelayedPartitionMovement"})
+  public void testMinimalActiveReplicaMaintainWithOneOffline() throws Exception {
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000);
+    try {
+      Map<String, ExternalView> externalViewsBefore = createTestDBs(-1);
+
+      // disable one node, no partition should be moved.
+      String disabledInstance = _participants.get(0).getInstanceName();
+      enableInstance(disabledInstance, false);
+      Thread.sleep(100);
+      Assert.assertTrue(_clusterVerifier.verify());
+      validateNoMovementWithDisabledInstance(externalViewsBefore, disabledInstance);
+
+      // bring down another node, the minimal active replica for each partition should be
+      // maintained.
+      _participants.get(3).syncStop();
+      Thread.sleep(100);
+      Assert.assertTrue(_clusterVerifier.verify());
+      validateActiveReplicasForAllDBs(_minActiveReplica);
+    } finally {
+      setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
+    }
+  }
+
+  /**
+   * The partition should be moved to other nodes after the delay time.
+   */
+  @Test (dependsOnMethods = {"testMinimalActiveReplicaMaintain"})
+  @Override
+  public void testPartitionMovementAfterDelayTime() throws Exception {
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+
+    long delay = 10000;
+    Map<String, ExternalView> externalViewsBefore = createTestDBs(delay);
+
+    // disable one node, no partition should be moved.
+    String disabledInstance = _participants.get(0).getInstanceName();
+    enableInstance(disabledInstance, false);
+    Thread.sleep(100);
+    Assert.assertTrue(_clusterVerifier.verify());
+    validateNoMovementWithDisabledInstance(externalViewsBefore, disabledInstance);
+
+    Thread.sleep(delay + 200);
+    // Wait for convergence after the delay expires (matches the base-class test).
+    Assert.assertTrue(_clusterVerifier.verify());
+    // after delay time, it should maintain required number of replicas.
+    validateActiveReplicasForAllDBs(_replica);
+  }
+
+  @Test (dependsOnMethods = {"testMinimalActiveReplicaMaintain"})
+  @Override
+  public void testDisableDelayRebalanceInResource() throws Exception {
+    Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000);
+
+    // disable one node, no partition should be moved.
+    String disabledInstance = _participants.get(0).getInstanceName();
+    enableInstance(disabledInstance, false);
+    Thread.sleep(100);
+    Assert.assertTrue(_clusterVerifier.verify());
+    validateNoMovementWithDisabledInstance(externalViewsBefore, disabledInstance);
+
+    // disable delay rebalance for one db, partition should be moved immediately
+    String testDb = _testDBs.get(0);
+    IdealState idealState =
+        _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
+    idealState.setDelayRebalanceEnabled(false);
+    _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, testDb, idealState);
+    Thread.sleep(2000);
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    // once delay rebalance is disabled, it should maintain required number of replicas for that
+    // db. replica for other dbs should not be moved.
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      IdealState is =
+          _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+
+      if (db.equals(testDb)) {
+        validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE);
+      } else {
+        validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
+        validateNoPartitionMove(is, externalViewsBefore.get(db), ev, disabledInstance, true);
+      }
+    }
+  }
+
+  @Test (dependsOnMethods = {"testDisableDelayRebalanceInResource"})
+  @Override
+  public void testDisableDelayRebalanceInCluster() throws Exception {
+    enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true);
+    try {
+      Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000);
+
+      // disable one node, no partition should be moved.
+      String disabledInstance = _participants.get(0).getInstanceName();
+      enableInstance(disabledInstance, false);
+      Thread.sleep(100);
+      Assert.assertTrue(_clusterVerifier.verify());
+      validateNoMovementWithDisabledInstance(externalViewsBefore, disabledInstance);
+
+      // disable delay rebalance for the entire cluster.
+      enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, false);
+      // TODO: remove this once controller is listening on cluster config change.
+      RebalanceScheduler.invokeRebalance(_controller.getHelixDataAccessor(), _testDBs.get(0));
+      Thread.sleep(500);
+      Assert.assertTrue(_clusterVerifier.verify());
+      validateActiveReplicasForAllDBs(_replica);
+    } finally {
+      // Always re-enable cluster-level delay so later tests start from the default.
+      enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true);
+    }
+  }
+
+  @Test (dependsOnMethods = {"testDisableDelayRebalanceInCluster"})
+  @Override
+  public void testDisableDelayRebalanceInInstance() throws Exception {
+    super.testDisableDelayRebalanceInInstance();
+  }
+
+  /**
+   * Restarts any disconnected participant (via the parent), then re-enables every instance so
+   * each test starts with all nodes live and enabled.
+   */
+  @Override
+  @BeforeMethod
+  public void beforeTest() {
+    super.beforeTest();
+    for (MockParticipantManager participant : _participants) {
+      enableInstance(participant.getInstanceName(), true);
+    }
+  }
+
+  /**
+   * Enables or disables an instance through the admin API. It then checks that the instance
+   * config reflects the new state and that the recorded enable/disable timestamp falls within
+   * the call.
+   */
+  private void enableInstance(String instance, boolean enabled) {
+    long timeBefore = System.currentTimeMillis();
+    _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instance, enabled);
+    // Bound the timestamp by the measured call duration rather than a fixed 100ms window, which
+    // fails spuriously whenever the ZooKeeper write is slow.
+    long timeAfter = System.currentTimeMillis();
+    InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, instance);
+    Assert.assertEquals(instanceConfig.getInstanceEnabled(), enabled);
+    long enabledTime = instanceConfig.getInstanceEnabledTime();
+    Assert.assertTrue(enabledTime >= timeBefore && enabledTime <= timeAfter,
+        String.format("Enabled time %d of %s is outside [%d, %d]", enabledTime, instance,
+            timeBefore, timeAfter));
+  }
+
+  /**
+   * For every test DB, asserts the minimum active replica count still holds and that no
+   * partition moved relative to the given snapshots while {@code disabledInstance} is disabled.
+   */
+  private void validateNoMovementWithDisabledInstance(
+      Map<String, ExternalView> externalViewsBefore, String disabledInstance) {
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      IdealState is =
+          _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE);
+      validateNoPartitionMove(is, externalViewsBefore.get(db), ev, disabledInstance, true);
+    }
+  }
+
+  /**
+   * Runs validateMinActiveAndTopStateReplica with the given replica count against the current
+   * IdealState and ExternalView of every test DB.
+   */
+  private void validateActiveReplicasForAllDBs(int minActiveReplica) {
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      IdealState is =
+          _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(is, ev, minActiveReplica, NUM_NODE);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/de38a7db/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java
new file mode 100644
index 0000000..7d98f27
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java
@@ -0,0 +1,124 @@
+package org.apache.helix.integration.rebalancer.DelayedAutoRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterSetup;
+import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Re-runs the {@link TestDelayedAutoRebalance} scenarios on a rack-aware cluster with these
+ * settings:
+ * <ul>
+ *   <li>{@code NUM_NODE} participants assigned round-robin to three zones ({@code zone-0},
+ *       {@code zone-1}, {@code zone-2});</li>
+ *   <li>topology-aware rebalancing and persisted best-possible assignment enabled;</li>
+ *   <li>every resource created with {@link CrushRebalanceStrategy}.</li>
+ * </ul>
+ */
+public class TestDelayedAutoRebalanceWithRackaware extends TestDelayedAutoRebalance {
+  // NOTE(review): if the parent class also declares NUM_NODE, this field only hides it.
+  // Inherited code that reads NUM_NODE would still see the parent's value - confirm.
+  final int NUM_NODE = 9;
+
+  /**
+   * Builds the test cluster:
+   * <ul>
+   *   <li>re-creates the cluster;</li>
+   *   <li>registers {@code NUM_NODE} instances, each with zone {@code "zone-" + i % 3};</li>
+   *   <li>starts one mock participant per instance;</li>
+   *   <li>enables topology-aware rebalance and persisted best-possible assignment;</li>
+   *   <li>starts the controller and creates the external-view verifier.</li>
+   * </ul>
+   */
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    // Remove any cluster data left behind by an earlier run before re-creating the cluster.
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+    _setupTool = new ClusterSetup(_gZkClient);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+      // Assign instances round-robin to zone-0, zone-1 and zone-2.
+      String zone = "zone-" + i % 3;
+      _setupTool.getClusterManagementTool().setInstanceZoneId(CLUSTER_NAME, storageNodeName, zone);
+
+      // start dummy participants
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    enableTopologyAwareRebalance(_gZkClient, CLUSTER_NAME, true);
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    _clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+  }
+
+  /**
+   * Creates a delayed-rebalance resource exactly like the 7-argument variant. It always passes
+   * {@link CrushRebalanceStrategy} to the 8-argument overload, so every resource in this test
+   * is placed with the CRUSH strategy.
+   */
+  @Override
+  protected IdealState createResourceWithDelayedRebalance(String clusterName, String db,
+      String stateModel, int numPartition, int replica, int minActiveReplica, long delay) {
+    return createResourceWithDelayedRebalance(clusterName, db, stateModel, numPartition, replica,
+        minActiveReplica, delay, CrushRebalanceStrategy.class.getName());
+  }
+
+  /** Runs the inherited delayed-partition-movement scenario on the rack-aware cluster. */
+  @Override
+  @Test
+  public void testDelayedPartitionMovement() throws Exception {
+    super.testDelayedPartitionMovement();
+  }
+
+  /** Runs the inherited scenario where the rebalance delay comes from the cluster config. */
+  @Override
+  @Test(dependsOnMethods = {"testDelayedPartitionMovement"})
+  public void testDelayedPartitionMovementWithClusterConfigedDelay() throws Exception {
+    super.testDelayedPartitionMovementWithClusterConfigedDelay();
+  }
+
+  /**
+   * Tests that when two nodes go offline, the minimal active replica count is maintained.
+   * @throws Exception
+   */
+  @Override
+  @Test(dependsOnMethods = {"testDelayedPartitionMovement"})
+  public void testMinimalActiveReplicaMaintain() throws Exception {
+    super.testMinimalActiveReplicaMaintain();
+  }
+
+  /**
+   * Tests that the partition is moved to other nodes after the delay time.
+   */
+  @Override
+  @Test (dependsOnMethods = {"testMinimalActiveReplicaMaintain"})
+  public void testPartitionMovementAfterDelayTime() throws Exception {
+    super.testPartitionMovementAfterDelayTime();
+  }
+
+  /** Runs the inherited scenario that disables delayed rebalance on the resource. */
+  @Override
+  @Test (dependsOnMethods = {"testMinimalActiveReplicaMaintain"})
+  public void testDisableDelayRebalanceInResource() throws Exception {
+    super.testDisableDelayRebalanceInResource();
+  }
+
+  /** Runs the inherited scenario that disables delayed rebalance for the whole cluster. */
+  @Override
+  @Test (dependsOnMethods = {"testDisableDelayRebalanceInResource"})
+  public void testDisableDelayRebalanceInCluster() throws Exception {
+    super.testDisableDelayRebalanceInCluster();
+  }
+
+  /** Runs the inherited scenario that disables delayed rebalance on an instance. */
+  @Override
+  @Test (dependsOnMethods = {"testDisableDelayRebalanceInCluster"})
+  public void testDisableDelayRebalanceInInstance() throws Exception {
+    super.testDisableDelayRebalanceInInstance();
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/de38a7db/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCrushAutoRebalance.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCrushAutoRebalance.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCrushAutoRebalance.java
deleted file mode 100644
index 6551d1e..0000000
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCrushAutoRebalance.java
+++ /dev/null
@@ -1,316 +0,0 @@
-package org.apache.helix.integration.rebalancer;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
-import 
org.apache.helix.controller.rebalancer.strategy.MultiRoundCrushRebalanceStrategy;
-import org.apache.helix.integration.common.ZkIntegrationTestBase;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.model.BuiltInStateModelDefinitions;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.tools.ClusterSetup;
-import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
-import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-/**
- * Integration test for FULL_AUTO resources that use the CRUSH-based rebalance strategies.
- *
- * The cluster has NUM_NODE instances. Each instance gets zone "zone-" + (i % 3) and tag
- * "tag-" + (i % 2). The tests check that each partition's replicas (as seen in the external
- * view) span the expected number of distinct zones. For resources with an instance group tag,
- * they also check that replicas sit only on instances carrying that tag.
- */
-public class TestCrushAutoRebalance extends ZkIntegrationTestBase {
-  final int NUM_NODE = 6;
-  protected static final int START_PORT = 12918;
-  protected static final int _PARTITIONS = 20;
-
-  protected final String CLASS_NAME = getShortClassName();
-  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
-  protected ClusterControllerManager _controller;
-
-  protected ClusterSetup _setupTool = null;
-  List<MockParticipantManager> _participants = new 
ArrayList<MockParticipantManager>();
-  // instance name -> zone id, filled in beforeClass()
-  Map<String, String> _nodeToZoneMap = new HashMap<String, String>();
-  // instance name -> instance tag, filled in beforeClass()
-  Map<String, String> _nodeToTagMap = new HashMap<String, String>();
-  List<String> _nodes = new ArrayList<String>();
-  // resources created by the running test; dropped and cleared in afterMethod()
-  Set<String> _allDBs = new HashSet<String>();
-  int _replica = 3;
-
-  // state models exercised by the tests that create one resource per state model
-  String[] _testModels = { BuiltInStateModelDefinitions.OnlineOffline.name(),
-      BuiltInStateModelDefinitions.MasterSlave.name(),
-      BuiltInStateModelDefinitions.LeaderStandby.name()
-  };
-
-  /**
-   * Builds the test cluster:
-   * 1) re-creates the cluster;
-   * 2) registers NUM_NODE instances, each with its zone id and tag;
-   * 3) starts one mock participant per instance and then the controller;
-   * 4) enables persisted best-possible assignment and topology-aware rebalance.
-   */
-  @BeforeClass
-  public void beforeClass() throws Exception {
-    System.out.println("START " + CLASS_NAME + " at " + new 
Date(System.currentTimeMillis()));
-
-    // remove any cluster data left behind by an earlier run
-    String namespace = "/" + CLUSTER_NAME;
-    if (_gZkClient.exists(namespace)) {
-      _gZkClient.deleteRecursive(namespace);
-    }
-    _setupTool = new ClusterSetup(_gZkClient);
-    _setupTool.addCluster(CLUSTER_NAME, true);
-
-    for (int i = 0; i < NUM_NODE; i++) {
-      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
-      // round-robin over 3 zones and 2 tags
-      String zone = "zone-" + i % 3;
-      String tag = "tag-" + i % 2;
-      _setupTool.getClusterManagementTool().setInstanceZoneId(CLUSTER_NAME, 
storageNodeName, zone);
-      _setupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, 
storageNodeName, tag);
-      _nodeToZoneMap.put(storageNodeName, zone);
-      _nodeToTagMap.put(storageNodeName, tag);
-      _nodes.add(storageNodeName);
-    }
-
-    // start dummy participants
-    for (String node : _nodes) {
-      MockParticipantManager participant =
-          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node);
-      participant.syncStart();
-      _participants.add(participant);
-    }
-
-    // start controller
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, 
controllerName);
-    _controller.syncStart();
-
-    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
-    enableTopologyAwareRebalance(_gZkClient, CLUSTER_NAME, true);
-  }
-
-  /**
-   * Supplies (display name, strategy class name) pairs for CrushRebalanceStrategy and
-   * MultiRoundCrushRebalanceStrategy.
-   */
-  @DataProvider(name = "rebalanceStrategies")
-  public static String [][] rebalanceStrategies() {
-    return new String[][] { {"CrushRebalanceStrategy", 
CrushRebalanceStrategy.class.getName()},
-        {"MultiRoundCrushRebalanceStrategy", 
MultiRoundCrushRebalanceStrategy.class.getName()}
-    };
-  }
-
-  /**
-   * Creates one FULL_AUTO resource per state model in _testModels with the given strategy. Once
-   * the cluster verifies, checks that every partition's replicas span _replica distinct zones.
-   */
-  @Test(dataProvider = "rebalanceStrategies", enabled=true)
-  public void testZoneIsolation(String rebalanceStrategyName, String 
rebalanceStrategyClass)
-      throws Exception {
-    System.out.println("testZoneIsolation " + rebalanceStrategyName);
-
-    int i = 0;
-    for (String stateModel : _testModels) {
-      String db = "Test-DB-" + rebalanceStrategyName + "-" + i++;
-      _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, 
stateModel,
-          RebalanceMode.FULL_AUTO + "", rebalanceStrategyClass);
-      _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
-      _allDBs.add(db);
-    }
-    // NOTE(review): fixed sleep before verifying; presumably gives the controller time to
-    // pick up the new resources - confirm it is still needed.
-    Thread.sleep(300);
-
-    HelixClusterVerifier _clusterVerifier =
-        new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
-            .setResources(_allDBs).build();
-    Assert.assertTrue(_clusterVerifier.verify());
-
-    for (String db : _allDBs) {
-      IdealState is = 
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
-      ExternalView ev =
-          
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
-      validateZoneAndTagIsolation(is, ev, _replica);
-    }
-  }
-
-  /**
-   * Creates one MasterSlave FULL_AUTO resource per distinct instance tag. Each resource's
-   * IdealState is restricted to that tag via setInstanceGroupTag. Checks zone isolation and
-   * that every replica sits on an instance carrying the resource's tag.
-   */
-  @Test(dataProvider = "rebalanceStrategies", enabled=true)
-  public void testZoneIsolationWithInstanceTag(
-      String rebalanceStrategyName, String rebalanceStrategyClass) throws 
Exception {
-    Set<String> tags = new HashSet<String>(_nodeToTagMap.values());
-    int i = 0;
-    for (String tag : tags) {
-      String db = "Test-DB-Tag-" + rebalanceStrategyName + "-" + i++;
-      _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS,
-          BuiltInStateModelDefinitions.MasterSlave.name(), 
RebalanceMode.FULL_AUTO + "",
-          rebalanceStrategyClass);
-      // restrict the resource to instances carrying this tag before rebalancing it
-      IdealState is = 
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
-      is.setInstanceGroupTag(tag);
-      
_setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, 
is);
-      _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
-      _allDBs.add(db);
-    }
-    Thread.sleep(300);
-
-    HelixClusterVerifier _clusterVerifier =
-        new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
-            .setResources(_allDBs).build();
-    Assert.assertTrue(_clusterVerifier.verify());
-
-    for (String db : _allDBs) {
-      IdealState is = 
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
-      ExternalView ev =
-          
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
-      validateZoneAndTagIsolation(is, ev, _replica);
-    }
-  }
-
-  /**
-   * Stops every participant in one zone, leaving two zones live. It then creates one
-   * CrushRebalanceStrategy resource per state model and checks that each partition spans
-   * exactly 2 zones.
-   *
-   * The stopped participants are NOT restarted here; testLackEnoughRacks runs after this test
-   * with them still stopped.
-   */
-  @Test (dependsOnMethods = { "testZoneIsolation", 
"testZoneIsolationWithInstanceTag"})
-  public void testLackEnoughLiveRacks() throws Exception {
-    System.out.println("TestLackEnoughInstances");
-    // already enabled in beforeClass(); re-applied here
-    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
-
-    // shutdown participants within one zone
-    // (the zone picked is the first value in HashMap iteration order, i.e. arbitrary)
-    String zone = _nodeToZoneMap.values().iterator().next();
-    for (int i = 0; i < _participants.size(); i++) {
-      MockParticipantManager p = _participants.get(i);
-      if (_nodeToZoneMap.get(p.getInstanceName()).equals(zone)){
-        p.syncStop();
-      }
-    }
-
-    int i = 0;
-    for (String stateModel : _testModels) {
-      String db = "Test-DB-CrushRebalanceStrategy-" + i++;
-      _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, 
stateModel,
-          RebalanceMode.FULL_AUTO + "", 
CrushRebalanceStrategy.class.getName());
-      _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
-      _allDBs.add(db);
-    }
-    Thread.sleep(300);
-
-    HelixClusterVerifier _clusterVerifier =
-        new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
-            .setResources(_allDBs).build();
-    Assert.assertTrue(_clusterVerifier.verify());
-
-    for (String db : _allDBs) {
-      IdealState is = 
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
-      ExternalView ev =
-          
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
-      // only two zones remain live, so at most two distinct zones per partition
-      validateZoneAndTagIsolation(is, ev, 2);
-    }
-  }
-
-  /**
-   * For every participant in one zone: stops it, disables the instance and drops it from the
-   * cluster. It then creates one CrushRebalanceStrategy resource per state model and checks
-   * that each partition spans exactly 2 zones.
-   */
-  @Test (dependsOnMethods = { "testLackEnoughLiveRacks"})
-  public void testLackEnoughRacks() throws Exception {
-    System.out.println("TestLackEnoughInstances ");
-    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
-
-    // shutdown participants within one zone
-    String zone = _nodeToZoneMap.values().iterator().next();
-    for (int i = 0; i < _participants.size(); i++) {
-      MockParticipantManager p = _participants.get(i);
-      if (_nodeToZoneMap.get(p.getInstanceName()).equals(zone)){
-        // syncStop() may be repeated here for participants already stopped by
-        // testLackEnoughLiveRacks
-        p.syncStop();
-        _setupTool.getClusterManagementTool()
-            .enableInstance(CLUSTER_NAME, p.getInstanceName(), false);
-        Thread.sleep(50);
-        _setupTool.dropInstanceFromCluster(CLUSTER_NAME, p.getInstanceName());
-      }
-    }
-
-    int i = 0;
-    for (String stateModel : _testModels) {
-      String db = "Test-DB-CrushRebalanceStrategy-" + i++;
-      _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, 
stateModel,
-          RebalanceMode.FULL_AUTO + "", 
CrushRebalanceStrategy.class.getName());
-      _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
-      _allDBs.add(db);
-    }
-    Thread.sleep(300);
-
-    HelixClusterVerifier _clusterVerifier =
-        new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
-            .setResources(_allDBs).build();
-    Assert.assertTrue(_clusterVerifier.verify());
-
-    for (String db : _allDBs) {
-      IdealState is = 
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
-      ExternalView ev =
-          
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
-      validateZoneAndTagIsolation(is, ev, 2);
-    }
-  }
-
-  /**
-   * Drops every resource the finished test created and clears _allDBs. This keeps resource
-   * names reusable by later tests.
-   */
-  @AfterMethod
-  public void afterMethod() throws Exception {
-    for (String db : _allDBs) {
-      _setupTool.dropResourceFromCluster(CLUSTER_NAME, db);
-    }
-    _allDBs.clear();
-    // waiting for all DB be dropped.
-    Thread.sleep(100);
-  }
-
-  /**
-   * Validate instances for each partition is on different zone and with 
necessary tagged instances.
-   *
-   * For every partition in the IdealState, collects the zones of the instances listed for it in
-   * the external view and asserts that the number of distinct zones equals expectedReplica. If
-   * the IdealState has an instance group tag, also asserts each of those instances carries it.
-   */
-  private void validateZoneAndTagIsolation(IdealState is, ExternalView ev, int 
expectedReplica) {
-    String tag = is.getInstanceGroupTag();
-    for (String partition : is.getPartitionSet()) {
-      Set<String> assignedZones = new HashSet<String>();
-
-      Map<String, String> assignmentMap = 
ev.getRecord().getMapField(partition);
-      Set<String> instancesInEV = assignmentMap.keySet();
-      // TODO: preference List is not persisted in IS.
-      //Assert.assertEquals(instancesInEV, instancesInIs);
-      for (String instance : instancesInEV) {
-        assignedZones.add(_nodeToZoneMap.get(instance));
-        if (tag != null) {
-          InstanceConfig config =
-              
_setupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instance);
-          Assert.assertTrue(config.containsTag(tag));
-        }
-      }
-      Assert.assertEquals(assignedZones.size(), expectedReplica);
-    }
-  }
-
-  // Placeholder test: body not implemented yet (TODO).
-  @Test()
-  public void testAddZone() throws Exception {
-    //TODO
-  }
-
-  // Placeholder test: body not implemented yet (TODO).
-  @Test()
-  public void testAddNodes() throws Exception {
-    //TODO
-  }
-
-  // Placeholder test: body not implemented yet (TODO).
-  @Test()
-  public void testNodeFailure() throws Exception {
-    //TODO
-  }
-
-  /** Stops the controller and all participants, then deletes the test cluster. */
-  @AfterClass
-  public void afterClass() throws Exception {
-    /**
-     * shutdown order: 1) disconnect the controller 2) disconnect participants
-     */
-    _controller.syncStop();
-    for (MockParticipantManager participant : _participants) {
-      participant.syncStop();
-    }
-    _setupTool.deleteCluster(CLUSTER_NAME);
-    System.out.println("END " + CLASS_NAME + " at " + new 
Date(System.currentTimeMillis()));
-  }
-}

Reply via email to