This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new ab1f911ae Fix SWAP_IN InstanceOperation to respect HELIX_ENABLED false 
(#2741)
ab1f911ae is described below

commit ab1f911ae8c688ed6c95daae5bc8d2ebb527020d
Author: Zachary Pinto <[email protected]>
AuthorDate: Fri Jan 26 15:38:14 2024 -0800

    Fix SWAP_IN InstanceOperation to respect HELIX_ENABLED false (#2741)
    
    To align with the behavior of HELIX_ENABLED = false, replicas assigned to a 
disabled SWAP_IN instance are now set to the OFFLINE (initial) state. When the 
SWAP_IN instance is re-enabled, it receives upward state transitions for those replicas.
---
 .../dataproviders/BaseControllerDataProvider.java  | 29 +++++--
 .../rebalancer/DelayedAutoRebalancer.java          |  6 +-
 .../stages/BestPossibleStateCalcStage.java         | 29 ++++++-
 .../rebalancer/TestInstanceOperation.java          | 93 ++++++++++++++++++++++
 4 files changed, 140 insertions(+), 17 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 6fe1c7fc3..761fb6974 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -124,7 +124,8 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
   private final Map<String, InstanceConfig> _assignableInstanceConfigMap = new 
HashMap<>();
   private final Map<String, LiveInstance> _assignableLiveInstancesMap = new 
HashMap<>();
   private final Map<String, String> _swapOutInstanceNameToSwapInInstanceName = 
new HashMap<>();
-  private final Set<String> _enabledLiveSwapInInstanceNames = new HashSet<>();
+  private final Set<String> _liveSwapInInstanceNames = new HashSet<>();
+  private final Set<String> _enabledSwapInInstanceNames = new HashSet<>();
   private final Map<String, MonitoredAbnormalResolver> 
_abnormalStateResolverMap = new HashMap<>();
   private final Set<String> _timedOutInstanceDuringMaintenance = new 
HashSet<>();
   private Map<String, LiveInstance> _liveInstanceExcludeTimedOutForMaintenance 
= new HashMap<>();
@@ -365,7 +366,8 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
     _assignableInstanceConfigMap.clear();
     _assignableLiveInstancesMap.clear();
     _swapOutInstanceNameToSwapInInstanceName.clear();
-    _enabledLiveSwapInInstanceNames.clear();
+    _liveSwapInInstanceNames.clear();
+    _enabledSwapInInstanceNames.clear();
 
     Map<String, String> filteredInstancesByLogicalId = new HashMap<>();
     Map<String, String> swapOutLogicalIdsByInstanceName = new HashMap<>();
@@ -433,10 +435,12 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
       String swapInInstanceName = swapInInstancesByLogicalId.get(value);
       if (swapInInstanceName != null) {
         _swapOutInstanceNameToSwapInInstanceName.put(swapOutInstanceName, 
swapInInstanceName);
-        if (liveInstancesMap.containsKey(swapInInstanceName)
-            && 
InstanceValidationUtil.isInstanceEnabled(instanceConfigMap.get(swapInInstanceName),
+        if (liveInstancesMap.containsKey(swapInInstanceName)) {
+          _liveSwapInInstanceNames.add(swapInInstanceName);
+        }
+        if 
(InstanceValidationUtil.isInstanceEnabled(instanceConfigMap.get(swapInInstanceName),
             clusterConfig)) {
-          _enabledLiveSwapInInstanceNames.add(swapInInstanceName);
+          _enabledSwapInInstanceNames.add(swapInInstanceName);
         }
       }
     });
@@ -825,12 +829,21 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
   }
 
   /**
-   * Get all the enabled and live SWAP_IN instances.
+   * Get all the live SWAP_IN instances.
+   *
+   * @return a set of live SWAP_IN instanceNames that have a corresponding 
SWAP_OUT instance.
+   */
+  public Set<String> getLiveSwapInInstanceNames() {
+    return Collections.unmodifiableSet(_liveSwapInInstanceNames);
+  }
+
+  /**
+   * Get all the enabled SWAP_IN instances.
    *
    * @return a set of SWAP_IN instanceNames that have a corresponding SWAP_OUT 
instance.
    */
-  public Set<String> getEnabledLiveSwapInInstanceNames() {
-    return Collections.unmodifiableSet(_enabledLiveSwapInInstanceNames);
+  public Set<String> getEnabledSwapInInstanceNames() {
+    return Collections.unmodifiableSet(_enabledSwapInInstanceNames);
   }
 
   public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index 78793ddd9..56979d2aa 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -409,11 +409,7 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer<ResourceController
         bestPossibleStateMap, preferenceList, combinedPreferenceList)) {
       for (int i = 0; i < combinedPreferenceList.size() - numReplicas; i++) {
         String instanceToDrop = 
combinedPreferenceList.get(combinedPreferenceList.size() - i - 1);
-        // We do not want to drop a SWAP_IN node if it is at the end of the 
preferenceList,
-        // because partitions are actively being added on this node to prepare 
for SWAP completion.
-        if (cache == null || 
!cache.getEnabledLiveSwapInInstanceNames().contains(instanceToDrop)) {
-          bestPossibleStateMap.put(instanceToDrop, 
HelixDefinedState.DROPPED.name());
-        }
+        bestPossibleStateMap.put(instanceToDrop, 
HelixDefinedState.DROPPED.name());
       }
     }
 
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 05652e222..0861a48e8 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -135,11 +135,11 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
     // 1. Get all SWAP_OUT instances and corresponding SWAP_IN instance pairs 
