This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit ba858b2a0602a64990f9980b5487256c1f7b4afc Author: Xiaxuan Gao <[email protected]> AuthorDate: Mon Aug 21 09:21:19 2023 -0700 Implement the on-demand rebalance service (#2595) Final Commit Message: Implement the on-demand rebalance service that performs rebalancing for offline or disabled instances without respecting the delay configuration. --------- Co-authored-by: Xiaxuan Gao <[email protected]> --- .../src/main/java/org/apache/helix/HelixAdmin.java | 6 + .../rebalancer/util/DelayedRebalanceUtil.java | 50 ++++++- .../org/apache/helix/manager/zk/ZKHelixAdmin.java | 22 ++++ .../java/org/apache/helix/model/ClusterConfig.java | 27 +++- .../java/org/apache/helix/common/ZkTestBase.java | 8 ++ .../TestDelayedAutoRebalance.java | 144 +++++++++++++++++++++ ...stDelayedAutoRebalanceWithDisabledInstance.java | 29 +++-- .../WagedRebalancer/TestDelayedWagedRebalance.java | 15 +++ ...tDelayedWagedRebalanceWithDisabledInstance.java | 15 +++ .../java/org/apache/helix/mock/MockHelixAdmin.java | 3 + .../org/apache/helix/model/TestClusterConfig.java | 22 ++++ 11 files changed, 324 insertions(+), 17 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java index ab4ba57b6..14863f57e 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java @@ -553,6 +553,12 @@ public interface HelixAdmin { */ void rebalance(String clusterName, String resourceName, int replica); + /** + * Rebalance a cluster without respecting the delay + * @param clusterName + */ + void onDemandRebalance(String clusterName); + /** * Add ideal state using a json format file * @param clusterName diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java index ee8804749..92556bb40 100644 --- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java @@ -129,15 +129,25 @@ public class DelayedRebalanceUtil { } /** - * @return The time when an offline or disabled instance should be treated as inactive. - * Return -1 if it is inactive now. + * Return the time when an offline or disabled instance should be treated as inactive. Return -1 + * if it is inactive now or forced to be rebalanced by an on-demand rebalance. + * + * @return A timestamp that represents the expected inactive time of a node. */ private static long getInactiveTime(String instance, Set<String> liveInstances, Long offlineTime, long delay, InstanceConfig instanceConfig, ClusterConfig clusterConfig) { long inactiveTime = Long.MAX_VALUE; + long lastOnDemandRebalanceTime = clusterConfig.getLastOnDemandRebalanceTimestamp(); - // check the time instance went offline. + // Check if the given instance is offline if (!liveInstances.contains(instance)) { + // Check if the offline instance is forced to be rebalanced by an on-demand rebalance. + // If so, return it as an inactive instance. + if (isInstanceForcedToBeRebalanced(offlineTime, delay, lastOnDemandRebalanceTime)) { + return -1L; + } + + // Check the time instance went offline. if (offlineTime != null && offlineTime > 0 && offlineTime + delay < inactiveTime) { inactiveTime = offlineTime + delay; } @@ -154,6 +164,13 @@ public class DelayedRebalanceUtil { disabledTime = batchDisableTime; } } + + // Check if the disabled instance is forced to be rebalanced by an on-demand rebalance. + // If so, return it as an inactive instance. 
+ if (isInstanceForcedToBeRebalanced(disabledTime, delay, lastOnDemandRebalanceTime)) { + return -1L; + } + if (disabledTime > 0 && disabledTime + delay < inactiveTime) { inactiveTime = disabledTime + delay; } @@ -417,6 +434,33 @@ public class DelayedRebalanceUtil { currentIdealState), currentIdealState, numReplica); } + /** + * Given the offline/disabled time, delay, and the last on-demand rebalance time, this method checks + * if the node associated with the offline/disabled time is forced to be rebalanced by the on-demand + * rebalance. + * 1. If either the last on-demand rebalance time or the offline/disabled time is unavailable, then + * the node is not forced to be rebalanced. + * 2. If the current time doesn't surpass the delayed offline/disabled time and the last on-demand + * rebalance time is after the offline/disabled time, then the node is forced to be rebalanced. + * + * @param offlineOrDisabledTime A unix timestamp indicating the most recent time when a node went + * offline or was disabled. + * @param delay The delay window configuration of the current cluster + * @param lastOnDemandRebalanceTime A unix timestamp representing the most recent time when an + * on-demand rebalance was triggered. + * @return A boolean indicating whether a node is forced to be rebalanced + */ + private static boolean isInstanceForcedToBeRebalanced(Long offlineOrDisabledTime, long delay, + long lastOnDemandRebalanceTime) { + if (lastOnDemandRebalanceTime == -1 || offlineOrDisabledTime == null + || offlineOrDisabledTime <= 0 || System.currentTimeMillis() > (offlineOrDisabledTime + + delay)) { + return false; + } + + return offlineOrDisabledTime < lastOnDemandRebalanceTime; + } + /** * For the resource in the cluster, find additional AssignableReplica to close the gap on minActiveReplica. * @param clusterData Cluster data cache. 
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index d45f8cdb8..917cb25de 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -1532,6 +1532,28 @@ public class ZKHelixAdmin implements HelixAdmin { rebalance(clusterName, resourceName, replica, resourceName, ""); } + @Override + public void onDemandRebalance(String clusterName) { + BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient); + String path = PropertyPathBuilder.clusterConfig(clusterName); + + if (!baseAccessor.exists(path, 0)) { + throw new HelixException("Cluster " + clusterName + ": cluster config does not exist"); + } + + baseAccessor.update(path, new DataUpdater<ZNRecord>() { + @Override + public ZNRecord update(ZNRecord currentData) { + if (currentData == null) { + throw new HelixException("Cluster: " + clusterName + ": cluster config is null"); + } + ClusterConfig clusterConfig = new ClusterConfig(currentData); + clusterConfig.setLastOnDemandRebalanceTimestamp(System.currentTimeMillis()); + return clusterConfig.getRecord(); + } + }, AccessOption.PERSISTENT); + } + @Override public void rebalance(String clusterName, String resourceName, int replica, String keyPrefix, String group) { diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java index 8f04c5fab..e33b90204 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java @@ -151,7 +151,10 @@ public class ClusterConfig extends HelixProperty { HELIX_ENABLED_DISABLE_TIMESTAMP, HELIX_DISABLED_REASON, // disabled type should be a enum of org.apache.helix.constants.InstanceConstants.InstanceDisabledType - HELIX_DISABLED_TYPE 
+ HELIX_DISABLED_TYPE, + + // The last time when the on-demand rebalance is triggered. + LAST_ON_DEMAND_REBALANCE_TIMESTAMP } public enum GlobalRebalancePreferenceKey { @@ -188,6 +191,7 @@ public class ClusterConfig extends HelixProperty { private static final int GLOBAL_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1; private static final int OFFLINE_NODE_TIME_OUT_FOR_MAINTENANCE_MODE_NOT_SET = -1; private final static int DEFAULT_VIEW_CLUSTER_REFRESH_PERIOD = 30; + private final static long DEFAULT_LAST_ON_DEMAND_REBALANCE_TIMESTAMP = -1L; /** * Instantiate for a specific cluster @@ -1173,4 +1177,25 @@ public class ClusterConfig extends HelixProperty { } return getDisabledInstances().get(instanceName); } + + /** + * Get a unix time that represents the last time the on-demand rebalance is triggered on the + * current cluster. Return -1 if the configuration doesn't have such record yet. + * + * @return the last on-demand rebalance timestamp in a unix format + */ + public long getLastOnDemandRebalanceTimestamp() { + return _record.getLongField(ClusterConfigProperty.LAST_ON_DEMAND_REBALANCE_TIMESTAMP.name(), + DEFAULT_LAST_ON_DEMAND_REBALANCE_TIMESTAMP); + } + + /** + * Set the last on demand rebalance time to be the given timestamp. 
+ * + * @param rebalanceTimestamp + */ + public void setLastOnDemandRebalanceTimestamp(long rebalanceTimestamp) { + _record.setLongField(ClusterConfigProperty.LAST_ON_DEMAND_REBALANCE_TIMESTAMP.name(), + rebalanceTimestamp); + } } diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java index 1cea1926a..0218c3ffc 100644 --- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java @@ -356,6 +356,14 @@ public class ZkTestBase { configAccessor.setClusterConfig(clusterName, clusterConfig); } + protected void setLastOnDemandRebalanceTimeInCluster(HelixZkClient zkClient, + String clusterName, long lastOnDemandTime) { + ConfigAccessor configAccessor = new ConfigAccessor(zkClient); + ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName); + clusterConfig.setLastOnDemandRebalanceTimestamp(lastOnDemandTime); + configAccessor.setClusterConfig(clusterName, clusterConfig); + } + protected IdealState createResourceWithDelayedRebalance(String clusterName, String db, String stateModel, int numPartition, int replica, int minActiveReplica, long delay) { return createResourceWithDelayedRebalance(clusterName, db, stateModel, numPartition, replica, diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java index 958f27c83..5c99819d6 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.helix.ConfigAccessor; import 
org.apache.helix.TestHelper; import org.apache.helix.common.ZkTestBase; import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; @@ -35,6 +36,7 @@ import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; import org.testng.Assert; @@ -51,6 +53,8 @@ public class TestDelayedAutoRebalance extends ZkTestBase { // TODO: remove this wait time once we have a better way to determine if the rebalance has been // TODO: done as a reaction of the test operations. protected static final int DEFAULT_REBALANCE_PROCESSING_WAIT_TIME = TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME; + protected static final String OFFLINE_NODE = "offline"; + protected static final String DISABLED_NODE = "disabled"; protected final String CLASS_NAME = getShortClassName(); protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; @@ -61,6 +65,8 @@ public class TestDelayedAutoRebalance extends ZkTestBase { protected int _minActiveReplica = _replica - 1; protected ZkHelixClusterVerifier _clusterVerifier; protected List<String> _testDBs = new ArrayList<>(); + protected String _testingCondition; + protected ConfigAccessor _configAccessor; @BeforeClass public void beforeClass() throws Exception { @@ -90,6 +96,7 @@ public class TestDelayedAutoRebalance extends ZkTestBase { .build(); enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + _testingCondition = OFFLINE_NODE; } protected String[] TestStateModels = { @@ -233,6 +240,76 @@ public class TestDelayedAutoRebalance extends ZkTestBase { enableDelayRebalanceInInstance(_gZkClient, CLUSTER_NAME, disabledInstanceName, true); } + @Test(dependsOnMethods = 
{"testDisableDelayRebalanceInInstance"}) + public void testOnDemandRebalance() throws Exception { + enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true); + setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000); + Map<String, ExternalView> externalViewsBefore = createTestDBs(-1); + boolean isDisabled = _testingCondition.equals(DISABLED_NODE); + if (isDisabled) { + // disable one node and make sure no partition movement + validateDelayedMovementsOnDisabledNode(externalViewsBefore); + } else { + // stop one node and make sure no partition movement + validateDelayedMovements(externalViewsBefore); + } + + // trigger an on-demand rebalance and partitions on the offline/disabled node should move + validateMovementAfterOnDemandRebalance(externalViewsBefore, null,true, isDisabled); + + setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1); + setLastOnDemandRebalanceTimeInCluster(_gZkClient, CLUSTER_NAME, -1); + } + + @Test(dependsOnMethods = {"testOnDemandRebalance"}) + public void testExpiredOnDemandRebalanceTimestamp() throws Exception { + enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true); + setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000); + Map<String, ExternalView> externalViewsBefore = createTestDBs(-1); + boolean isDisabled = _testingCondition.equals(DISABLED_NODE); + if (isDisabled) { + // disable one node and make sure no partition movement + validateDelayedMovementsOnDisabledNode(externalViewsBefore); + } else { + // stop one node and make sure no partition movement + validateDelayedMovements(externalViewsBefore); + } + + // trigger an on-demand rebalance and partitions on the offline/disabled node shouldn't move + // because the last on-demand timestamp is expired. 
+ validateMovementAfterOnDemandRebalance(externalViewsBefore, 1L, false, isDisabled); + + setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1); + setLastOnDemandRebalanceTimeInCluster(_gZkClient, CLUSTER_NAME, -1); + } + + @Test(dependsOnMethods = {"testExpiredOnDemandRebalanceTimestamp"}) + public void testOnDemandRebalanceAfterDelayRebalanceHappen() throws Exception { + long delay = 4000; + setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, delay); + Map<String, ExternalView> externalViewsBefore = createTestDBs(-1); + boolean isDisabled = _testingCondition.equals(DISABLED_NODE); + if (isDisabled) { + // disable one node and make sure no partition movement + validateDelayedMovementsOnDisabledNode(externalViewsBefore); + } else { + // stop one node and make sure no partition movement + validateDelayedMovements(externalViewsBefore); + } + + Thread.sleep(delay); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + // after delay time, it should maintain required number of replicas + externalViewsBefore = validatePartitionMovement(externalViewsBefore, true, isDisabled); + + // trigger an on-demand rebalance; partitions shouldn't move again because the delay window + // has already elapsed and the delayed rebalance has already moved them off the node. 
+ validateMovementAfterOnDemandRebalance(externalViewsBefore, null,false, isDisabled); + + setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1); + setLastOnDemandRebalanceTimeInCluster(_gZkClient, CLUSTER_NAME, -1); + } + @AfterMethod public void afterTest() throws InterruptedException { // delete all DBs create in last test @@ -304,6 +381,54 @@ public class TestDelayedAutoRebalance extends ZkTestBase { } + protected void validateMovementAfterOnDemandRebalance( + Map<String, ExternalView> externalViewsBefore, Long lastOnDemandTime, boolean isPartitionMoved, + boolean isDisabled) { + if (lastOnDemandTime == null) { + _gSetupTool.getClusterManagementTool().onDemandRebalance(CLUSTER_NAME); + } else { + setLastOnDemandRebalanceTimeInCluster(_gZkClient, CLUSTER_NAME, lastOnDemandTime); + } + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + validatePartitionMovement(externalViewsBefore, isPartitionMoved, isDisabled); + } + + protected Map<String, ExternalView> validatePartitionMovement( + Map<String, ExternalView> externalViewsBefore, boolean isPartitionMoved, boolean isDisabled) { + Map<String, ExternalView> externalViewAfter = new HashMap<>(); + for (String db : _testDBs) { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + IdealState is = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + if (isPartitionMoved) { + validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE); + validateNoPartitionOnInstance(is, externalViewsBefore.get(db), ev, + _participants.get(0).getInstanceName()); + } else { + validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); + validateNoPartitionMove(is, externalViewsBefore.get(db), ev, + _participants.get(0).getInstanceName(), isDisabled); + } + externalViewAfter.put(db, ev); + } + return externalViewAfter; + } + + protected void validateNoPartitionOnInstance(IdealState is, ExternalView evBefore, + ExternalView evAfter, 
String instanceName) { + for (String partition : is.getPartitionSet()) { + Map<String, String> assignmentsBefore = evBefore.getRecord().getMapField(partition); + Map<String, String> assignmentsAfter = evAfter.getRecord().getMapField(partition); + Set<String> instancesAfter = new HashSet<String>(assignmentsAfter.keySet()); + + // the offline/disabled instance shouldn't have a partition assignment after rebalance + Assert.assertFalse(instancesAfter.contains(instanceName), String.format( + "%s is still on the instance after rebalance, before: %s, after: %s, instance: %s", + partition, assignmentsBefore.toString(), assignmentsAfter.toString(), instanceName)); + } + } + private void validateDelayedMovements(Map<String, ExternalView> externalViewsBefore) throws InterruptedException { _participants.get(0).syncStop(); @@ -318,6 +443,25 @@ public class TestDelayedAutoRebalance extends ZkTestBase { } } + protected void enableInstance(String instance, boolean enabled) { + // Enable or disable the given instance and verify that its instance config reflects the change. 
+ long currentTime = System.currentTimeMillis(); + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instance, enabled); + InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, instance); + Assert.assertEquals(instanceConfig.getInstanceEnabled(), enabled); + Assert.assertTrue(instanceConfig.getInstanceEnabledTime() >= currentTime); + Assert.assertTrue(instanceConfig.getInstanceEnabledTime() <= currentTime + 100); + } + + protected void validateDelayedMovementsOnDisabledNode(Map<String, ExternalView> externalViewsBefore) + throws Exception { + enableInstance(_participants.get(0).getInstanceName(), false); + Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + validatePartitionMovement(externalViewsBefore, false, true); + } + @AfterClass public void afterClass() throws Exception { if (_clusterVerifier != null) { diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java index 3e5eadd0f..7fa1426d6 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java @@ -25,7 +25,6 @@ import org.apache.helix.ConfigAccessor; import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; -import org.apache.helix.model.InstanceConfig; import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; @@ -33,12 +32,11 @@ import org.testng.annotations.Test; public class 
TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAutoRebalance { - private ConfigAccessor _configAccessor; - @BeforeClass public void beforeClass() throws Exception { super.beforeClass(); _configAccessor = new ConfigAccessor(_gZkClient); + _testingCondition = DISABLED_NODE; } @@ -292,6 +290,21 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut super.testDisableDelayRebalanceInInstance(); } + @Test(dependsOnMethods = {"testDisableDelayRebalanceInInstance"}) + public void testOnDemandRebalance() throws Exception { + super.testOnDemandRebalance(); + } + + @Test(dependsOnMethods = {"testOnDemandRebalance"}) + public void testExpiredOnDemandRebalanceTimestamp() throws Exception { + super.testExpiredOnDemandRebalanceTimestamp(); + } + + @Test(dependsOnMethods = {"testExpiredOnDemandRebalanceTimestamp"}) + public void testOnDemandRebalanceAfterDelayRebalanceHappen() throws Exception { + super.testOnDemandRebalanceAfterDelayRebalanceHappen(); + } + @BeforeMethod public void beforeTest() { // restart any participant that has been disconnected from last test. @@ -304,14 +317,4 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut enableInstance(_participants.get(i).getInstanceName(), true); } } - - private void enableInstance(String instance, boolean enabled) { - // Disable one node, no partition should be moved. 
- long currentTime = System.currentTimeMillis(); - _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instance, enabled); - InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, instance); - Assert.assertEquals(instanceConfig.getInstanceEnabled(), enabled); - Assert.assertTrue(instanceConfig.getInstanceEnabledTime() >= currentTime); - Assert.assertTrue(instanceConfig.getInstanceEnabledTime() <= currentTime + 100); - } } diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java index f2aa3025e..5f806669d 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java @@ -86,4 +86,19 @@ public class TestDelayedWagedRebalance extends TestDelayedAutoRebalance { public void testDisableDelayRebalanceInInstance() throws Exception { super.testDisableDelayRebalanceInInstance(); } + + @Test(dependsOnMethods = {"testDisableDelayRebalanceInInstance"}) + public void testOnDemandRebalance() throws Exception { + super.testOnDemandRebalance(); + } + + @Test(dependsOnMethods = {"testOnDemandRebalance"}) + public void testExpiredOnDemandRebalanceTimestamp() throws Exception { + super.testExpiredOnDemandRebalanceTimestamp(); + } + + @Test(dependsOnMethods = {"testExpiredOnDemandRebalanceTimestamp"}) + public void testOnDemandRebalanceAfterDelayRebalanceHappen() throws Exception { + super.testOnDemandRebalanceAfterDelayRebalanceHappen(); + } } diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java index 70b8adf5f..84d86c4a7 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java @@ -98,4 +98,19 @@ public class TestDelayedWagedRebalanceWithDisabledInstance extends TestDelayedAu throws Exception { super.testDisableDelayRebalanceInInstance(); } + + @Test(dependsOnMethods = {"testDisableDelayRebalanceInInstance"}) + public void testOnDemandRebalance() throws Exception { + super.testOnDemandRebalance(); + } + + @Test(dependsOnMethods = {"testOnDemandRebalance"}) + public void testExpiredOnDemandRebalanceTimestamp() throws Exception { + super.testExpiredOnDemandRebalanceTimestamp(); + } + + @Test(dependsOnMethods = {"testExpiredOnDemandRebalanceTimestamp"}) + public void testOnDemandRebalanceAfterDelayRebalanceHappen() throws Exception { + super.testOnDemandRebalanceAfterDelayRebalanceHappen(); + } } diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java index 0069deac8..81993475b 100644 --- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java +++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java @@ -448,6 +448,9 @@ public class MockHelixAdmin implements HelixAdmin { } + @Override + public void onDemandRebalance(String clusterName) {} + @Override public void addIdealState(String clusterName, String resourceName, String idealStateFile) throws IOException { diff --git a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java index 3690ca447..af794126c 100644 --- 
a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java +++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java @@ -397,6 +397,28 @@ public class TestClusterConfig { trySetInvalidAbnormalStatesResolverMap(testConfig, resolverMap); } + @Test + public void testGetLastOnDemandRebalanceTimestamp() { + ClusterConfig testConfig = new ClusterConfig("testConfig"); + Assert.assertEquals(testConfig.getLastOnDemandRebalanceTimestamp(), -1L); + + testConfig.getRecord() + .setLongField(ClusterConfig.ClusterConfigProperty.LAST_ON_DEMAND_REBALANCE_TIMESTAMP.name(), + 10000L); + Assert.assertEquals(testConfig.getLastOnDemandRebalanceTimestamp(), 10000L); + } + + @Test + public void testSetLastOnDemandRebalanceTimestamp() { + ClusterConfig testConfig = new ClusterConfig("testConfig"); + testConfig.setLastOnDemandRebalanceTimestamp(10000L); + + Assert.assertEquals(testConfig.getRecord() + .getLongField(ClusterConfig.ClusterConfigProperty.LAST_ON_DEMAND_REBALANCE_TIMESTAMP.name(), + -1), 10000L); + } + + private void trySetInvalidAbnormalStatesResolverMap(ClusterConfig testConfig, Map<String, String> resolverMap) { try {
