This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch ApplicationClusterManager
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/ApplicationClusterManager by
this push:
new 76ef94699 Implement the on-demand rebalance service (#2595)
76ef94699 is described below
commit 76ef9469970f40cc9c0a2c11339cdbe3d853aab9
Author: Xiaxuan Gao <[email protected]>
AuthorDate: Mon Aug 21 09:21:19 2023 -0700
Implement the on-demand rebalance service (#2595)
Final Commit Message:
Implement the on-demand rebalance service that performs rebalancing for
offline or disabled instances without respecting the delay configuration.
---------
Co-authored-by: Xiaxuan Gao <[email protected]>
---
.../src/main/java/org/apache/helix/HelixAdmin.java | 6 +
.../rebalancer/util/DelayedRebalanceUtil.java | 50 ++++++-
.../org/apache/helix/manager/zk/ZKHelixAdmin.java | 22 ++++
.../java/org/apache/helix/model/ClusterConfig.java | 27 +++-
.../java/org/apache/helix/common/ZkTestBase.java | 8 ++
.../TestDelayedAutoRebalance.java | 144 +++++++++++++++++++++
...stDelayedAutoRebalanceWithDisabledInstance.java | 29 +++--
.../WagedRebalancer/TestDelayedWagedRebalance.java | 15 +++
...tDelayedWagedRebalanceWithDisabledInstance.java | 15 +++
.../java/org/apache/helix/mock/MockHelixAdmin.java | 3 +
.../org/apache/helix/model/TestClusterConfig.java | 22 ++++
11 files changed, 324 insertions(+), 17 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index ab4ba57b6..14863f57e 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -553,6 +553,12 @@ public interface HelixAdmin {
*/
void rebalance(String clusterName, String resourceName, int replica);
+ /**
+ * Trigger an on-demand rebalance of a cluster. Offline or disabled instances are rebalanced
+ * without respecting the configured rebalance delay.
+ * @param clusterName name of the cluster to rebalance
+ */
+ void onDemandRebalance(String clusterName);
+
/**
* Add ideal state using a json format file
* @param clusterName
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
index ee8804749..92556bb40 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
@@ -129,15 +129,25 @@ public class DelayedRebalanceUtil {
}
/**
- * @return The time when an offline or disabled instance should be treated
as inactive.
- * Return -1 if it is inactive now.
+ * Return the time when an offline or disabled instance should be treated as
inactive. Return -1
+ * if it is inactive now or forced to be rebalanced by an on-demand
rebalance.
+ *
+ * @return A timestamp that represents the expected inactive time of a node.
*/
private static long getInactiveTime(String instance, Set<String>
liveInstances, Long offlineTime,
long delay, InstanceConfig instanceConfig, ClusterConfig clusterConfig) {
long inactiveTime = Long.MAX_VALUE;
+ long lastOnDemandRebalanceTime =
clusterConfig.getLastOnDemandRebalanceTimestamp();
- // check the time instance went offline.
+ // Check if the given instance is offline
if (!liveInstances.contains(instance)) {
+ // Check if the offline instance is forced to be rebalanced by an
on-demand rebalance.
+ // If so, return it as an inactive instance.
+ if (isInstanceForcedToBeRebalanced(offlineTime, delay,
lastOnDemandRebalanceTime)) {
+ return -1L;
+ }
+
+ // Check the time instance went offline.
if (offlineTime != null && offlineTime > 0 && offlineTime + delay <
inactiveTime) {
inactiveTime = offlineTime + delay;
}
@@ -154,6 +164,13 @@ public class DelayedRebalanceUtil {
disabledTime = batchDisableTime;
}
}
+
+ // Check if the disabled instance is forced to be rebalanced by an
on-demand rebalance.
+ // If so, return it as an inactive instance.
+ if (isInstanceForcedToBeRebalanced(disabledTime, delay,
lastOnDemandRebalanceTime)) {
+ return -1L;
+ }
+
if (disabledTime > 0 && disabledTime + delay < inactiveTime) {
inactiveTime = disabledTime + delay;
}
@@ -417,6 +434,33 @@ public class DelayedRebalanceUtil {
currentIdealState), currentIdealState, numReplica);
}
+  /**
+   * Decide whether an on-demand rebalance overrides the delay window of a node that went
+   * offline or was disabled at the given time.
+   * <p>
+   * The node is forced to be rebalanced only when all of the following hold:
+   * <ul>
+   *   <li>an on-demand rebalance timestamp has been recorded (it is not -1);</li>
+   *   <li>the offline/disabled time is known (non-null and positive);</li>
+   *   <li>the current time is not later than {@code offlineOrDisabledTime + delay}, i.e. the
+   *   delay window is still open;</li>
+   *   <li>the on-demand rebalance was triggered after the node went offline or was disabled.</li>
+   * </ul>
+   *
+   * @param offlineOrDisabledTime A unix timestamp indicating the most recent time when a node
+   *                              went offline or was disabled; may be null.
+   * @param delay The delay window configuration of the current cluster
+   * @param lastOnDemandRebalanceTime A unix timestamp representing the most recent time when an
+   *                                  on-demand rebalance was triggered, or -1 if there is none.
+   * @return true if the node must be treated as inactive right away, false otherwise
+   */
+  private static boolean isInstanceForcedToBeRebalanced(Long offlineOrDisabledTime, long delay,
+      long lastOnDemandRebalanceTime) {
+    // No on-demand rebalance has been recorded for this cluster.
+    if (lastOnDemandRebalanceTime == -1) {
+      return false;
+    }
+
+    // The time at which the node went offline or was disabled is not available.
+    if (offlineOrDisabledTime == null || offlineOrDisabledTime <= 0) {
+      return false;
+    }
+
+    // Once the delay window is over there is nothing left for the on-demand rebalance to force.
+    long delayWindowEnd = offlineOrDisabledTime + delay;
+    if (System.currentTimeMillis() > delayWindowEnd) {
+      return false;
+    }
+
+    // Only an on-demand rebalance requested after the node became unavailable applies to it.
+    return lastOnDemandRebalanceTime > offlineOrDisabledTime;
+  }
+
/**
* For the resource in the cluster, find additional AssignableReplica to
close the gap on minActiveReplica.
* @param clusterData Cluster data cache.
diff --git
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index d45f8cdb8..917cb25de 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -1532,6 +1532,28 @@ public class ZKHelixAdmin implements HelixAdmin {
rebalance(clusterName, resourceName, replica, resourceName, "");
}
+  /**
+   * Trigger an on-demand rebalance by storing the current time as the last on-demand rebalance
+   * timestamp in the cluster config of the given cluster.
+   *
+   * @param clusterName name of the cluster to rebalance
+   * @throws HelixException if the cluster config does not exist or the timestamp could not be
+   *                        written to it
+   */
+  @Override
+  public void onDemandRebalance(String clusterName) {
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    String path = PropertyPathBuilder.clusterConfig(clusterName);
+
+    if (!baseAccessor.exists(path, 0)) {
+      throw new HelixException("Cluster " + clusterName + ": cluster config does not exist");
+    }
+
+    boolean updated = baseAccessor.update(path, new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        if (currentData == null) {
+          throw new HelixException("Cluster: " + clusterName + ": cluster config is null");
+        }
+        ClusterConfig clusterConfig = new ClusterConfig(currentData);
+        clusterConfig.setLastOnDemandRebalanceTimestamp(System.currentTimeMillis());
+        return clusterConfig.getRecord();
+      }
+    }, AccessOption.PERSISTENT);
+
+    // The accessor signals an unsuccessful update through its return value. Surface it so that
+    // the caller does not assume a rebalance was requested when nothing was written.
+    if (!updated) {
+      throw new HelixException("Cluster " + clusterName
+          + ": failed to update the last on-demand rebalance timestamp in the cluster config");
+    }
+  }
+
@Override
public void rebalance(String clusterName, String resourceName, int replica,
String keyPrefix,
String group) {
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 8f04c5fab..e33b90204 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -151,7 +151,10 @@ public class ClusterConfig extends HelixProperty {
HELIX_ENABLED_DISABLE_TIMESTAMP,
HELIX_DISABLED_REASON,
// disabled type should be a enum of
org.apache.helix.constants.InstanceConstants.InstanceDisabledType
- HELIX_DISABLED_TYPE
+ HELIX_DISABLED_TYPE,
+
+ // The last time when the on-demand rebalance is triggered.
+ LAST_ON_DEMAND_REBALANCE_TIMESTAMP
}
public enum GlobalRebalancePreferenceKey {
@@ -188,6 +191,7 @@ public class ClusterConfig extends HelixProperty {
private static final int GLOBAL_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;
private static final int OFFLINE_NODE_TIME_OUT_FOR_MAINTENANCE_MODE_NOT_SET
= -1;
private final static int DEFAULT_VIEW_CLUSTER_REFRESH_PERIOD = 30;
+ private final static long DEFAULT_LAST_ON_DEMAND_REBALANCE_TIMESTAMP = -1L;
/**
* Instantiate for a specific cluster
@@ -1173,4 +1177,25 @@ public class ClusterConfig extends HelixProperty {
}
return getDisabledInstances().get(instanceName);
}
+
+  /**
+   * Read the unix timestamp of the most recent on-demand rebalance triggered on the current
+   * cluster.
+   *
+   * @return the last on-demand rebalance timestamp, or -1 if the configuration holds no such
+   *         record yet
+   */
+  public long getLastOnDemandRebalanceTimestamp() {
+    String fieldName = ClusterConfigProperty.LAST_ON_DEMAND_REBALANCE_TIMESTAMP.name();
+    return _record.getLongField(fieldName, DEFAULT_LAST_ON_DEMAND_REBALANCE_TIMESTAMP);
+  }
+
+  /**
+   * Record the given timestamp as the last time an on-demand rebalance was triggered.
+   *
+   * @param rebalanceTimestamp the timestamp to store in the cluster config record
+   */
+  public void setLastOnDemandRebalanceTimestamp(long rebalanceTimestamp) {
+    String fieldName = ClusterConfigProperty.LAST_ON_DEMAND_REBALANCE_TIMESTAMP.name();
+    _record.setLongField(fieldName, rebalanceTimestamp);
+  }
}
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index 1cea1926a..0218c3ffc 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -356,6 +356,14 @@ public class ZkTestBase {
configAccessor.setClusterConfig(clusterName, clusterConfig);
}
+  /**
+   * Test helper: read the cluster config of the given cluster, overwrite its last on-demand
+   * rebalance timestamp and write the config back.
+   */
+  protected void setLastOnDemandRebalanceTimeInCluster(HelixZkClient zkClient,
+      String clusterName, long lastOnDemandTime) {
+    ConfigAccessor accessor = new ConfigAccessor(zkClient);
+    ClusterConfig config = accessor.getClusterConfig(clusterName);
+    config.setLastOnDemandRebalanceTimestamp(lastOnDemandTime);
+    accessor.setClusterConfig(clusterName, config);
+  }
+
protected IdealState createResourceWithDelayedRebalance(String clusterName,
String db,
String stateModel, int numPartition, int replica, int minActiveReplica,
long delay) {
return createResourceWithDelayedRebalance(clusterName, db, stateModel,
numPartition, replica,
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
index 958f27c83..5c99819d6 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.helix.ConfigAccessor;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
@@ -35,6 +36,7 @@ import
org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
import
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.testng.Assert;
@@ -51,6 +53,8 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
// TODO: remove this wait time once we have a better way to determine if the
rebalance has been
// TODO: done as a reaction of the test operations.
protected static final int DEFAULT_REBALANCE_PROCESSING_WAIT_TIME =
TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME;
+ protected static final String OFFLINE_NODE = "offline";
+ protected static final String DISABLED_NODE = "disabled";
protected final String CLASS_NAME = getShortClassName();
protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
@@ -61,6 +65,8 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
protected int _minActiveReplica = _replica - 1;
protected ZkHelixClusterVerifier _clusterVerifier;
protected List<String> _testDBs = new ArrayList<>();
+ protected String _testingCondition;
+ protected ConfigAccessor _configAccessor;
@BeforeClass
public void beforeClass() throws Exception {
@@ -90,6 +96,7 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
.build();
enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+ _testingCondition = OFFLINE_NODE;
}
protected String[] TestStateModels = {
@@ -233,6 +240,76 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
enableDelayRebalanceInInstance(_gZkClient, CLUSTER_NAME,
disabledInstanceName, true);
}
+  /**
+   * With a long delay configured, taking one node down (stopped or disabled, depending on
+   * _testingCondition) must not move partitions until an on-demand rebalance is triggered.
+   */
+  @Test(dependsOnMethods = {"testDisableDelayRebalanceInInstance"})
+  public void testOnDemandRebalance() throws Exception {
+    enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true);
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000);
+    Map<String, ExternalView> viewsBeforeOutage = createTestDBs(-1);
+    boolean nodeIsDisabled = _testingCondition.equals(DISABLED_NODE);
+
+    // Take one node out of service; the delay window must keep all partitions in place.
+    if (!nodeIsDisabled) {
+      // stop one node
+      validateDelayedMovements(viewsBeforeOutage);
+    } else {
+      // disable one node
+      validateDelayedMovementsOnDisabledNode(viewsBeforeOutage);
+    }
+
+    // A null timestamp makes the helper call onDemandRebalance(); the partitions on the
+    // offline/disabled node are expected to move.
+    validateMovementAfterOnDemandRebalance(viewsBeforeOutage, null, true, nodeIsDisabled);
+
+    // Reset the cluster config values changed by this test.
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
+    setLastOnDemandRebalanceTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
+  }
+
+  /**
+   * An on-demand rebalance timestamp that is older than the moment the node went offline or was
+   * disabled must not force any partition movement.
+   */
+  @Test(dependsOnMethods = {"testOnDemandRebalance"})
+  public void testExpiredOnDemandRebalanceTimestamp() throws Exception {
+    enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true);
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000);
+    Map<String, ExternalView> viewsBeforeOutage = createTestDBs(-1);
+    boolean nodeIsDisabled = _testingCondition.equals(DISABLED_NODE);
+
+    // Take one node out of service; the delay window must keep all partitions in place.
+    if (!nodeIsDisabled) {
+      // stop one node
+      validateDelayedMovements(viewsBeforeOutage);
+    } else {
+      // disable one node
+      validateDelayedMovementsOnDisabledNode(viewsBeforeOutage);
+    }
+
+    // Write a stale timestamp (1) directly into the cluster config: it predates the outage, so
+    // the partitions on the offline/disabled node shouldn't move.
+    validateMovementAfterOnDemandRebalance(viewsBeforeOutage, 1L, false, nodeIsDisabled);
+
+    // Reset the cluster config values changed by this test.
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
+    setLastOnDemandRebalanceTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
+  }
+
+  /**
+   * Once the delay window has elapsed and the delayed rebalance has already happened, a later
+   * on-demand rebalance must not cause any further partition movement.
+   */
+  @Test(dependsOnMethods = {"testExpiredOnDemandRebalanceTimestamp"})
+  public void testOnDemandRebalanceAfterDelayRebalanceHappen() throws Exception {
+    long delayMs = 4000;
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, delayMs);
+    Map<String, ExternalView> viewsBefore = createTestDBs(-1);
+    boolean nodeIsDisabled = _testingCondition.equals(DISABLED_NODE);
+
+    // Take one node out of service; within the delay window no partition should move.
+    if (!nodeIsDisabled) {
+      // stop one node
+      validateDelayedMovements(viewsBefore);
+    } else {
+      // disable one node
+      validateDelayedMovementsOnDisabledNode(viewsBefore);
+    }
+
+    // Wait for the delay window to pass and let the cluster converge.
+    Thread.sleep(delayMs);
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    // after delay time, it should maintain required number of replicas
+    viewsBefore = validatePartitionMovement(viewsBefore, true, nodeIsDisabled);
+
+    // Trigger an on-demand rebalance now: the delay window is already over and the partitions
+    // have been moved, so no further movement is expected.
+    validateMovementAfterOnDemandRebalance(viewsBefore, null, false, nodeIsDisabled);
+
+    // Reset the cluster config values changed by this test.
+    setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
+    setLastOnDemandRebalanceTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
+  }
+
@AfterMethod
public void afterTest() throws InterruptedException {
// delete all DBs create in last test
@@ -304,6 +381,54 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
}
+  /**
+   * Request an on-demand rebalance and verify the resulting partition placement.
+   *
+   * @param externalViewsBefore external views captured before the rebalance, keyed by resource
+   * @param lastOnDemandTime if null, onDemandRebalance() is invoked through the admin tool;
+   *                         otherwise this value is written to the cluster config directly
+   * @param isPartitionMoved whether partitions are expected to have left the first participant
+   * @param isDisabled whether the first participant was disabled (as opposed to stopped)
+   */
+  protected void validateMovementAfterOnDemandRebalance(
+      Map<String, ExternalView> externalViewsBefore, Long lastOnDemandTime,
+      boolean isPartitionMoved, boolean isDisabled) {
+    if (lastOnDemandTime != null) {
+      setLastOnDemandRebalanceTimeInCluster(_gZkClient, CLUSTER_NAME, lastOnDemandTime);
+    } else {
+      _gSetupTool.getClusterManagementTool().onDemandRebalance(CLUSTER_NAME);
+    }
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    validatePartitionMovement(externalViewsBefore, isPartitionMoved, isDisabled);
+  }
+
+  /**
+   * For every test DB, fetch the current external view and ideal state and check them against
+   * the expectation for the first participant.
+   *
+   * @param externalViewsBefore external views captured earlier, keyed by resource name
+   * @param isPartitionMoved true if no partition may remain on the first participant; false if
+   *                         no partition may have moved
+   * @param isDisabled forwarded to validateNoPartitionMove when no movement is expected
+   * @return the external views read during validation, keyed by resource name
+   */
+  protected Map<String, ExternalView> validatePartitionMovement(
+      Map<String, ExternalView> externalViewsBefore, boolean isPartitionMoved,
+      boolean isDisabled) {
+    Map<String, ExternalView> viewsAfter = new HashMap<>();
+    for (String dbName : _testDBs) {
+      ExternalView viewAfter =
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, dbName);
+      IdealState idealState =
+          _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, dbName);
+      ExternalView viewBefore = externalViewsBefore.get(dbName);
+
+      if (!isPartitionMoved) {
+        validateMinActiveAndTopStateReplica(idealState, viewAfter, _minActiveReplica, NUM_NODE);
+        validateNoPartitionMove(idealState, viewBefore, viewAfter,
+            _participants.get(0).getInstanceName(), isDisabled);
+      } else {
+        validateMinActiveAndTopStateReplica(idealState, viewAfter, _replica, NUM_NODE);
+        validateNoPartitionOnInstance(idealState, viewBefore, viewAfter,
+            _participants.get(0).getInstanceName());
+      }
+      viewsAfter.put(dbName, viewAfter);
+    }
+    return viewsAfter;
+  }
+
+  /**
+   * Assert that, after the rebalance, none of the resource's partitions is assigned to the
+   * given instance.
+   *
+   * @param is ideal state whose partition set is iterated
+   * @param evBefore external view before the rebalance; only used in the failure message
+   * @param evAfter external view after the rebalance
+   * @param instanceName the offline/disabled instance that must no longer hold a partition
+   */
+  protected void validateNoPartitionOnInstance(IdealState is, ExternalView evBefore,
+      ExternalView evAfter, String instanceName) {
+    for (String partition : is.getPartitionSet()) {
+      Map<String, String> assignmentsBefore = evBefore.getRecord().getMapField(partition);
+      Map<String, String> assignmentsAfter = evAfter.getRecord().getMapField(partition);
+
+      // A partition without an entry in the external view is not on the instance. Guarding
+      // here turns a would-be NullPointerException into a normal pass or assertion failure.
+      boolean stillOnInstance =
+          assignmentsAfter != null && assignmentsAfter.containsKey(instanceName);
+
+      // the offline/disabled instance shouldn't have a partition assignment after rebalance;
+      // the maps are passed to String.format directly so that null values are printed safely
+      Assert.assertFalse(stillOnInstance, String.format(
+          "%s is still on the instance after rebalance, before: %s, after: %s, instance: %s",
+          partition, assignmentsBefore, assignmentsAfter, instanceName));
+    }
+  }
+
private void validateDelayedMovements(Map<String, ExternalView>
externalViewsBefore)
throws InterruptedException {
_participants.get(0).syncStop();
@@ -318,6 +443,25 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
}
}
+  /**
+   * Enable or disable an instance through the admin tool and verify that its instance config
+   * reflects the new state and carries a fresh enabled-time stamp.
+   *
+   * @param instance name of the instance to change
+   * @param enabled true to enable the instance, false to disable it
+   */
+  protected void enableInstance(String instance, boolean enabled) {
+    // _configAccessor is assigned in TestDelayedAutoRebalanceWithDisabledInstance.beforeClass();
+    // create it on demand so this shared helper also works for classes that never set it.
+    if (_configAccessor == null) {
+      _configAccessor = new ConfigAccessor(_gZkClient);
+    }
+
+    long currentTime = System.currentTimeMillis();
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instance, enabled);
+    InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, instance);
+    Assert.assertEquals(instanceConfig.getInstanceEnabled(), enabled);
+    // The recorded time must fall between the moment just before the call and 100 ms after it.
+    Assert.assertTrue(instanceConfig.getInstanceEnabledTime() >= currentTime);
+    Assert.assertTrue(instanceConfig.getInstanceEnabledTime() <= currentTime + 100);
+  }
+
+  /**
+   * Disable the first participant, give the controller time to react, and verify that no
+   * partition has moved.
+   *
+   * @param externalViewsBefore external views captured before the node was disabled
+   */
+  protected void validateDelayedMovementsOnDisabledNode(
+      Map<String, ExternalView> externalViewsBefore) throws Exception {
+    String nodeToDisable = _participants.get(0).getInstanceName();
+    enableInstance(nodeToDisable, false);
+
+    Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // isPartitionMoved = false, isDisabled = true
+    validatePartitionMovement(externalViewsBefore, false, true);
+  }
+
@AfterClass
public void afterClass() throws Exception {
if (_clusterVerifier != null) {
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
index 3e5eadd0f..7fa1426d6 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
@@ -25,7 +25,6 @@ import org.apache.helix.ConfigAccessor;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
@@ -33,12 +32,11 @@ import org.testng.annotations.Test;
public class TestDelayedAutoRebalanceWithDisabledInstance extends
TestDelayedAutoRebalance {
- private ConfigAccessor _configAccessor;
-
@BeforeClass
public void beforeClass() throws Exception {
super.beforeClass();
_configAccessor = new ConfigAccessor(_gZkClient);
+ // Make the inherited on-demand rebalance tests disable the node instead of stopping it.
+ _testingCondition = DISABLED_NODE;
}
@@ -292,6 +290,21 @@ public class TestDelayedAutoRebalanceWithDisabledInstance
extends TestDelayedAut
super.testDisableDelayRebalanceInInstance();
}
+ /**
+ * Re-runs the inherited test. beforeClass() of this class sets _testingCondition to
+ * DISABLED_NODE, so the node is disabled rather than stopped.
+ */
+ @Test(dependsOnMethods = {"testDisableDelayRebalanceInInstance"})
+ public void testOnDemandRebalance() throws Exception {
+ super.testOnDemandRebalance();
+ }
+
+ /**
+ * Re-runs the inherited stale-timestamp test. beforeClass() of this class sets
+ * _testingCondition to DISABLED_NODE, so the node is disabled rather than stopped.
+ */
+ @Test(dependsOnMethods = {"testOnDemandRebalance"})
+ public void testExpiredOnDemandRebalanceTimestamp() throws Exception {
+ super.testExpiredOnDemandRebalanceTimestamp();
+ }
+
+ /**
+ * Re-runs the inherited test that triggers an on-demand rebalance after the delay window has
+ * elapsed. beforeClass() of this class sets _testingCondition to DISABLED_NODE, so the node is
+ * disabled rather than stopped.
+ */
+ @Test(dependsOnMethods = {"testExpiredOnDemandRebalanceTimestamp"})
+ public void testOnDemandRebalanceAfterDelayRebalanceHappen() throws
Exception {
+ super.testOnDemandRebalanceAfterDelayRebalanceHappen();
+ }
+
@BeforeMethod
public void beforeTest() {
// restart any participant that has been disconnected from last test.
@@ -304,14 +317,4 @@ public class TestDelayedAutoRebalanceWithDisabledInstance
extends TestDelayedAut
enableInstance(_participants.get(i).getInstanceName(), true);
}
}
-
- private void enableInstance(String instance, boolean enabled) {
- // Disable one node, no partition should be moved.
- long currentTime = System.currentTimeMillis();
- _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME,
instance, enabled);
- InstanceConfig instanceConfig =
_configAccessor.getInstanceConfig(CLUSTER_NAME, instance);
- Assert.assertEquals(instanceConfig.getInstanceEnabled(), enabled);
- Assert.assertTrue(instanceConfig.getInstanceEnabledTime() >= currentTime);
- Assert.assertTrue(instanceConfig.getInstanceEnabledTime() <= currentTime +
100);
- }
}
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java
index f2aa3025e..5f806669d 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java
@@ -86,4 +86,19 @@ public class TestDelayedWagedRebalance extends
TestDelayedAutoRebalance {
public void testDisableDelayRebalanceInInstance() throws Exception {
super.testDisableDelayRebalanceInInstance();
}
+
+ /**
+ * Delegates to the inherited on-demand rebalance test of TestDelayedAutoRebalance.
+ * NOTE(review): presumably run against this class's WAGED setup - confirm in beforeClass().
+ */
+ @Test(dependsOnMethods = {"testDisableDelayRebalanceInInstance"})
+ public void testOnDemandRebalance() throws Exception {
+ super.testOnDemandRebalance();
+ }
+
+ /**
+ * Delegates to the inherited stale-timestamp test of TestDelayedAutoRebalance.
+ * NOTE(review): presumably run against this class's WAGED setup - confirm in beforeClass().
+ */
+ @Test(dependsOnMethods = {"testOnDemandRebalance"})
+ public void testExpiredOnDemandRebalanceTimestamp() throws Exception {
+ super.testExpiredOnDemandRebalanceTimestamp();
+ }
+
+ /**
+ * Delegates to the inherited test that triggers an on-demand rebalance after the delay window
+ * has elapsed.
+ * NOTE(review): presumably run against this class's WAGED setup - confirm in beforeClass().
+ */
+ @Test(dependsOnMethods = {"testExpiredOnDemandRebalanceTimestamp"})
+ public void testOnDemandRebalanceAfterDelayRebalanceHappen() throws
Exception {
+ super.testOnDemandRebalanceAfterDelayRebalanceHappen();
+ }
}
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java
index 70b8adf5f..84d86c4a7 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java
@@ -98,4 +98,19 @@ public class TestDelayedWagedRebalanceWithDisabledInstance
extends TestDelayedAu
throws Exception {
super.testDisableDelayRebalanceInInstance();
}
+
+ /**
+ * Delegates to the superclass implementation of the on-demand rebalance test.
+ * NOTE(review): presumably exercises the disabled-node path with the WAGED setup - confirm
+ * against the superclass and this class's beforeClass().
+ */
+ @Test(dependsOnMethods = {"testDisableDelayRebalanceInInstance"})
+ public void testOnDemandRebalance() throws Exception {
+ super.testOnDemandRebalance();
+ }
+
+ /**
+ * Delegates to the superclass implementation of the stale-timestamp test.
+ * NOTE(review): presumably exercises the disabled-node path with the WAGED setup - confirm
+ * against the superclass and this class's beforeClass().
+ */
+ @Test(dependsOnMethods = {"testOnDemandRebalance"})
+ public void testExpiredOnDemandRebalanceTimestamp() throws Exception {
+ super.testExpiredOnDemandRebalanceTimestamp();
+ }
+
+ /**
+ * Delegates to the superclass implementation of the test that triggers an on-demand rebalance
+ * after the delay window has elapsed.
+ * NOTE(review): presumably exercises the disabled-node path with the WAGED setup - confirm
+ * against the superclass and this class's beforeClass().
+ */
+ @Test(dependsOnMethods = {"testExpiredOnDemandRebalanceTimestamp"})
+ public void testOnDemandRebalanceAfterDelayRebalanceHappen() throws
Exception {
+ super.testOnDemandRebalanceAfterDelayRebalanceHappen();
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index 0069deac8..81993475b 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -448,6 +448,9 @@ public class MockHelixAdmin implements HelixAdmin {
}
+ /** No-op: this mock takes no action when an on-demand rebalance is requested. */
+ @Override
+ public void onDemandRebalance(String clusterName) {}
+
@Override public void addIdealState(String clusterName, String resourceName,
String idealStateFile) throws IOException {
diff --git
a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
index 3690ca447..af794126c 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
@@ -397,6 +397,28 @@ public class TestClusterConfig {
trySetInvalidAbnormalStatesResolverMap(testConfig, resolverMap);
}
+  /**
+   * The getter returns -1 while nothing is stored and otherwise the value held in the record.
+   */
+  @Test
+  public void testGetLastOnDemandRebalanceTimestamp() {
+    ClusterConfig config = new ClusterConfig("testConfig");
+    // Nothing has been stored yet, so the default is expected.
+    Assert.assertEquals(config.getLastOnDemandRebalanceTimestamp(), -1L);
+
+    // Write the field on the underlying record directly and read it back through the getter.
+    String fieldName =
+        ClusterConfig.ClusterConfigProperty.LAST_ON_DEMAND_REBALANCE_TIMESTAMP.name();
+    config.getRecord().setLongField(fieldName, 10000L);
+    Assert.assertEquals(config.getLastOnDemandRebalanceTimestamp(), 10000L);
+  }
+
+  /**
+   * The setter stores the timestamp in the LAST_ON_DEMAND_REBALANCE_TIMESTAMP field of the
+   * underlying record.
+   */
+  @Test
+  public void testSetLastOnDemandRebalanceTimestamp() {
+    ClusterConfig config = new ClusterConfig("testConfig");
+    config.setLastOnDemandRebalanceTimestamp(10000L);
+
+    // Read the raw field from the record, bypassing the getter.
+    String fieldName =
+        ClusterConfig.ClusterConfigProperty.LAST_ON_DEMAND_REBALANCE_TIMESTAMP.name();
+    Assert.assertEquals(config.getRecord().getLongField(fieldName, -1), 10000L);
+  }
+
+
private void trySetInvalidAbnormalStatesResolverMap(ClusterConfig testConfig,
Map<String, String> resolverMap) {
try {