in the cluster.
     Map<String, String> swapOutToSwapInInstancePairs = 
cache.getSwapOutToSwapInInstancePairs();
     // 2. Get all enabled and live SWAP_IN instances in the cluster.
-    Set<String> enabledLiveSwapInInstances = 
cache.getEnabledLiveSwapInInstanceNames();
+    Set<String> liveSwapInInstances = cache.getLiveSwapInInstanceNames();
+    Set<String> enabledSwapInInstances = cache.getEnabledSwapInInstanceNames();
     // 3. For each SWAP_OUT instance in any of the preferenceLists, add the 
corresponding SWAP_IN instance to the end.
-    // Skipping this when there are not SWAP_IN instances ready(enabled and 
live) will reduce computation time when there is not an active
-    // swap occurring.
-    if (!enabledLiveSwapInInstances.isEmpty() && 
!cache.isMaintenanceModeEnabled()) {
+    // Skipping this when there are no live SWAP_IN instances reduces 
computation time.
+    if (!liveSwapInInstances.isEmpty() && !cache.isMaintenanceModeEnabled()) {
       resourceMap.forEach((resourceName, resource) -> {
         StateModelDefinition stateModelDef = 
cache.getStateModelDef(resource.getStateModelDefRef());
         
bestPossibleStateOutput.getResourceStatesMap().get(resourceName).getStateMap()
@@ -148,6 +148,27 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
               commonInstances.retainAll(swapOutToSwapInInstancePairs.keySet());
 
               commonInstances.forEach(swapOutInstance -> {
+                // If the corresponding swap-in instance is not live, skip 
assigning to it.
+                if (!liveSwapInInstances.contains(
+                    swapOutToSwapInInstancePairs.get(swapOutInstance))) {
+                  return;
+                }
+
+                // If the corresponding swap-in instance is not enabled, 
assign replicas with
+                // initial state.
+                if (!enabledSwapInInstances.contains(
+                    swapOutToSwapInInstancePairs.get(swapOutInstance))) {
+                  
stateMap.put(swapOutToSwapInInstancePairs.get(swapOutInstance),
+                      stateModelDef.getInitialState());
+                  return;
+                }
+
+                // If the swap-in node is live and enabled, do assignment with 
the following logic:
+                // 1. If the swap-out instance's replica is a topState, set 
the swap-in instance's replica
+                // to the topState if the StateModel allows for another 
replica with the topState to be added.
+                // Otherwise, set the swap-in instance's replica to the 
secondTopState.
+                // 2. If the swap-out instance's replica is a secondTopState, 
set the swap-in instance's replica
+                // to the same secondTopState.
                 if 
(stateMap.get(swapOutInstance).equals(stateModelDef.getTopState())) {
 
                   String topStateCount =
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index f42796fc5..713944e98 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -579,6 +579,97 @@ public class TestInstanceOperation extends ZkTestBase {
   }
 
   @Test(dependsOnMethods = "testNodeSwap")
+  public void testNodeSwapDisableAndReenable() throws Exception {
+    System.out.println(
+        "START TestInstanceOperation.testNodeSwap() at " + new 
Date(System.currentTimeMillis()));
+    removeOfflineOrDisabledOrSwapInInstances();
+
+    // Store original EV
+    Map<String, ExternalView> originalEVs = getEVs();
+
+    Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>();
+
+    // Set instance's InstanceOperation to SWAP_OUT
+    String instanceToSwapOutName = _participants.get(0).getInstanceName();
+    InstanceConfig instanceToSwapOutInstanceConfig = 
_gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, 
instanceToSwapOutName,
+        InstanceConstants.InstanceOperation.SWAP_OUT);
+
+    // Validate that the assignment has not changed since setting the 
InstanceOperation to SWAP_OUT
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
+        Collections.emptySet(), Collections.emptySet());
+
+    // Add instance with InstanceOperation set to SWAP_IN
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
+    swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, 
instanceToSwapInName);
+    addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+        InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+
+    // Validate that partitions on SWAP_OUT instance does not change after 
setting the InstanceOperation to SWAP_OUT
+    // and adding the SWAP_IN instance to the cluster.
+    // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT 
instance
+    // but none of them are in a top state.
+    Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
+    validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
+        Set.of(instanceToSwapInName), Collections.emptySet());
+
+    // Assert canSwapBeCompleted is true
+    Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+        .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
+
+    // Disable the SWAP_IN instance
+    _gSetupTool.getClusterManagementTool()
+        .enableInstance(CLUSTER_NAME, instanceToSwapInName, false);
+
+    // Check that the SWAP_IN instance's replicas match the SWAP_OUT 
instance's replicas
+    // but all of them are OFFLINE
+    Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
+    Map<String, Map<String, String>> resourcePartitionStateOnSwapOutInstance =
+        getResourcePartitionStateOnInstance(getEVs(), instanceToSwapOutName);
+    Map<String, Map<String, String>> resourcePartitionStateOnSwapInInstance =
+        getResourcePartitionStateOnInstance(getEVs(), instanceToSwapInName);
+    Assert.assertEquals(
+        resourcePartitionStateOnSwapInInstance.values().stream().flatMap(p -> 
p.keySet().stream())
+            .collect(Collectors.toSet()),
+        resourcePartitionStateOnSwapOutInstance.values().stream().flatMap(p -> 
p.keySet().stream())
+            .collect(Collectors.toSet()));
+    Set<String> swapInInstancePartitionStates =
+        resourcePartitionStateOnSwapInInstance.values().stream().flatMap(e -> 
e.values().stream())
+            .collect(Collectors.toSet());
+    Assert.assertEquals(swapInInstancePartitionStates.size(), 1);
+    Assert.assertTrue(swapInInstancePartitionStates.contains("OFFLINE"));
+
+    // Re-enable the SWAP_IN instance
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, 
instanceToSwapInName, true);
+    Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
+
+    // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is 
not.
+    validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true);
+    validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false);
+
+    // Assert completeSwapIfPossible is true
+    Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+        .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName));
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // Validate that the SWAP_IN instance is now in the routing tables.
+    validateRoutingTablesInstance(getEVs(), instanceToSwapInName, true);
+
+    // Assert that SWAP_OUT instance is disabled and has no partitions 
assigned to it.
+    Assert.assertFalse(_gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, 
instanceToSwapOutName).getInstanceEnabled());
+
+    // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT 
instance had before
+    // swap was completed.
+    verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
+        Collections.emptySet(), Set.of(instanceToSwapInName))), TIMEOUT);
+  }
+
+  @Test(dependsOnMethods = "testNodeSwapDisableAndReenable")
   public void testNodeSwapSwapInNodeNoInstanceOperationDisabled() throws 
Exception {
     System.out.println(
         "START 
TestInstanceOperation.testNodeSwapSwapInNodeNoInstanceOperationDisabled() at "
@@ -698,6 +789,8 @@ public class TestInstanceOperation extends ZkTestBase {
     // Cancel SWAP by disabling the SWAP_IN instance and remove SWAP_OUT 
InstanceOperation from SWAP_OUT instance.
     _gSetupTool.getClusterManagementTool()
         .enableInstance(CLUSTER_NAME, instanceToSwapInName, false);
+    // Stop the participant
+    _participants.get(_participants.size() - 1).syncStop();
 
     // Wait for cluster to converge.
     Assert.assertTrue(_clusterVerifier.verifyByPolling());

Reply via email to