This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 8cfe977b5905b2199f8ce77238bf896816524277
Author: Zachary Pinto <[email protected]>
AuthorDate: Tue Dec 19 16:37:19 2023 -0700

    Stabilize TestInstanceOperation (#2715)
    
    * Stabilize TestInstanceOperation. The clusterVerifier evaluates to true 
as soon as the partition assignment matches the expected value; however, this 
happens before the top state has been transferred back to the SWAP_IN node. 
This is fixed by using TestHelper.verify to check that states converge within TIMEOUT.
    
    * Moved the evacuate tests that use resources with long state transitions 
(ST) to the end, because dropping those DBs took a long time, which caused 
flakiness in later tests. Ran TestInstanceOperation 5 times locally with success.
---
 .../rebalancer/TestInstanceOperation.java          | 630 +++++++++++----------
 1 file changed, 326 insertions(+), 304 deletions(-)

diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index 7cd08d86f..bf6db2900 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -52,6 +52,8 @@ import org.apache.helix.spectator.RoutingTableProvider;
 import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -59,9 +61,12 @@ import org.testng.annotations.Test;
 
 
 public class TestInstanceOperation extends ZkTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(TestHelper.class);
+  public static final int TIMEOUT = 10000;
   private final int ZONE_COUNT = 4;
-  protected final int NUM_NODE = 10;
+  protected final int START_NUM_NODE = 10;
   protected static final int START_PORT = 12918;
+  private static int _nextStartPort = START_PORT;
   protected static final int PARTITIONS = 20;
 
   protected final String CLASS_NAME = getShortClassName();
@@ -85,7 +90,6 @@ public class TestInstanceOperation extends ZkTestBase {
   private RoutingTableProvider _routingTableProviderEV;
   private RoutingTableProvider _routingTableProviderCS;
   List<MockParticipantManager> _participants = new ArrayList<>();
-  private List<String> _originalParticipantNames = new ArrayList<>();
   List<String> _participantNames = new ArrayList<>();
   private Set<String> _allDBs = new HashSet<>();
   private ZkHelixClusterVerifier _clusterVerifier;
@@ -104,9 +108,8 @@ public class TestInstanceOperation extends ZkTestBase {
 
     _gSetupTool.addCluster(CLUSTER_NAME, true);
 
-    for (int i = 0; i < NUM_NODE; i++) {
-      String participantName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      _originalParticipantNames.add(participantName);
+    for (int i = 0; i < START_NUM_NODE; i++) {
+      String participantName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
       addParticipant(participantName);
     }
 
@@ -171,7 +174,6 @@ public class TestInstanceOperation extends ZkTestBase {
     clusterConfig.setDelayRebalaceEnabled(true);
     clusterConfig.setRebalanceDelayTime(1800000L);
     _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
-    enabledTopologyAwareRebalance();
 
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
   }
@@ -196,34 +198,23 @@ public class TestInstanceOperation extends ZkTestBase {
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
   }
 
-  private void resetInstances() {
-    // Disable and drop any participants that are not in the original 
participant list.
-    Set<String> droppedParticipants = new HashSet<>();
+  private void removeOfflineOrDisabledOrSwapInInstances() {
+    // Remove all instances that are not live, disabled, or in SWAP_IN state.
     for (int i = 0; i < _participants.size(); i++) {
       String participantName = _participantNames.get(i);
-      if (!_originalParticipantNames.contains(participantName)) {
-        _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, 
participantName, false);
-        _participants.get(i).syncStop();
-        _gSetupTool.getClusterManagementTool()
-            .dropInstance(CLUSTER_NAME, 
_gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, 
participantName));
-        droppedParticipants.add(participantName);
-      }
-    }
-
-    // Remove the dropped instance from _participants and _participantNames
-    _participantNames.removeIf(droppedParticipants::contains);
-    _participants.removeIf(p -> 
droppedParticipants.contains(p.getInstanceName()));
-
-    for (int i = 0; i < _participants.size(); i++) {
-      // If instance is not connected to ZK, replace it
-      if (!_participants.get(i).isConnected()) {
-        // Replace the stopped participant with a new one and inherit the old 
instance config.
-        _participants.set(i, createParticipant(_participantNames.get(i)));
-        _participants.get(i).syncStart();
+      InstanceConfig instanceConfig =
+          
_gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, 
participantName);
+      if (!_participants.get(i).isConnected() || 
!instanceConfig.getInstanceEnabled()
+          || instanceConfig.getInstanceOperation()
+          .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
+        if (_participants.get(i).isConnected()) {
+          _participants.get(i).syncStop();
+        }
+        _gSetupTool.getClusterManagementTool().dropInstance(CLUSTER_NAME, 
instanceConfig);
+        _participantNames.remove(i);
+        _participants.remove(i);
+        i--;
       }
-      _gSetupTool.getClusterManagementTool()
-          .setInstanceOperation(CLUSTER_NAME, _participantNames.get(i), null);
-      _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, 
_participantNames.get(i), true);
     }
 
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
@@ -328,200 +319,36 @@ public class TestInstanceOperation extends ZkTestBase {
     }
   }
 
-  @Test(dependsOnMethods = "testAddingNodeWithEvacuationTag")
-  public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
-    System.out.println("START 
TestInstanceOperation.testEvacuateAndCancelBeforeBootstrapFinish() at " + new 
Date(System.currentTimeMillis()));
-    // add a resource where downward state transition is slow
-    createResourceWithDelayedRebalance(CLUSTER_NAME, 
"TEST_DB3_DELAYED_CRUSHED", "MasterSlave", PARTITIONS, REPLICA,
-        REPLICA - 1, 200000, CrushEdRebalanceStrategy.class.getName());
-    _allDBs.add("TEST_DB3_DELAYED_CRUSHED");
-    // add a resource where downward state transition is slow
-    createResourceWithWagedRebalance(CLUSTER_NAME, "TEST_DB4_DELAYED_WAGED", 
"MasterSlave",
-        PARTITIONS, REPLICA, REPLICA - 1);
-    _allDBs.add("TEST_DB4_DELAYED_WAGED");
-    // wait for assignment to finish
-    Assert.assertTrue(_clusterVerifier.verifyByPolling());
-
-    // set bootstrap ST delay to a large number
-    _stateModelDelay = -10000L;
-    // evacuate an instance
-    String instanceToEvacuate = _participants.get(0).getInstanceName();
-    _gSetupTool.getClusterManagementTool()
-        .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, 
InstanceConstants.InstanceOperation.EVACUATE);
-    // Messages should be pending at all instances besides the evacuate one
-    for (String participant : _participantNames) {
-      if (participant.equals(instanceToEvacuate)) {
-        continue;
-      }
-      TestHelper.verify(
-          () -> 
((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(participant))).isEmpty()),
 30000);
-    }
-    Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, 
instanceToEvacuate));
-    Assert.assertFalse(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, 
instanceToEvacuate));
-
-    // sleep a bit so ST messages can start executing
-    Thread.sleep(Math.abs(_stateModelDelay / 100));
-    // before we cancel, check current EV
-    Map<String, ExternalView> assignment = getEVs();
-    for (String resource : _allDBs) {
-      // check every replica has >= 3 partitions and a top state partition
-      validateAssignmentInEv(assignment.get(resource));
-    }
-
-    // cancel the evacuation
-    _gSetupTool.getClusterManagementTool()
-        .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);
-
-    assignment = getEVs();
-    for (String resource : _allDBs) {
-      // check every replica has >= 3 active replicas, even before cluster 
converge
-      validateAssignmentInEv(assignment.get(resource));
-    }
-
-    // check cluster converge. We have longer delay for ST then verifier 
timeout. It will only converge if we cancel ST.
-    Assert.assertTrue(_clusterVerifier.verifyByPolling());
-
-    // EV should contain all participants, check resources one by one
-    assignment = getEVs();
-    for (String resource : _allDBs) {
-      
Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
-      // check every replica has >= 3 active replicas again
-      validateAssignmentInEv(assignment.get(resource));
-    }
-  }
-
-  @Test(dependsOnMethods = "testEvacuateAndCancelBeforeBootstrapFinish")
-  public void testEvacuateAndCancelBeforeDropFinish() throws Exception {
-    System.out.println("START 
TestInstanceOperation.testEvacuateAndCancelBeforeDropFinish() at " + new 
Date(System.currentTimeMillis()));
-
-    // set DROP ST delay to a large number
-    _stateModelDelay = 10000L;
-
-    // evacuate an instance
-    String instanceToEvacuate = _participants.get(0).getInstanceName();
-    _gSetupTool.getClusterManagementTool()
-        .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, 
InstanceConstants.InstanceOperation.EVACUATE);
-
-    // message should be pending at the to evacuate participant
-    TestHelper.verify(
-        () -> 
((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(instanceToEvacuate))).isEmpty()),
 30000);
