This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch ApplicationClusterManager
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/ApplicationClusterManager by 
this push:
     new 76ef94699 Implement the on-demand rebalance service (#2595)
76ef94699 is described below

commit 76ef9469970f40cc9c0a2c11339cdbe3d853aab9
Author: Xiaxuan Gao <[email protected]>
AuthorDate: Mon Aug 21 09:21:19 2023 -0700

    Implement the on-demand rebalance service (#2595)
    
    Final Commit Message:
    Implement the on-demand rebalance service that performs rebalancing for 
offline or disabled instances without respecting the delay configuration.
    ---------
    
    Co-authored-by: Xiaxuan Gao <[email protected]>
---
 .../src/main/java/org/apache/helix/HelixAdmin.java |   6 +
 .../rebalancer/util/DelayedRebalanceUtil.java      |  50 ++++++-
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  |  22 ++++
 .../java/org/apache/helix/model/ClusterConfig.java |  27 +++-
 .../java/org/apache/helix/common/ZkTestBase.java   |   8 ++
 .../TestDelayedAutoRebalance.java                  | 144 +++++++++++++++++++++
 ...stDelayedAutoRebalanceWithDisabledInstance.java |  29 +++--
 .../WagedRebalancer/TestDelayedWagedRebalance.java |  15 +++
 ...tDelayedWagedRebalanceWithDisabledInstance.java |  15 +++
 .../java/org/apache/helix/mock/MockHelixAdmin.java |   3 +
 .../org/apache/helix/model/TestClusterConfig.java  |  22 ++++
 11 files changed, 324 insertions(+), 17 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java 
b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index ab4ba57b6..14863f57e 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -553,6 +553,12 @@ public interface HelixAdmin {
    */
   void rebalance(String clusterName, String resourceName, int replica);
 
+  /**
+   * Rebalance a cluster without respecting the delay
+   * @param clusterName
+   */
+  void onDemandRebalance(String clusterName);
+
   /**
    * Add ideal state using a json format file
    * @param clusterName
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
index ee8804749..92556bb40 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
@@ -129,15 +129,25 @@ public class DelayedRebalanceUtil {
   }
 
   /**
-   * @return The time when an offline or disabled instance should be treated 
as inactive.
-   * Return -1 if it is inactive now.
+   * Return the time when an offline or disabled instance should be treated as 
inactive. Return -1
+   * if it is inactive now or forced to be rebalanced by an on-demand 
rebalance.
+   *
+   * @return A timestamp that represents the expected inactive time of a node.
    */
   private static long getInactiveTime(String instance, Set<String> 
liveInstances, Long offlineTime,
       long delay, InstanceConfig instanceConfig, ClusterConfig clusterConfig) {
     long inactiveTime = Long.MAX_VALUE;
+    long lastOnDemandRebalanceTime = 
clusterConfig.getLastOnDemandRebalanceTimestamp();
 
-    // check the time instance went offline.
+    // Check if the given instance is offline
     if (!liveInstances.contains(instance)) {
+      // Check if the offline instance is forced to be rebalanced by an 
on-demand rebalance.
+      // If so, return it as an inactive instance.
+      if (isInstanceForcedToBeRebalanced(offlineTime, delay, 
lastOnDemandRebalanceTime)) {
+        return -1L;
+      }
+
+      // Check the time instance went offline.
       if (offlineTime != null && offlineTime > 0 && offlineTime + delay < 
inactiveTime) {
         inactiveTime = offlineTime + delay;
       }
@@ -154,6 +164,13 @@ public class DelayedRebalanceUtil {
           disabledTime = batchDisableTime;
         }
       }
+
+      // Check if the disabled instance is forced to be rebalanced by an 
on-demand rebalance.
+      // If so, return it as an inactive instance.
+      if (isInstanceForcedToBeRebalanced(disabledTime, delay, 
lastOnDemandRebalanceTime)) {
+        return -1L;
+      }
+
       if (disabledTime > 0 && disabledTime + delay < inactiveTime) {
         inactiveTime = disabledTime + delay;
       }
@@ -417,6 +434,33 @@ public class DelayedRebalanceUtil {
             currentIdealState), currentIdealState, numReplica);
   }
 
+  /**
+   * Given the offline/disabled time, delay, and the last on-demand rebalance 
time, this method checks
+   * if the node associated with the offline/disabled time is forced to be 
rebalanced by the on-demand
+   * rebalance.
+   *  1. If either the last on-demand rebalance time or the offline/disabled 
time is unavailable, then
+   *     the node is not forced to be rebalanced.
+   *  2. If the current time doesn't surpass the delayed offline/disabled time 
and the last on-demand
+   *     rebalance time is after the offline/disabled time, then the node is 
forced to be rebalanced.
+   *
+   * @param offlineOrDisabledTime A unix timestamp indicating the most recent 
time when a node went
+   *                              offline or was disabled.
+   * @param delay The delay window configuration of the current cluster
+   * @param lastOnDemandRebalanceTime A unix timestamp representing the most 
recent time when an
+   *                                  on-demand rebalance was triggered.
+   * @return A boolean indicating whether a node is forced to be rebalanced
+   */
+  private static boolean isInstanceForcedToBeRebalanced(Long 
offlineOrDisabledTime, long delay,
+      long lastOnDemandRebalanceTime) {
+    if (lastOnDemandRebalanceTime == -1 || offlineOrDisabledTime == null
+        || offlineOrDisabledTime <= 0 || System.currentTimeMillis() > 
(offlineOrDisabledTime
+        + delay)) {
+      return false;
+    }
+
+    return offlineOrDisabledTime < lastOnDemandRebalanceTime;
+  }
+
   /**
    * For the resource in the cluster, find additional AssignableReplica to 
close the gap on minActiveReplica.
    * @param clusterData Cluster data cache.
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index d45f8cdb8..917cb25de 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -1532,6 +1532,28 @@ public class ZKHelixAdmin implements HelixAdmin {
     rebalance(clusterName, resourceName, replica, resourceName, "");
   }
 
+  @Override
+  public void onDemandRebalance(String clusterName) {
+    BaseDataAccessor<ZNRecord> baseAccessor = new 
ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    String path = PropertyPathBuilder.clusterConfig(clusterName);
+
+    if (!baseAccessor.exists(path, 0)) {
+      throw new HelixException("Cluster " + clusterName + ": cluster config 
does not exist");
+    }
+
+    baseAccessor.update(path, new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        if (currentData == null) {
+          throw new HelixException("Cluster: " + clusterName + ": cluster 
config is null");
+        }
+        ClusterConfig clusterConfig = new ClusterConfig(currentData);
+        
clusterConfig.setLastOnDemandRebalanceTimestamp(System.currentTimeMillis());
+        return clusterConfig.getRecord();
+      }
+    }, AccessOption.PERSISTENT);
+  }
+
   @Override
   public void rebalance(String clusterName, String resourceName, int replica, 
String keyPrefix,
       String group) {
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java 
b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 8f04c5fab..e33b90204 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -151,7 +151,10 @@ public class ClusterConfig extends HelixProperty {
     HELIX_ENABLED_DISABLE_TIMESTAMP,
     HELIX_DISABLED_REASON,
     // disabled type should be a enum of 
org.apache.helix.constants.InstanceConstants.InstanceDisabledType
-    HELIX_DISABLED_TYPE
+    HELIX_DISABLED_TYPE,
+
+    // The last time when the on-demand rebalance is triggered.
+    LAST_ON_DEMAND_REBALANCE_TIMESTAMP
   }
 
   public enum GlobalRebalancePreferenceKey {
@@ -188,6 +191,7 @@ public class ClusterConfig extends HelixProperty {
   private static final int GLOBAL_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;
   private static final int OFFLINE_NODE_TIME_OUT_FOR_MAINTENANCE_MODE_NOT_SET 
= -1;
   private final static int DEFAULT_VIEW_CLUSTER_REFRESH_PERIOD = 30;
+  private final static long DEFAULT_LAST_ON_DEMAND_REBALANCE_TIMESTAMP = -1L;
 
   /**
    * Instantiate for a specific cluster
@@ -1173,4 +1177,25 @@ public class ClusterConfig extends HelixProperty {
     }
     return getDisabledInstances().get(instanceName);
   }
+
+  /**
+   * Get a unix time that represents the last time the on-demand rebalance is 
triggered on the
+   * current cluster. Return -1 if the configuration doesn't have such record 
yet.
+   *
+   * @return the last on-demand rebalance timestamp in a unix format
+   */
+  public long getLastOnDemandRebalanceTimestamp() {
+    return 
_record.getLongField(ClusterConfigProperty.LAST_ON_DEMAND_REBALANCE_TIMESTAMP.name(),
+        DEFAULT_LAST_ON_DEMAND_REBALANCE_TIMESTAMP);
+  }
+
+  /**
+   * Set the last on demand rebalance time to be the given timestamp.
+   *
+   * @param rebalanceTimestamp
+   */
+  public void setLastOnDemandRebalanceTimestamp(long rebalanceTimestamp) {
+    
_record.setLongField(ClusterConfigProperty.LAST_ON_DEMAND_REBALANCE_TIMESTAMP.name(),
+        rebalanceTimestamp);
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java 
b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index 1cea1926a..0218c3ffc 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -356,6 +356,14 @@ public class ZkTestBase {
     configAccessor.setClusterConfig(clusterName, clusterConfig);
   }
 
+  protected void setLastOnDemandRebalanceTimeInCluster(HelixZkClient zkClient,
+      String clusterName, long lastOnDemandTime) {
+    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+    clusterConfig.setLastOnDemandRebalanceTimestamp(lastOnDemandTime);
+    configAccessor.setClusterConfig(clusterName, clusterConfig);
+  }
+
   protected IdealState createResourceWithDelayedRebalance(String clusterName, 
String db,
       String stateModel, int numPartition, int replica, int minActiveReplica, 
long delay) {
     return createResourceWithDelayedRebalance(clusterName, db, stateModel, 
numPartition, replica,
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
index 958f27c83..5c99819d6 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.ConfigAccessor;
 import org.apache.helix.TestHelper;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
@@ -35,6 +36,7 @@ import 
org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
 import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.testng.Assert;
@@ -51,6 +53,8 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
   // TODO: remove this wait time once we have a better way to determine if the 
rebalance has been
   // TODO: done as a reaction of the test operations.
   protected static final int DEFAULT_REBALANCE_PROCESSING_WAIT_TIME = 
TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME;
+  protected static final String OFFLINE_NODE = "offline";
+  protected static final String DISABLED_NODE = "disabled";
 
   protected final String CLASS_NAME = getShortClassName();
   protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
@@ -61,6 +65,8 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
   protected int _minActiveReplica = _replica - 1;
   protected ZkHelixClusterVerifier _clusterVerifier;
   protected List<String> _testDBs = new ArrayList<>();
+  protected String _testingCondition;
+  protected ConfigAccessor _configAccessor;
 
   @BeforeClass
   public void beforeClass() throws Exception {
@@ -90,6 +96,7 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
             .build();
 
     enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+    _testingCondition = OFFLINE_NODE;
   }
 
   protected String[] TestStateModels = {
@@ -233,6 +240,76 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
     enableDelayRebalanceInInstance(_gZkClient, CLUSTER_NAME, 
disabledInstanceName, true);
   }
 
+  @Test(dependsOnMethods = {"testDisableDelayRebalanceInInstance"})
+  public void testOnDemandRebalance() throws Exception {
+    enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true);
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000);
+    Map<String, ExternalView> externalViewsBefore = createTestDBs(-1);
+    boolean isDisabled = _testingCondition.equals(DISABLED_NODE);
+    if (isDisabled) {
+      // disable one node and make sure no partition movement
+      validateDelayedMovementsOnDisabledNode(externalViewsBefore);
+    } else {
+      // stop one node and make sure no partition movement
+      validateDelayedMovements(externalViewsBefore);
+    }
+
+    // trigger an on-demand rebalance and partitions on the offline/disabled 
node should move
+    validateMovementAfterOnDemandRebalance(externalViewsBefore, null,true, 
isDisabled);
+
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
+    setLastOnDemandRebalanceTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
+  }
+
+  @Test(dependsOnMethods = {"testOnDemandRebalance"})
+  public void testExpiredOnDemandRebalanceTimestamp() throws Exception {
+    enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true);
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000);
+    Map<String, ExternalView> externalViewsBefore = createTestDBs(-1);
+    boolean isDisabled = _testingCondition.equals(DISABLED_NODE);
+    if (isDisabled) {
+      // disable one node and make sure no partition movement
+      validateDelayedMovementsOnDisabledNode(externalViewsBefore);
+    } else {
+      // stop one node and make sure no partition movement
+      validateDelayedMovements(externalViewsBefore);
+    }
+
+    // trigger an on-demand rebalance and partitions on the offline/disabled 
node shouldn't move
+    // because the last on-demand timestamp is expired.
+    validateMovementAfterOnDemandRebalance(externalViewsBefore, 1L, false, 
isDisabled);
+
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
+    setLastOnDemandRebalanceTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
+  }
+
+  @Test(dependsOnMethods = {"testExpiredOnDemandRebalanceTimestamp"})
+  public void testOnDemandRebalanceAfterDelayRebalanceHappen() throws 
Exception {
+    long delay = 4000;
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, delay);
+    Map<String, ExternalView> externalViewsBefore = createTestDBs(-1);
+    boolean isDisabled = _testingCondition.equals(DISABLED_NODE);
+    if (isDisabled) {
+      // disable one node and make sure no partition movement
+      validateDelayedMovementsOnDisabledNode(externalViewsBefore);
+    } else {
+      // stop one node and make sure no partition movement
+      validateDelayedMovements(externalViewsBefore);
+    }
+
+    Thread.sleep(delay);
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    // after delay time, it should maintain required number of replicas
+    externalViewsBefore = validatePartitionMovement(externalViewsBefore, true, 
isDisabled);
+
+    // trigger an on-demand rebalance and partitions on the offline/disabled 
node shouldn't move
+    // because the last on-demand timestamp is expired.
+    validateMovementAfterOnDemandRebalance(externalViewsBefore, null,false, 
isDisabled);
+
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
+    setLastOnDemandRebalanceTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
+  }
+
   @AfterMethod
   public void afterTest() throws InterruptedException {
     // delete all DBs create in last test
@@ -304,6 +381,54 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
 
   }
 
