This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new ab1f911ae Fix SWAP_IN InstanceOperation to respect HELIX_ENABLED false (#2741)
ab1f911ae is described below
commit ab1f911ae8c688ed6c95daae5bc8d2ebb527020d
Author: Zachary Pinto <[email protected]>
AuthorDate: Fri Jan 26 15:38:14 2024 -0800
Fix SWAP_IN InstanceOperation to respect HELIX_ENABLED false (#2741)
To align with the behavior of HELIX_ENABLED = false, replicas assigned to a
disabled SWAP_IN instance are now sent to the OFFLINE (initial) state. When the
SWAP_IN instance is re-enabled, it receives upward state transitions for those replicas.
---
.../dataproviders/BaseControllerDataProvider.java | 29 +++++--
.../rebalancer/DelayedAutoRebalancer.java | 6 +-
.../stages/BestPossibleStateCalcStage.java | 29 ++++++-
.../rebalancer/TestInstanceOperation.java | 93 ++++++++++++++++++++++
4 files changed, 140 insertions(+), 17 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 6fe1c7fc3..761fb6974 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -124,7 +124,8 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
private final Map<String, InstanceConfig> _assignableInstanceConfigMap = new
HashMap<>();
private final Map<String, LiveInstance> _assignableLiveInstancesMap = new
HashMap<>();
private final Map<String, String> _swapOutInstanceNameToSwapInInstanceName =
new HashMap<>();
- private final Set<String> _enabledLiveSwapInInstanceNames = new HashSet<>();
+ private final Set<String> _liveSwapInInstanceNames = new HashSet<>();
+ private final Set<String> _enabledSwapInInstanceNames = new HashSet<>();
private final Map<String, MonitoredAbnormalResolver>
_abnormalStateResolverMap = new HashMap<>();
private final Set<String> _timedOutInstanceDuringMaintenance = new
HashSet<>();
private Map<String, LiveInstance> _liveInstanceExcludeTimedOutForMaintenance
= new HashMap<>();
@@ -365,7 +366,8 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
_assignableInstanceConfigMap.clear();
_assignableLiveInstancesMap.clear();
_swapOutInstanceNameToSwapInInstanceName.clear();
- _enabledLiveSwapInInstanceNames.clear();
+ _liveSwapInInstanceNames.clear();
+ _enabledSwapInInstanceNames.clear();
Map<String, String> filteredInstancesByLogicalId = new HashMap<>();
Map<String, String> swapOutLogicalIdsByInstanceName = new HashMap<>();
@@ -433,10 +435,12 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
String swapInInstanceName = swapInInstancesByLogicalId.get(value);
if (swapInInstanceName != null) {
_swapOutInstanceNameToSwapInInstanceName.put(swapOutInstanceName,
swapInInstanceName);
- if (liveInstancesMap.containsKey(swapInInstanceName)
- &&
InstanceValidationUtil.isInstanceEnabled(instanceConfigMap.get(swapInInstanceName),
+ if (liveInstancesMap.containsKey(swapInInstanceName)) {
+ _liveSwapInInstanceNames.add(swapInInstanceName);
+ }
+ if
(InstanceValidationUtil.isInstanceEnabled(instanceConfigMap.get(swapInInstanceName),
clusterConfig)) {
- _enabledLiveSwapInInstanceNames.add(swapInInstanceName);
+ _enabledSwapInInstanceNames.add(swapInInstanceName);
}
}
});
@@ -825,12 +829,21 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
}
/**
- * Get all the enabled and live SWAP_IN instances.
+ * Get all the live SWAP_IN instances.
+ *
+ * @return a set of SWAP_IN instanceNames that have a corresponding SWAP_OUT
instance.
+ */
+ public Set<String> getLiveSwapInInstanceNames() {
+ return Collections.unmodifiableSet(_liveSwapInInstanceNames);
+ }
+
+ /**
+ * Get all the enabled SWAP_IN instances.
*
* @return a set of SWAP_IN instanceNames that have a corresponding SWAP_OUT
instance.
*/
- public Set<String> getEnabledLiveSwapInInstanceNames() {
- return Collections.unmodifiableSet(_enabledLiveSwapInInstanceNames);
+ public Set<String> getEnabledSwapInInstanceNames() {
+ return Collections.unmodifiableSet(_enabledSwapInInstanceNames);
}
public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index 78793ddd9..56979d2aa 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -409,11 +409,7 @@ public class DelayedAutoRebalancer extends
AbstractRebalancer<ResourceController
bestPossibleStateMap, preferenceList, combinedPreferenceList)) {
for (int i = 0; i < combinedPreferenceList.size() - numReplicas; i++) {
String instanceToDrop =
combinedPreferenceList.get(combinedPreferenceList.size() - i - 1);
- // We do not want to drop a SWAP_IN node if it is at the end of the
preferenceList,
- // because partitions are actively being added on this node to prepare
for SWAP completion.
- if (cache == null ||
!cache.getEnabledLiveSwapInInstanceNames().contains(instanceToDrop)) {
- bestPossibleStateMap.put(instanceToDrop,
HelixDefinedState.DROPPED.name());
- }
+ bestPossibleStateMap.put(instanceToDrop,
HelixDefinedState.DROPPED.name());
}
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 05652e222..0861a48e8 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -135,11 +135,11 @@ public class BestPossibleStateCalcStage extends
AbstractBaseStage {
// 1. Get all SWAP_OUT instances and corresponding SWAP_IN instance pairs
in the cluster.
Map<String, String> swapOutToSwapInInstancePairs =
cache.getSwapOutToSwapInInstancePairs();
// 2. Get all enabled and live SWAP_IN instances in the cluster.
- Set<String> enabledLiveSwapInInstances =
cache.getEnabledLiveSwapInInstanceNames();
+ Set<String> liveSwapInInstances = cache.getLiveSwapInInstanceNames();
+ Set<String> enabledSwapInInstances = cache.getEnabledSwapInInstanceNames();
// 3. For each SWAP_OUT instance in any of the preferenceLists, add the
corresponding SWAP_IN instance to the end.
- // Skipping this when there are not SWAP_IN instances ready(enabled and
live) will reduce computation time when there is not an active
- // swap occurring.
- if (!enabledLiveSwapInInstances.isEmpty() &&
!cache.isMaintenanceModeEnabled()) {
+ // Skipping this when there are not SWAP_IN instances that are alive will
reduce computation time.
+ if (!liveSwapInInstances.isEmpty() && !cache.isMaintenanceModeEnabled()) {
resourceMap.forEach((resourceName, resource) -> {
StateModelDefinition stateModelDef =
cache.getStateModelDef(resource.getStateModelDefRef());
bestPossibleStateOutput.getResourceStatesMap().get(resourceName).getStateMap()
@@ -148,6 +148,27 @@ public class BestPossibleStateCalcStage extends
AbstractBaseStage {
commonInstances.retainAll(swapOutToSwapInInstancePairs.keySet());
commonInstances.forEach(swapOutInstance -> {
+ // If the corresponding swap-in instance is not live, skip
assigning to it.
+ if (!liveSwapInInstances.contains(
+ swapOutToSwapInInstancePairs.get(swapOutInstance))) {
+ return;
+ }
+
+ // If the corresponding swap-in instance is not enabled,
assign replicas with
+ // initial state.
+ if (!enabledSwapInInstances.contains(
+ swapOutToSwapInInstancePairs.get(swapOutInstance))) {
+
stateMap.put(swapOutToSwapInInstancePairs.get(swapOutInstance),
+ stateModelDef.getInitialState());
+ return;
+ }
+
+ // If the swap-in node is live and enabled, do assignment with
the following logic:
+ // 1. If the swap-out instance's replica is a topState, set
the swap-in instance's replica
+ // to the topState if the StateModel allows for another
replica with the topState to be added.
+ // Otherwise, set the swap-in instance's replica to the
secondTopState.
+ // 2. If the swap-out instance's replica is a secondTopState,
set the swap-in instance's replica
+ // to the same secondTopState.
if
(stateMap.get(swapOutInstance).equals(stateModelDef.getTopState())) {
String topStateCount =
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index f42796fc5..713944e98 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -579,6 +579,97 @@ public class TestInstanceOperation extends ZkTestBase {
}
@Test(dependsOnMethods = "testNodeSwap")
+ public void testNodeSwapDisableAndReenable() throws Exception {
+ System.out.println(
+ "START TestInstanceOperation.testNodeSwap() at " + new
Date(System.currentTimeMillis()));
+ removeOfflineOrDisabledOrSwapInInstances();
+
+ // Store original EV
+ Map<String, ExternalView> originalEVs = getEVs();
+
+ Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>();
+
+ // Set instance's InstanceOperation to SWAP_OUT
+ String instanceToSwapOutName = _participants.get(0).getInstanceName();
+ InstanceConfig instanceToSwapOutInstanceConfig =
_gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+ _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME,
instanceToSwapOutName,
+ InstanceConstants.InstanceOperation.SWAP_OUT);
+
+ // Validate that the assignment has not changed since setting the
InstanceOperation to SWAP_OUT
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ validateEVsCorrect(getEVs(), originalEVs,
swapOutInstancesToSwapInInstances,
+ Collections.emptySet(), Collections.emptySet());
+
+ // Add instance with InstanceOperation set to SWAP_IN
+ String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
+ swapOutInstancesToSwapInInstances.put(instanceToSwapOutName,
instanceToSwapInName);
+ addParticipant(instanceToSwapInName,
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+ instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+ InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+
+ // Validate that partitions on SWAP_OUT instance does not change after
setting the InstanceOperation to SWAP_OUT
+ // and adding the SWAP_IN instance to the cluster.
+ // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT
instance
+ // but none of them are in a top state.
+ Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
+ validateEVsCorrect(getEVs(), originalEVs,
swapOutInstancesToSwapInInstances,
+ Set.of(instanceToSwapInName), Collections.emptySet());
+
+ // Assert canSwapBeCompleted is true
+ Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+ .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
+
+ // Disable the SWAP_IN instance
+ _gSetupTool.getClusterManagementTool()
+ .enableInstance(CLUSTER_NAME, instanceToSwapInName, false);
+
+ // Check that the SWAP_IN instance's replicas match the SWAP_OUT
instance's replicas
+ // but all of them are OFFLINE
+ Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
+ Map<String, Map<String, String>> resourcePartitionStateOnSwapOutInstance =
+ getResourcePartitionStateOnInstance(getEVs(), instanceToSwapOutName);
+ Map<String, Map<String, String>> resourcePartitionStateOnSwapInInstance =
+ getResourcePartitionStateOnInstance(getEVs(), instanceToSwapInName);
+ Assert.assertEquals(
+ resourcePartitionStateOnSwapInInstance.values().stream().flatMap(p ->
p.keySet().stream())
+ .collect(Collectors.toSet()),
+ resourcePartitionStateOnSwapOutInstance.values().stream().flatMap(p ->
p.keySet().stream())
+ .collect(Collectors.toSet()));
+ Set<String> swapInInstancePartitionStates =
+ resourcePartitionStateOnSwapInInstance.values().stream().flatMap(e ->
e.values().stream())
+ .collect(Collectors.toSet());
+ Assert.assertEquals(swapInInstancePartitionStates.size(), 1);
+ Assert.assertTrue(swapInInstancePartitionStates.contains("OFFLINE"));
+
+ // Re-enable the SWAP_IN instance
+ _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME,
instanceToSwapInName, true);
+ Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
+
+ // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is
not.
+ validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true);
+ validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false);
+
+ // Assert completeSwapIfPossible is true
+ Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+ .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName));
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // Validate that the SWAP_IN instance is now in the routing tables.
+ validateRoutingTablesInstance(getEVs(), instanceToSwapInName, true);
+
+ // Assert that SWAP_OUT instance is disabled and has no partitions
assigned to it.
+ Assert.assertFalse(_gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME,
instanceToSwapOutName).getInstanceEnabled());
+
+ // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT
instance had before
+ // swap was completed.
+ verifier(() -> (validateEVsCorrect(getEVs(), originalEVs,
swapOutInstancesToSwapInInstances,
+ Collections.emptySet(), Set.of(instanceToSwapInName))), TIMEOUT);
+ }
+
+ @Test(dependsOnMethods = "testNodeSwapDisableAndReenable")
public void testNodeSwapSwapInNodeNoInstanceOperationDisabled() throws
Exception {
System.out.println(
"START
TestInstanceOperation.testNodeSwapSwapInNodeNoInstanceOperationDisabled() at "
@@ -698,6 +789,8 @@ public class TestInstanceOperation extends ZkTestBase {
// Cancel SWAP by disabling the SWAP_IN instance and remove SWAP_OUT
InstanceOperation from SWAP_OUT instance.
_gSetupTool.getClusterManagementTool()
.enableInstance(CLUSTER_NAME, instanceToSwapInName, false);
+ // Stop the participant
+ _participants.get(_participants.size() - 1).syncStop();
// Wait for cluster to converge.
Assert.assertTrue(_clusterVerifier.verifyByPolling());