-    Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, 
instanceToEvacuate));
-
-    // cancel evacuation
-    _gSetupTool.getClusterManagementTool()
-        .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);
-    // check every replica has >= 3 active replicas, even before cluster 
converge
-    Map<String, ExternalView> assignment = getEVs();
-    for (String resource : _allDBs) {
-      validateAssignmentInEv(assignment.get(resource));
-    }
-
-    Assert.assertTrue(_clusterVerifier.verifyByPolling());
-
-    // EV should contain all participants, check resources one by one
-    assignment = getEVs();
-    for (String resource : _allDBs) {
-      
Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
-      // check every replica has >= 3 active replicas
-      validateAssignmentInEv(assignment.get(resource));
-    }
-  }
-
-  @Test(dependsOnMethods = "testEvacuateAndCancelBeforeDropFinish")
-  public void testMarkEvacuationAfterEMM() throws Exception {
-    System.out.println("START 
TestInstanceOperation.testMarkEvacuationAfterEMM() at " + new 
Date(System.currentTimeMillis()));
-    _stateModelDelay = 1000L;
-    
Assert.assertFalse(_gSetupTool.getClusterManagementTool().isInMaintenanceMode(CLUSTER_NAME));
-    
_gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME,
 true, null,
-        null);
-    addParticipant(PARTICIPANT_PREFIX + "_" + (START_PORT + NUM_NODE));
-
-
-    Assert.assertTrue(_clusterVerifier.verifyByPolling());
-    Map<String, ExternalView> assignment = getEVs();
-    for (String resource : _allDBs) {
-      
Assert.assertFalse(getParticipantsInEv(assignment.get(resource)).contains(_participantNames.get(NUM_NODE)));
-    }
-
-    // set evacuate operation
-    String instanceToEvacuate = _participants.get(0).getInstanceName();
-    _gSetupTool.getClusterManagementTool()
-        .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, 
InstanceConstants.InstanceOperation.EVACUATE);
-
-    // there should be no evacuation happening
-    for (String resource : _allDBs) {
-      
Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).contains(instanceToEvacuate));
-    }
-
-    Assert.assertTrue(_clusterVerifier.verifyByPolling());
-
-    // exit MM
-    
_gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME,
 false, null,