+  protected void validateMovementAfterOnDemandRebalance(
+      Map<String, ExternalView> externalViewsBefore, Long lastOnDemandTime, 
boolean isPartitionMoved,
+      boolean isDisabled) {
+    if (lastOnDemandTime == null) {
+      _gSetupTool.getClusterManagementTool().onDemandRebalance(CLUSTER_NAME);
+    } else {
+      setLastOnDemandRebalanceTimeInCluster(_gZkClient, CLUSTER_NAME, 
lastOnDemandTime);
+    }
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    validatePartitionMovement(externalViewsBefore, isPartitionMoved, 
isDisabled);
+  }
+
+  protected Map<String, ExternalView> validatePartitionMovement(
+      Map<String, ExternalView> externalViewsBefore, boolean isPartitionMoved, 
boolean isDisabled) {
+    Map<String, ExternalView> externalViewAfter = new HashMap<>();
+    for (String db : _testDBs) {
+      ExternalView ev =
+          
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, 
db);
+      IdealState is =
+          
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      if (isPartitionMoved) {
+        validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE);
+        validateNoPartitionOnInstance(is, externalViewsBefore.get(db), ev,
+            _participants.get(0).getInstanceName());
+      } else {
+        validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, 
NUM_NODE);
+        validateNoPartitionMove(is, externalViewsBefore.get(db), ev,
+            _participants.get(0).getInstanceName(), isDisabled);
+      }
+      externalViewAfter.put(db, ev);
+    }
+    return externalViewAfter;
+  }
+
+  protected void validateNoPartitionOnInstance(IdealState is, ExternalView 
evBefore,
+      ExternalView evAfter, String instanceName) {
+    for (String partition : is.getPartitionSet()) {
+      Map<String, String> assignmentsBefore = 
evBefore.getRecord().getMapField(partition);
+      Map<String, String> assignmentsAfter = 
evAfter.getRecord().getMapField(partition);
+      Set<String> instancesAfter = new 
HashSet<String>(assignmentsAfter.keySet());
+
+      // the offline/disabled instance shouldn't have a partition assignment 
after rebalance
+      Assert.assertFalse(instancesAfter.contains(instanceName), String.format(
+          "%s is still on the instance after rebalance, before: %s, after: %s, 
instance: %s",
+          partition, assignmentsBefore.toString(), 
assignmentsAfter.toString(), instanceName));
+    }
+  }
+
   private void validateDelayedMovements(Map<String, ExternalView> 
externalViewsBefore)
       throws InterruptedException {
     _participants.get(0).syncStop();
@@ -318,6 +443,25 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
     }
   }
 