-        null);
-
-    Assert.assertTrue(_clusterVerifier.verifyByPolling());
-
-    assignment = getEVs();
-    List<String> currentActiveInstances =
-        _participantNames.stream().filter(n -> 
!n.equals(instanceToEvacuate)).collect(Collectors.toList());
-    for (String resource : _allDBs) {
-      validateAssignmentInEv(assignment.get(resource));
-      Set<String> newPAssignedParticipants = 
getParticipantsInEv(assignment.get(resource));
-      
Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate));
-      
Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances));
-    }
-    Assert.assertTrue(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, 
instanceToEvacuate));
-
-    _stateModelDelay = 3L;
-  }
-
-  @Test(dependsOnMethods = "testMarkEvacuationAfterEMM")
-  public void testEvacuationWithOfflineInstancesInCluster() throws Exception {
-    System.out.println("START 
TestInstanceOperation.testEvacuationWithOfflineInstancesInCluster() at " + new 
Date(System.currentTimeMillis()));
-    _participants.get(1).syncStop();
-    _participants.get(2).syncStop();
-
-    String evacuateInstanceName =  
_participants.get(_participants.size()-2).getInstanceName();
-    _gSetupTool.getClusterManagementTool()
-        .setInstanceOperation(CLUSTER_NAME, evacuateInstanceName, 
InstanceConstants.InstanceOperation.EVACUATE);
+  @Test(expectedExceptions = HelixException.class, dependsOnMethods = 
"testAddingNodeWithEvacuationTag")
+  public void testNodeSwapNoTopologySetup() throws Exception {
+    System.out.println("START 
TestInstanceOperation.testNodeSwapNoTopologySetup() at " + new Date(
+        System.currentTimeMillis()));
+    removeOfflineOrDisabledOrSwapInInstances();
 
-    Map<String, ExternalView> assignment;
-    // EV should contain all participants, check resources one by one
-    assignment = getEVs();
-    for (String resource : _allDBs) {
-      TestHelper.verify(() -> {
-        ExternalView ev = assignment.get(resource);
-        for (String partition : ev.getPartitionSet()) {
-          AtomicInteger activeReplicaCount = new AtomicInteger();
-          ev.getStateMap(partition)
-              .values()
-              .stream()
-              .filter(v -> v.equals("MASTER") || v.equals("LEADER") || 
v.equals("SLAVE") || v.equals("FOLLOWER")
-                  || v.equals("STANDBY"))
-              .forEach(v -> activeReplicaCount.getAndIncrement());
-          if (activeReplicaCount.get() < REPLICA - 1 || 
(ev.getStateMap(partition).containsKey(evacuateInstanceName)
-              && 
ev.getStateMap(partition).get(evacuateInstanceName).equals("MASTER") && 
ev.getStateMap(partition)
-              .get(evacuateInstanceName)
-              .equals("LEADER"))) {
-            return false;
-          }
-        }
-        return true;
-      }, 30000);
-    }
+    // Set instance's InstanceOperation to SWAP_OUT
+    String instanceToSwapOutName = _participants.get(0).getInstanceName();
+    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, 
instanceToSwapOutName,
+        InstanceConstants.InstanceOperation.SWAP_OUT);
 
-    resetInstances();
-    dropTestDBs(ImmutableSet.of("TEST_DB3_DELAYED_CRUSHED", 
"TEST_DB4_DELAYED_WAGED"));
+    // Add instance with InstanceOperation set to SWAP_IN
+    // There should be an error that the logicalId does not have SWAP_OUT 
instance because,
+    // helix can't determine what topology key to use to get the logicalId if 
TOPOLOGY is not set.
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
+    InstanceConfig instanceToSwapOutInstanceConfig = 
_gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+    addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+        InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
   }
 
-  @Test(expectedExceptions = HelixException.class, dependsOnMethods = 
"testEvacuationWithOfflineInstancesInCluster")
+  @Test(expectedExceptions = HelixException.class, dependsOnMethods = 
"testNodeSwapNoTopologySetup")
   public void testAddingNodeWithSwapOutInstanceOperation() throws Exception {
     System.out.println(
         "START 
TestInstanceOperation.testAddingNodeWithSwapOutInstanceOperation() at " + new 
Date(
             System.currentTimeMillis()));
 
     enabledTopologyAwareRebalance();
-    resetInstances();
+    removeOfflineOrDisabledOrSwapInInstances();
 
     // Set instance's InstanceOperation to SWAP_OUT
     String instanceToSwapOutName = _participants.get(0).getInstanceName();
@@ -531,7 +358,7 @@ public class TestInstanceOperation extends ZkTestBase {
         InstanceConstants.InstanceOperation.SWAP_OUT);
 
     // Add instance with InstanceOperation set to SWAP_IN
-    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
         InstanceConstants.InstanceOperation.SWAP_OUT, true, -1);
@@ -543,7 +370,7 @@ public class TestInstanceOperation extends ZkTestBase {
         "START 
TestInstanceOperation.testAddingNodeWithSwapOutNodeInstanceOperationUnset() at "
             + new Date(System.currentTimeMillis()));
 
-    resetInstances();
+    removeOfflineOrDisabledOrSwapInInstances();
 
     // Set instance's InstanceOperation to null
     String instanceToSwapOutName = _participants.get(0).getInstanceName();
@@ -553,7 +380,7 @@ public class TestInstanceOperation extends ZkTestBase {
         .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, null);
 
     // Add instance with InstanceOperation set to SWAP_IN
-    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
         InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
@@ -564,10 +391,10 @@ public class TestInstanceOperation extends ZkTestBase {
     System.out.println("START 
TestInstanceOperation.testNodeSwapWithNoSwapOutNode() at " + new Date(
         System.currentTimeMillis()));
 
-    resetInstances();
+    removeOfflineOrDisabledOrSwapInInstances();
 
     // Add new instance with InstanceOperation set to SWAP_IN
-    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     addParticipant(instanceToSwapInName, "1000", "zone_1000",
         InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
   }
@@ -578,7 +405,7 @@ public class TestInstanceOperation extends ZkTestBase {
         "START 
TestInstanceOperation.testNodeSwapSwapInNodeNoInstanceOperationEnabled() at "
             + new Date(System.currentTimeMillis()));
 
-    resetInstances();
+    removeOfflineOrDisabledOrSwapInInstances();
 
     // Set instance's InstanceOperation to SWAP_OUT
     String instanceToSwapOutName = _participants.get(0).getInstanceName();
@@ -590,9 +417,14 @@ public class TestInstanceOperation extends ZkTestBase {
     // Add instance with same logicalId with InstanceOperation unset
     // This should work because adding instance with InstanceOperation unset 
will automatically
     // set the InstanceOperation to SWAP_IN.
-    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, 
true, -1);
+
+    Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
+    Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+        .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName));
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
   }
 
   @Test(expectedExceptions = HelixException.class, dependsOnMethods = 
"testNodeSwapSwapInNodeNoInstanceOperationEnabled")
@@ -601,7 +433,7 @@ public class TestInstanceOperation extends ZkTestBase {
         "START 
TestInstanceOperation.testNodeSwapSwapInNodeWithAlreadySwappingPair() at "
             + new Date(System.currentTimeMillis()));
 
-    resetInstances();
+    removeOfflineOrDisabledOrSwapInInstances();
 
     // Set instance's InstanceOperation to SWAP_OUT
     String instanceToSwapOutName = _participants.get(0).getInstanceName();
@@ -611,15 +443,14 @@ public class TestInstanceOperation extends ZkTestBase {
         InstanceConstants.InstanceOperation.SWAP_OUT);
 
     // Add instance with InstanceOperation set to SWAP_IN
-    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
         InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
 
     // Add another instance with InstanceOperation set to SWAP_IN with same 
logicalId as previously
     // added SWAP_IN instance.
-    String secondInstanceToSwapInName =
-        PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size());
+    String secondInstanceToSwapInName = PARTICIPANT_PREFIX + "_" + 
_nextStartPort;
     addParticipant(secondInstanceToSwapInName,
         instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
@@ -627,35 +458,10 @@ public class TestInstanceOperation extends ZkTestBase {
   }
 
   @Test(expectedExceptions = HelixException.class, dependsOnMethods = 
"testNodeSwapSwapInNodeWithAlreadySwappingPair")
-  public void testNodeSwapNoTopologySetup() throws Exception {
-    System.out.println("START 
TestInstanceOperation.testNodeSwapNoTopologySetup() at " + new Date(
-        System.currentTimeMillis()));
-    disableTopologyAwareRebalance();
-    resetInstances();
-
-    // Set instance's InstanceOperation to SWAP_OUT
-    String instanceToSwapOutName = _participants.get(0).getInstanceName();
-    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, 
instanceToSwapOutName,
-        InstanceConstants.InstanceOperation.SWAP_OUT);
-
-    // Add instance with InstanceOperation set to SWAP_IN
-    // There should be an error that the logicalId does not have SWAP_OUT 
instance because,
-    // helix can't determine what topology key to use to get the logicalId if 
TOPOLOGY is not set.
-    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
-    InstanceConfig instanceToSwapOutInstanceConfig = 
_gSetupTool.getClusterManagementTool()
-        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
-    addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
-        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
-        InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
-  }
-
-  @Test(expectedExceptions = HelixException.class, dependsOnMethods = 
"testNodeSwapNoTopologySetup")
   public void testNodeSwapWrongFaultZone() throws Exception {
     System.out.println("START 
TestInstanceOperation.testNodeSwapWrongFaultZone() at " + new Date(
         System.currentTimeMillis()));
-    // Re-enable topology aware rebalancing and set TOPOLOGY.
-    enabledTopologyAwareRebalance();
-    resetInstances();
+    removeOfflineOrDisabledOrSwapInInstances();
 
     // Set instance's InstanceOperation to SWAP_OUT
     String instanceToSwapOutName = _participants.get(0).getInstanceName();
@@ -664,7 +470,7 @@ public class TestInstanceOperation extends ZkTestBase {
 
     // Add instance with InstanceOperation set to SWAP_IN
     // There should be an error because SWAP_IN instance must be in the same 
FAULT_ZONE as the SWAP_OUT instance.
-    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     InstanceConfig instanceToSwapOutInstanceConfig = 
_gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
     addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
@@ -676,7 +482,7 @@ public class TestInstanceOperation extends ZkTestBase {
   public void testNodeSwapWrongCapacity() throws Exception {
     System.out.println("START 
TestInstanceOperation.testNodeSwapWrongCapacity() at " + new Date(
         System.currentTimeMillis()));
-    resetInstances();
+    removeOfflineOrDisabledOrSwapInInstances();
 
     // Set instance's InstanceOperation to SWAP_OUT
     String instanceToSwapOutName = _participants.get(0).getInstanceName();
@@ -685,7 +491,7 @@ public class TestInstanceOperation extends ZkTestBase {
 
     // Add instance with InstanceOperation set to SWAP_IN
     // There should be an error because SWAP_IN instance must have same 
capacity as the SWAP_OUT node.
-    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     InstanceConfig instanceToSwapOutInstanceConfig = 
_gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
     addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
@@ -697,7 +503,7 @@ public class TestInstanceOperation extends ZkTestBase {
   public void testNodeSwap() throws Exception {
     System.out.println(
         "START TestInstanceOperation.testNodeSwap() at " + new 
Date(System.currentTimeMillis()));
-    resetInstances();
+    removeOfflineOrDisabledOrSwapInInstances();
 
     // Store original EV
     Map<String, ExternalView> originalEVs = getEVs();
@@ -717,7 +523,7 @@ public class TestInstanceOperation extends ZkTestBase {
         Collections.emptySet(), Collections.emptySet());
 
     // Add instance with InstanceOperation set to SWAP_IN
-    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, 
instanceToSwapInName);
     addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
@@ -752,13 +558,11 @@ public class TestInstanceOperation extends ZkTestBase {
     // Assert that SWAP_OUT instance is disabled and has no partitions 
assigned to it.
     Assert.assertFalse(_gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, 
instanceToSwapOutName).getInstanceEnabled());
-    Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), 
instanceToSwapOutName).size(),
-        0);
 
     // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT 
instance had before
     // swap was completed.
-    validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
-        Collections.emptySet(), Set.of(instanceToSwapInName));
+    verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
+        Collections.emptySet(), Set.of(instanceToSwapInName))), TIMEOUT);
   }
 
   @Test(dependsOnMethods = "testNodeSwap")