+  protected void enableInstance(String instance, boolean enabled) {
+    // Disable one node, no partition should be moved.
+    long currentTime = System.currentTimeMillis();
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, 
instance, enabled);
+    InstanceConfig instanceConfig = 
_configAccessor.getInstanceConfig(CLUSTER_NAME, instance);
+    Assert.assertEquals(instanceConfig.getInstanceEnabled(), enabled);
+    Assert.assertTrue(instanceConfig.getInstanceEnabledTime() >= currentTime);
+    Assert.assertTrue(instanceConfig.getInstanceEnabledTime() <= currentTime + 
100);
+  }
+
+  protected void validateDelayedMovementsOnDisabledNode(Map<String, 
ExternalView> externalViewsBefore)
+      throws Exception {
+    enableInstance(_participants.get(0).getInstanceName(), false);
+    Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    validatePartitionMovement(externalViewsBefore, false, true);
+  }
+
   @AfterClass
   public void afterClass() throws Exception {
     if (_clusterVerifier != null) {
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
index 3e5eadd0f..7fa1426d6 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
@@ -25,7 +25,6 @@ import org.apache.helix.ConfigAccessor;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
@@ -33,12 +32,11 @@ import org.testng.annotations.Test;
 
 
 public class TestDelayedAutoRebalanceWithDisabledInstance extends 
TestDelayedAutoRebalance {
-  private ConfigAccessor _configAccessor;
-
   @BeforeClass
   public void beforeClass() throws Exception {
     super.beforeClass();
     _configAccessor = new ConfigAccessor(_gZkClient);
+    _testingCondition = DISABLED_NODE;
   }
 
 
@@ -292,6 +290,21 @@ public class TestDelayedAutoRebalanceWithDisabledInstance 
extends TestDelayedAut
     super.testDisableDelayRebalanceInInstance();
   }
 
+  @Test(dependsOnMethods = {"testDisableDelayRebalanceInInstance"})
+  public void testOnDemandRebalance() throws Exception {
+    super.testOnDemandRebalance();
+  }
+
+  @Test(dependsOnMethods = {"testOnDemandRebalance"})
+  public void testExpiredOnDemandRebalanceTimestamp() throws Exception {
+    super.testExpiredOnDemandRebalanceTimestamp();
+  }
+
+  @Test(dependsOnMethods = {"testExpiredOnDemandRebalanceTimestamp"})
+  public void testOnDemandRebalanceAfterDelayRebalanceHappen() throws 
Exception {
+    super.testOnDemandRebalanceAfterDelayRebalanceHappen();
+  }
+
   @BeforeMethod
   public void beforeTest() {
     // restart any participant that has been disconnected from last test.
@@ -304,14 +317,4 @@ public class TestDelayedAutoRebalanceWithDisabledInstance 
extends TestDelayedAut
       enableInstance(_participants.get(i).getInstanceName(), true);
     }
   }
-
-  private void enableInstance(String instance, boolean enabled) {
-    // Disable one node, no partition should be moved.
-    long currentTime = System.currentTimeMillis();
-    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, 
instance, enabled);
-    InstanceConfig instanceConfig = 
_configAccessor.getInstanceConfig(CLUSTER_NAME, instance);
-    Assert.assertEquals(instanceConfig.getInstanceEnabled(), enabled);
-    Assert.assertTrue(instanceConfig.getInstanceEnabledTime() >= currentTime);
-    Assert.assertTrue(instanceConfig.getInstanceEnabledTime() <= currentTime + 
100);
-  }
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java
index f2aa3025e..5f806669d 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java
@@ -86,4 +86,19 @@ public class TestDelayedWagedRebalance extends 
TestDelayedAutoRebalance {
   public void testDisableDelayRebalanceInInstance() throws Exception {
     super.testDisableDelayRebalanceInInstance();
   }
+
+  @Test(dependsOnMethods = {"testDisableDelayRebalanceInInstance"})
+  public void testOnDemandRebalance() throws Exception {
+    super.testOnDemandRebalance();
+  }
+
+  @Test(dependsOnMethods = {"testOnDemandRebalance"})
+  public void testExpiredOnDemandRebalanceTimestamp() throws Exception {
+    super.testExpiredOnDemandRebalanceTimestamp();
+  }
+
+  @Test(dependsOnMethods = {"testExpiredOnDemandRebalanceTimestamp"})
+  public void testOnDemandRebalanceAfterDelayRebalanceHappen() throws 
Exception {
+    super.testOnDemandRebalanceAfterDelayRebalanceHappen();
+  }
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java
index 70b8adf5f..84d86c4a7 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java
@@ -98,4 +98,19 @@ public class TestDelayedWagedRebalanceWithDisabledInstance 
extends TestDelayedAu
       throws Exception {
     super.testDisableDelayRebalanceInInstance();
   }
+
+  @Test(dependsOnMethods = {"testDisableDelayRebalanceInInstance"})
+  public void testOnDemandRebalance() throws Exception {
+    super.testOnDemandRebalance();
+  }
+
+  @Test(dependsOnMethods = {"testOnDemandRebalance"})
+  public void testExpiredOnDemandRebalanceTimestamp() throws Exception {
+    super.testExpiredOnDemandRebalanceTimestamp();
+  }
+
+  @Test(dependsOnMethods = {"testExpiredOnDemandRebalanceTimestamp"})
+  public void testOnDemandRebalanceAfterDelayRebalanceHappen() throws 
Exception {
+    super.testOnDemandRebalanceAfterDelayRebalanceHappen();
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java 
b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index 0069deac8..81993475b 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -448,6 +448,9 @@ public class MockHelixAdmin implements HelixAdmin {
 
   }
 
+  @Override
+  public void onDemandRebalance(String clusterName) {}
+
   @Override public void addIdealState(String clusterName, String resourceName,
       String idealStateFile) throws IOException {
 
diff --git 
a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java 
b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
index 3690ca447..af794126c 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
@@ -397,6 +397,28 @@ public class TestClusterConfig {
     trySetInvalidAbnormalStatesResolverMap(testConfig, resolverMap);
   }
 
+  @Test
+  public void testGetLastOnDemandRebalanceTimestamp() {
+    ClusterConfig testConfig = new ClusterConfig("testConfig");
+    Assert.assertEquals(testConfig.getLastOnDemandRebalanceTimestamp(), -1L);
+
+    testConfig.getRecord()
+        
.setLongField(ClusterConfig.ClusterConfigProperty.LAST_ON_DEMAND_REBALANCE_TIMESTAMP.name(),
+            10000L);
+    Assert.assertEquals(testConfig.getLastOnDemandRebalanceTimestamp(), 
10000L);
+  }
+
+  @Test
+  public void testSetLastOnDemandRebalanceTimestamp() {
+    ClusterConfig testConfig = new ClusterConfig("testConfig");
+    testConfig.setLastOnDemandRebalanceTimestamp(10000L);
+
+    Assert.assertEquals(testConfig.getRecord()
+        
.getLongField(ClusterConfig.ClusterConfigProperty.LAST_ON_DEMAND_REBALANCE_TIMESTAMP.name(),
+            -1), 10000L);
+  }
+
+
   private void trySetInvalidAbnormalStatesResolverMap(ClusterConfig testConfig,
       Map<String, String> resolverMap) {
     try {

Reply via email to