@@ -767,7 +571,7 @@ public class TestInstanceOperation extends ZkTestBase {
         "START 
TestInstanceOperation.testNodeSwapSwapInNodeNoInstanceOperationDisabled() at "
             + new Date(System.currentTimeMillis()));
 
-    resetInstances();
+    removeOfflineOrDisabledOrSwapInInstances();
 
     // Store original EVs
     Map<String, ExternalView> originalEVs = getEVs();
@@ -787,7 +591,7 @@ public class TestInstanceOperation extends ZkTestBase {
         Collections.emptySet(), Collections.emptySet());
 
     // Add instance with InstanceOperation unset, should automatically be set 
to SWAP_IN
-    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, 
instanceToSwapInName);
     addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, 
false, -1);
@@ -823,13 +627,11 @@ public class TestInstanceOperation extends ZkTestBase {
     // Assert that SWAP_OUT instance is disabled and has no partitions 
assigned to it.
     Assert.assertFalse(_gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, 
instanceToSwapOutName).getInstanceEnabled());
-    Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), 
instanceToSwapOutName).size(),
-        0);
 
     // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT 
instance had before
     // swap was completed.
-    validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
-        Collections.emptySet(), Set.of(instanceToSwapInName));
+    verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
+        Collections.emptySet(), Set.of(instanceToSwapInName))), TIMEOUT);
   }
 
   @Test(dependsOnMethods = "testNodeSwapSwapInNodeNoInstanceOperationDisabled")
@@ -838,7 +640,7 @@ public class TestInstanceOperation extends ZkTestBase {
         "START 
TestInstanceOperation.testNodeSwapCancelSwapWhenReadyToComplete() at " + new 
Date(
             System.currentTimeMillis()));
 
-    resetInstances();
+    removeOfflineOrDisabledOrSwapInInstances();
 
     // Store original EVs
     Map<String, ExternalView> originalEVs = getEVs();
@@ -853,12 +655,12 @@ public class TestInstanceOperation extends ZkTestBase {
         InstanceConstants.InstanceOperation.SWAP_OUT);
 
     // Validate that the assignment has not changed since setting the 
InstanceOperation to SWAP_OUT
-    Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
     validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
         Collections.emptySet(), Collections.emptySet());
 
     // Add instance with InstanceOperation set to SWAP_IN
-    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, 
instanceToSwapInName);
     addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
@@ -903,12 +705,9 @@ public class TestInstanceOperation extends ZkTestBase {
 
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
-    // Validate there are no partitions on the SWAP_IN instance.
-    Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), 
instanceToSwapInName).size(), 0);
-
     // Validate that the SWAP_OUT instance has the same partitions as it had 
before.
-    validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
-        Collections.emptySet(), Collections.emptySet());
+    verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
+        Collections.emptySet(), Collections.emptySet())), TIMEOUT);
   }
 
   @Test(dependsOnMethods = "testNodeSwapCancelSwapWhenReadyToComplete")
@@ -916,7 +715,7 @@ public class TestInstanceOperation extends ZkTestBase {
     System.out.println("START TestInstanceOperation.testNodeSwapAfterEMM() at 
" + new Date(
         System.currentTimeMillis()));
 
-    resetInstances();
+    removeOfflineOrDisabledOrSwapInInstances();
 
     // Store original EVs
     Map<String, ExternalView> originalEVs = getEVs();
@@ -940,7 +739,7 @@ public class TestInstanceOperation extends ZkTestBase {
         Collections.emptySet(), Collections.emptySet());
 
     // Add instance with InstanceOperation set to SWAP_IN
-    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, 
instanceToSwapInName);
     addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
@@ -983,13 +782,11 @@ public class TestInstanceOperation extends ZkTestBase {
     // Assert that SWAP_OUT instance is disabled and has no partitions 
assigned to it.
     Assert.assertFalse(_gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, 
instanceToSwapOutName).getInstanceEnabled());
-    Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), 
instanceToSwapOutName).size(),
-        0);
 
     // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT 
instance had before
     // swap was completed.
-    validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
-        Collections.emptySet(), Set.of(instanceToSwapInName));
+    verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
+        Collections.emptySet(), Set.of(instanceToSwapInName))), TIMEOUT);
   }
 
   @Test(dependsOnMethods = "testNodeSwapAfterEMM")
@@ -998,7 +795,7 @@ public class TestInstanceOperation extends ZkTestBase {
         "START TestInstanceOperation.testNodeSwapWithSwapOutInstanceDisabled() 
at " + new Date(
             System.currentTimeMillis()));
 
-    resetInstances();
+    removeOfflineOrDisabledOrSwapInInstances();
 
     // Store original EVs
     Map<String, ExternalView> originalEVs = getEVs();
@@ -1026,7 +823,7 @@ public class TestInstanceOperation extends ZkTestBase {
     Assert.assertTrue(swapOutInstanceOfflineStates.contains("OFFLINE"));
 
     // Add instance with InstanceOperation set to SWAP_IN
-    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
         InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
@@ -1067,8 +864,10 @@ public class TestInstanceOperation extends ZkTestBase {
     // Assert that SWAP_OUT instance is disabled and has no partitions 
assigned to it.
     Assert.assertFalse(_gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, 
instanceToSwapOutName).getInstanceEnabled());
-    Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), 
instanceToSwapOutName).size(),
-        0);
+
+    verifier(
+        () -> (getPartitionsAndStatesOnInstance(getEVs(), 
instanceToSwapOutName).isEmpty()),
+        TIMEOUT);
   }
 
   @Test(expectedExceptions = HelixException.class, dependsOnMethods = 
"testNodeSwapWithSwapOutInstanceDisabled")
@@ -1076,7 +875,7 @@ public class TestInstanceOperation extends ZkTestBase {
     System.out.println(
         "START 
TestInstanceOperation.testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() at "
             + new Date(System.currentTimeMillis()));
-    resetInstances();
+    removeOfflineOrDisabledOrSwapInInstances();
 
     // Get the SWAP_OUT instance.
     String instanceToSwapOutName = _participants.get(0).getInstanceName();
@@ -1084,7 +883,7 @@ public class TestInstanceOperation extends ZkTestBase {
         .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
 
     // Add instance with InstanceOperation set to SWAP_IN enabled before 
setting SWAP_OUT instance.
-    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, 
true, -1);
   }
@@ -1094,7 +893,7 @@ public class TestInstanceOperation extends ZkTestBase {
     System.out.println(
         "START 
TestInstanceOperation.testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() at "
             + new Date(System.currentTimeMillis()));
-    resetInstances();
+    removeOfflineOrDisabledOrSwapInInstances();
 
     // Get the SWAP_OUT instance.
     String instanceToSwapOutName = _participants.get(0).getInstanceName();
@@ -1102,7 +901,7 @@ public class TestInstanceOperation extends ZkTestBase {
         .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
 
     // Add instance with InstanceOperation set to SWAP_IN
-    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, 
false, -1);
 
@@ -1117,7 +916,7 @@ public class TestInstanceOperation extends ZkTestBase {
     System.out.println(
         "START 
TestInstanceOperation.testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut()
 at "
             + new Date(System.currentTimeMillis()));
-    resetInstances();
+    removeOfflineOrDisabledOrSwapInInstances();
 
     // Get the SWAP_OUT instance.
     String instanceToSwapOutName = _participants.get(0).getInstanceName();
@@ -1125,7 +924,7 @@ public class TestInstanceOperation extends ZkTestBase {
         .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
 
     // Add instance with InstanceOperation set to SWAP_IN
-    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, 
false, -1);
 
@@ -1139,10 +938,10 @@ public class TestInstanceOperation extends ZkTestBase {
   }
 
   @Test(dependsOnMethods = 
"testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut")
-  public void testNodeSwapAddSwapInFirst() {
+  public void testNodeSwapAddSwapInFirst() throws Exception {
     System.out.println("START 
TestInstanceOperation.testNodeSwapAddSwapInFirst() at " + new Date(
         System.currentTimeMillis()));
-    resetInstances();
+    removeOfflineOrDisabledOrSwapInInstances();
 
     // Store original EV
     Map<String, ExternalView> originalEVs = getEVs();
@@ -1155,13 +954,13 @@ public class TestInstanceOperation extends ZkTestBase {
         .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
 
     // Add instance with InstanceOperation set to SWAP_IN
-    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, 
instanceToSwapInName);
     addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, 
false, -1);
 
     // Validate that the assignment has not changed since setting the 
InstanceOperation to SWAP_OUT
-    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
     validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
         Collections.emptySet(), Collections.emptySet());
 
@@ -1201,13 +1000,234 @@ public class TestInstanceOperation extends ZkTestBase {
     // Assert that SWAP_OUT instance is disabled and has no partitions 
assigned to it.
     Assert.assertFalse(_gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, 
instanceToSwapOutName).getInstanceEnabled());
-    Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), 
instanceToSwapOutName).size(),
-        0);
 
     // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT 
instance had before
     // swap was completed.
-    validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
-        Collections.emptySet(), Set.of(instanceToSwapInName));
+    verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
+        Collections.emptySet(), Set.of(instanceToSwapInName))), TIMEOUT);
+  }
+
+  @Test(dependsOnMethods = "testNodeSwapAddSwapInFirst")
+  public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
+    System.out.println(
+        "START TestInstanceOperation.testEvacuateAndCancelBeforeBootstrapFinish() at " + new Date(
+            System.currentTimeMillis()));
+    removeOfflineOrDisabledOrSwapInInstances();
+
+    // Two extra MasterSlave resources used by the evacuation tests: a CRUSHED resource created via
+    // createResourceWithDelayedRebalance and a WAGED resource. Both are tracked in _allDBs so the
+    // checks below cover them.
+    String crushedDb = "TEST_DB3_DELAYED_CRUSHED";
+    createResourceWithDelayedRebalance(CLUSTER_NAME, crushedDb, "MasterSlave", PARTITIONS,
+        REPLICA, REPLICA - 1, 200000, CrushEdRebalanceStrategy.class.getName());
+    _allDBs.add(crushedDb);
+    String wagedDb = "TEST_DB4_DELAYED_WAGED";
+    createResourceWithWagedRebalance(CLUSTER_NAME, wagedDb, "MasterSlave", PARTITIONS, REPLICA,
+        REPLICA - 1);
+    _allDBs.add(wagedDb);
+    // Let the initial assignment of the new resources settle.
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // Slow down bootstrap state transitions (this test pairs a negative delay with bootstrap).
+    _stateModelDelay = -10000L;
+
+    String evacuatingInstance = _participants.get(0).getInstanceName();
+    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, evacuatingInstance,
+        InstanceConstants.InstanceOperation.EVACUATE);
+
+    // For every other participant, wait (verifier timeout 30000) until its message queue is empty.
+    // NOTE(review): the earlier comment here said messages "should be pending" on these
+    // participants, but the check waits for the queues to drain -- confirm which is intended.
+    for (String participant : _participantNames) {
+      if (!participant.equals(evacuatingInstance)) {
+        verifier(() -> _dataAccessor.getChildNames(
+            _dataAccessor.keyBuilder().messages(participant)).isEmpty(), 30000);
+      }
+    }
+    // Evacuation must not be reported as finished while bootstrap transitions are slowed down.
+    Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, evacuatingInstance));
+    Assert.assertFalse(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, evacuatingInstance));
+
+    // Give the state transitions a moment to start: |_stateModelDelay| / 100.
+    long warmUp = Math.abs(_stateModelDelay / 100);
+    Thread.sleep(warmUp);
+
+    // Before cancelling, every resource's EV must still pass validateAssignmentInEv.
+    Map<String, ExternalView> evsBeforeCancel = getEVs();
+    for (String db : _allDBs) {
+      validateAssignmentInEv(evsBeforeCancel.get(db));
+    }
+
+    // Cancel the evacuation by clearing the instance operation.
+    _gSetupTool.getClusterManagementTool()
+        .setInstanceOperation(CLUSTER_NAME, evacuatingInstance, null);
+
+    // The assignment must remain valid immediately after cancelling, before the cluster converges.
+    Map<String, ExternalView> evsAfterCancel = getEVs();
+    for (String db : _allDBs) {
+      validateAssignmentInEv(evsAfterCancel.get(db));
+    }
+
+    // The bootstrap delay is intended to be longer than the verifier timeout, so the cluster can
+    // only converge here if the pending state transitions were cancelled.
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // After convergence every participant in _participantNames must appear in each resource's EV,
+    // and the assignment must still be valid.
+    Map<String, ExternalView> evsConverged = getEVs();
+    for (String db : _allDBs) {
+      ExternalView ev = evsConverged.get(db);
+      Assert.assertTrue(getParticipantsInEv(ev).containsAll(_participantNames));
+      validateAssignmentInEv(ev);
+    }
+  }
+
+  @Test(dependsOnMethods = "testEvacuateAndCancelBeforeBootstrapFinish")
+  public void testEvacuateAndCancelBeforeDropFinish() throws Exception {
+    System.out.println(
+        "START TestInstanceOperation.testEvacuateAndCancelBeforeDropFinish() at " + new Date(
+            System.currentTimeMillis()));
+
+    // Slow down DROP state transitions (this test pairs a positive delay with DROP).
+    _stateModelDelay = 10000L;
+
+    String evacuatingInstance = _participants.get(0).getInstanceName();
+    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, evacuatingInstance,
+        InstanceConstants.InstanceOperation.EVACUATE);
+
+    // Wait (verifier timeout 30000) until the evacuating participant's message queue is empty.
+    // NOTE(review): the earlier comment here said a message "should be pending" on this
+    // participant, but the check waits for the queue to drain -- confirm which is intended.
+    verifier(() -> _dataAccessor.getChildNames(
+        _dataAccessor.keyBuilder().messages(evacuatingInstance)).isEmpty(), 30000);
+    Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, evacuatingInstance));
+
+    // Cancel the evacuation by clearing the instance operation.
+    _gSetupTool.getClusterManagementTool()
+        .setInstanceOperation(CLUSTER_NAME, evacuatingInstance, null);
+
+    // The assignment must remain valid immediately after cancelling, before the cluster converges.
+    Map<String, ExternalView> evsAfterCancel = getEVs();
+    for (String db : _allDBs) {
+      validateAssignmentInEv(evsAfterCancel.get(db));
+    }
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // Once converged, every participant in _participantNames must appear in each resource's EV
+    // again, and the assignment must still be valid.
+    Map<String, ExternalView> evsConverged = getEVs();
+    for (String db : _allDBs) {
+      ExternalView ev = evsConverged.get(db);
+      Assert.assertTrue(getParticipantsInEv(ev).containsAll(_participantNames));
+      validateAssignmentInEv(ev);
+    }
+  }
+
+  @Test(dependsOnMethods = "testEvacuateAndCancelBeforeDropFinish")
+  public void testMarkEvacuationAfterEMM() throws Exception {
+    System.out.println("START TestInstanceOperation.testMarkEvacuationAfterEMM() at " + new Date(
+        System.currentTimeMillis()));
+    _stateModelDelay = 1000L;
+    Assert.assertFalse(_gSetupTool.getClusterManagementTool().isInMaintenanceMode(CLUSTER_NAME));
+    _gSetupTool.getClusterManagementTool()
+        .manuallyEnableMaintenanceMode(CLUSTER_NAME, true, null, null);
+    String newParticipantName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
+    addParticipant(newParticipantName);
+
+    // While the cluster is in maintenance mode the new participant must not be assigned anything.
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    Map<String, ExternalView> assignment = getEVs();
+    for (String resource : _allDBs) {
+      Assert.assertFalse(
+          getParticipantsInEv(assignment.get(resource)).contains(newParticipantName));
+    }
+
+    // set evacuate operation
+    String instanceToEvacuate = _participants.get(0).getInstanceName();
+    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToEvacuate,
+        InstanceConstants.InstanceOperation.EVACUATE);
+
+    // There should be no evacuation happening while in maintenance mode. Let the controller react
+    // to the EVACUATE operation first and then re-read the EVs: checking the snapshot taken before
+    // the operation was set would pass trivially.
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    assignment = getEVs();
+    for (String resource : _allDBs) {
+      Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).contains(instanceToEvacuate));
+    }
+
+    // Exit maintenance mode; the evacuation is expected to proceed now.
+    _gSetupTool.getClusterManagementTool()
+        .manuallyEnableMaintenanceMode(CLUSTER_NAME, false, null, null);
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // After convergence the evacuated instance hosts nothing, and every other participant
+    // (including the one added during maintenance mode) appears in every resource's EV.
+    assignment = getEVs();
+    List<String> currentActiveInstances =
+        _participantNames.stream().filter(n -> !n.equals(instanceToEvacuate))
+            .collect(Collectors.toList());
+    for (String resource : _allDBs) {
+      validateAssignmentInEv(assignment.get(resource));
+      Set<String> assignedParticipants = getParticipantsInEv(assignment.get(resource));
+      Assert.assertFalse(assignedParticipants.contains(instanceToEvacuate));
+      Assert.assertTrue(assignedParticipants.containsAll(currentActiveInstances));
+    }
+    Assert.assertTrue(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate));
+
+    // Restore the short state model delay for the following tests.
+    _stateModelDelay = 3L;
+  }
+
+  @Test(dependsOnMethods = "testMarkEvacuationAfterEMM")
+  public void testEvacuationWithOfflineInstancesInCluster() throws Exception {
+    System.out.println(
+        "START TestInstanceOperation.testEvacuationWithOfflineInstancesInCluster() at " + new Date(
+            System.currentTimeMillis()));
+    _participants.get(1).syncStop();
+    _participants.get(2).syncStop();
+
+    String evacuateInstanceName = _participants.get(_participants.size() - 2).getInstanceName();
+    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, evacuateInstanceName,
+        InstanceConstants.InstanceOperation.EVACUATE);
+
+    // States counted as an active replica, and the subset of them that are top states.
+    Set<String> activeStates = Set.of("MASTER", "LEADER", "SLAVE", "FOLLOWER", "STANDBY");
+    Set<String> topStates = Set.of("MASTER", "LEADER");
+    // Check resources one by one: every partition must keep at least REPLICA - 1 active replicas
+    // and the evacuating instance must not hold a top state. The EVs are re-read on every poll so
+    // the verifier can actually observe the cluster converging (a snapshot taken once outside the
+    // lambda would make the retries meaningless).
+    for (String resource : _allDBs) {
+      verifier(() -> {
+        ExternalView ev = getEVs().get(resource);
+        if (ev == null) {
+          return false;
+        }
+        for (String partition : ev.getPartitionSet()) {
+          Map<String, String> stateMap = ev.getStateMap(partition);
+          long activeReplicaCount =
+              stateMap.values().stream().filter(activeStates::contains).count();
+          String evacuatingState = stateMap.get(evacuateInstanceName);
+          // Set.of(...).contains(null) throws, so guard the absent-replica case explicitly.
+          boolean evacuatingHoldsTopState =
+              evacuatingState != null && topStates.contains(evacuatingState);
+          if (activeReplicaCount < REPLICA - 1 || evacuatingHoldsTopState) {
+            return false;
+          }
+        }
+        return true;
+      }, 30000);
+    }
+
+    // Remove the stopped/evacuated instances, bring the participant count back up with two fresh
+    // participants (addParticipant presumably advances _nextStartPort, giving each a new name),
+    // and drop the slow test DBs so later tests are not affected by them.
+    removeOfflineOrDisabledOrSwapInInstances();
+    addParticipant(PARTICIPANT_PREFIX + "_" + _nextStartPort);
+    addParticipant(PARTICIPANT_PREFIX + "_" + _nextStartPort);
+    dropTestDBs(ImmutableSet.of("TEST_DB3_DELAYED_CRUSHED", "TEST_DB4_DELAYED_WAGED"));
+  }
+
+  /**
+   * Polls {@code verifier} until it returns true or {@code timeout} elapses, and fails the test if
+   * it never does. Unlike calling TestHelper.verify directly, an AssertionError thrown by
+   * {@code verifier} is logged and treated as "not yet satisfied", so assertion-based checks
+   * (such as validateEVsCorrect) can be retried.
+   *
+   * @param verifier the condition to poll
+   * @param timeout  the timeout passed through to TestHelper.verify
+   * @throws Exception if TestHelper.verify throws an exception
+   */
+  private static void verifier(TestHelper.Verifier verifier, long timeout) throws Exception {
+    boolean converged = TestHelper.verify(() -> {
+      try {
+        boolean result = verifier.verify();
+        if (!result) {
+          LOG.error("Verifier returned false, retrying...");
+        }
+        return result;
+      } catch (AssertionError e) {
+        LOG.error("Caught AssertionError on verifier attempt: ", e);
+        return false;
+      }
+    }, timeout);
+    // Name the timeout in the failure so a timed-out condition is distinguishable from a bare
+    // "expected [true] but found [false]" in test reports.
+    Assert.assertTrue(converged, "Verifier did not return true within timeout of " + timeout);
   }
 
   private MockParticipantManager createParticipant(String participantName) {
@@ -1237,6 +1257,7 @@ public class TestInstanceOperation extends ZkTestBase {
     participant.syncStart();
     _participants.add(participant);
     _participantNames.add(participantName);
+    _nextStartPort++;
   }
 
   private void addParticipant(String participantName) {
@@ -1257,10 +1278,10 @@ public class TestInstanceOperation extends ZkTestBase {
         PARTITIONS, REPLICA, REPLICA - 1);
     _allDBs.add("TEST_DB2_WAGED");
 
-    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+     Assert.assertTrue(_clusterVerifier.verifyByPolling());
   }
 
-  private void dropTestDBs(Set<String> dbs) {
+  private void dropTestDBs(Set<String> dbs) throws Exception {
     for (String db : dbs) {
       _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, db);
       _allDBs.remove(db);
@@ -1394,7 +1415,7 @@ public class TestInstanceOperation extends ZkTestBase {
     }
   }
 
-  private void validateEVsCorrect(Map<String, ExternalView> actuals,
+  private boolean validateEVsCorrect(Map<String, ExternalView> actuals,
       Map<String, ExternalView> originals, Map<String, String> 
swapOutInstancesToSwapInInstances,
       Set<String> inFlightSwapInInstances, Set<String> 
completedSwapInInstanceNames) {
     Assert.assertEquals(actuals.keySet(), originals.keySet());
@@ -1402,6 +1423,7 @@ public class TestInstanceOperation extends ZkTestBase {
       validateEVCorrect(actuals.get(resource), originals.get(resource),
           swapOutInstancesToSwapInInstances, inFlightSwapInInstances, 
completedSwapInInstanceNames);
     }
+    return true;
   }
 
   private void validateAssignmentInEv(ExternalView ev) {
@@ -1461,8 +1483,8 @@ public class TestInstanceOperation extends ZkTestBase {
 
     private void sleepWhileNotCanceled(long sleepTime) throws 
InterruptedException{
       while(sleepTime >0 && !isCancelled()) {
-        Thread.sleep(5000);
-        sleepTime = sleepTime - 5000;
+        Thread.sleep(TIMEOUT);
+        sleepTime = sleepTime - TIMEOUT;
       }
       if (isCancelled()) {
         _cancelled = false;

Reply via email to