This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 9fbf2da87bdb266dcac813e3ca50b5ac23d0b45c
Author: Zachary Pinto <zapi...@linkedin.com>
AuthorDate: Fri Nov 3 15:12:14 2023 -0700

    HelixAdmin APIs and pipeline changes to support Helix Node Swap  (#2661)
    
    Add the ability for up to 2 nodes with the same logicalId to be added to the cluster at the same time when a SWAP is happening.
    During all partition assignment for WAGED and the DelayedAutoRebalancer, we select just one instance for each logicalId. This achieves n -> n+1 for all replicas on the SWAP_OUT node and back to n when the SWAP is marked complete, which also makes the swap cancelable.
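
    As a hedged illustration of the n -> n+1 step (the host and resource names here are hypothetical), the new DelayedRebalanceUtil helper appends the live, enabled SWAP_IN instance to any preference list that contains its SWAP_OUT counterpart:

        import java.util.ArrayList;
        import java.util.Arrays;
        import java.util.Collections;
        import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
        import org.apache.helix.zookeeper.datamodel.ZNRecord;

        public class SwapPreferenceListExample {
          public static void main(String[] args) {
            // n = 2 replicas; host_A is the SWAP_OUT node, host_A_new its live, enabled SWAP_IN counterpart.
            ZNRecord mapping = new ZNRecord("MyResource");
            mapping.setListField("MyResource_0", new ArrayList<>(Arrays.asList("host_A", "host_B")));
            DelayedRebalanceUtil.addSwapInInstanceToPreferenceListsIfSwapOutInstanceExists(
                mapping,
                Collections.singletonMap("host_A", "host_A_new"),
                Collections.singleton("host_A_new"));
            // Prints [host_A, host_B, host_A_new]: n + 1 replicas until the swap is marked complete.
            System.out.println(mapping.getListField("MyResource_0"));
          }
        }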
    
    Adding and updating Helix Admin APIs to support swap operation:
    
    setInstanceOperation
    addInstance
    canCompleteSwap
    completeSwapIfPossible
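
    A hedged sketch of how these APIs might be combined for an end-to-end swap (the HelixAdmin instance, cluster name, and InstanceConfigs are assumed to already exist; only the APIs listed above plus the pre-existing addInstance are used):

        import org.apache.helix.HelixAdmin;
        import org.apache.helix.constants.InstanceConstants;
        import org.apache.helix.model.InstanceConfig;

        public class NodeSwapExample {
          // swapInConfig shares the SWAP_OUT node's logicalId, FAULT_ZONE and capacity, and joins
          // either disabled or with InstanceOperation already set to SWAP_IN.
          public static void swap(HelixAdmin admin, String clusterName,
              InstanceConfig swapOutConfig, InstanceConfig swapInConfig) {
            // 1. Mark the node being replaced as SWAP_OUT.
            admin.setInstanceOperation(clusterName, swapOutConfig.getInstanceName(),
                InstanceConstants.InstanceOperation.SWAP_OUT);
            // 2. Add the replacement node; addInstance infers SWAP_IN when another node with the
            //    same logicalId already exists.
            admin.addInstance(clusterName, swapInConfig);
            // 3. Once the SWAP_IN node has caught up (n -> n+1 replicas), complete the swap,
            //    which brings the replica count back to n on the SWAP_IN node.
            if (admin.canCompleteSwap(clusterName, swapInConfig.getInstanceName())) {
              admin.completeSwapIfPossible(clusterName, swapInConfig.getInstanceName());
            }
          }
        }
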
    * Refactor sanity checks for HelixAdmin swap APIs.
    
    * Helix Node Swap pipeline changes and integration tests.
    
    * Fix integration tests to properly restore stopped MockParticipant so following tests are not affected.
    
    * Add comments and docstrings.
    
    * Fix tests to clean up after themselves.
    
    * Optimize duplicate logicalId filtering so it is only called on allNodes, with the result then used to remove duplicate logicalIds from enabledLiveNodes.
    
    * Add handling for clusterConfig == null in updateSwappingInstances and fix AssignableNode to check for clusterTopologyConfig when attempting to get logicalId.
    * Fix integ tests.
    
    * Fix testGetDomainInformation since we no longer allow an instance to join the cluster with an invalid DOMAIN field.
    
    * Add checks to ensure that the SWAP_IN instance has a FAULT_ZONE and INSTANCE_CAPACITY_MAP matching the SWAP_OUT node.
    
    * Rename canSwapBeCompleted to canCompleteSwap.
    
    * Add sanity checks to allow the SWAP_IN node to join the cluster in a disabled state before the SWAP_OUT node has its instance operation set.
    
    * Fix print in test case.
    
    * Add canCompleteSwap to PerInstanceAccessor and fix formatting.
    
    * Fix flaky node swap after completion by making sure the replica hash is computed with logicalIds instead of instanceNames.
---
 .../src/main/java/org/apache/helix/HelixAdmin.java |  33 +-
 .../dataproviders/BaseControllerDataProvider.java  |  71 ++
 .../rebalancer/DelayedAutoRebalancer.java          |  75 +-
 .../rebalancer/util/DelayedRebalanceUtil.java      |  88 ++
 .../rebalancer/waged/GlobalRebalanceRunner.java    |  12 +-
 .../rebalancer/waged/WagedRebalancer.java          |  57 +-
 .../constraints/ConstraintBasedAlgorithm.java      |   4 +-
 .../rebalancer/waged/model/AssignableNode.java     |  18 +-
 .../rebalancer/waged/model/ClusterModel.java       |   8 +
 .../waged/model/ClusterModelProvider.java          |   6 +-
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  | 423 +++++++++-
 .../org/apache/helix/model/InstanceConfig.java     |  28 +
 .../rebalancer/waged/TestWagedRebalancer.java      |   4 +-
 .../rebalancer/TestInstanceOperation.java          | 896 ++++++++++++++++++++-
 .../apache/helix/manager/zk/TestZkHelixAdmin.java  |  28 +-
 .../java/org/apache/helix/mock/MockHelixAdmin.java |  10 +
 .../rest/server/resources/AbstractResource.java    |   2 +
 .../resources/helix/PerInstanceAccessor.java       | 103 +--
 18 files changed, 1720 insertions(+), 146 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java 
b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 085a987b1..f53b886e2 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -23,6 +23,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
+import javax.annotation.Nullable;
+
 import org.apache.helix.api.status.ClusterManagementMode;
 import org.apache.helix.api.status.ClusterManagementModeRequest;
 import org.apache.helix.api.topology.ClusterTopology;
@@ -302,8 +304,15 @@ public interface HelixAdmin {
    */
   void enableInstance(String clusterName, List<String> instances, boolean 
enabled);
 
-  void setInstanceOperation(String clusterName, String instance,
-      InstanceConstants.InstanceOperation instanceOperation);
+  /**
+   * Set the instanceOperation field.
+   *
+   * @param clusterName       The cluster name
+   * @param instanceName      The instance name
+   * @param instanceOperation The instance operation
+   */
+  void setInstanceOperation(String clusterName, String instanceName,
+      @Nullable InstanceConstants.InstanceOperation instanceOperation);
 
   /**
    * Disable or enable a resource
@@ -747,6 +756,26 @@ public interface HelixAdmin {
    */
   boolean isEvacuateFinished(String clusterName, String instancesNames);
 
+  /**
+   * Check to see if swapping between two instances can be completed. Either 
the swapOut or
+   * swapIn instance can be passed in.
+   * @param clusterName  The cluster name
+   * @param instanceName The instance that is being swapped out or swapped in
+   * @return True if the swap is ready to be completed, false otherwise.
+   */
+  boolean canCompleteSwap(String clusterName, String instanceName);
+
+  /**
+   * Check to see if swapping between two instances is ready to be completed 
and complete it if
+   * possible. Either the swapOut or swapIn instance can be passed in.
+   *
+   * @param clusterName  The cluster name
+   * @param instanceName The instance that is being swapped out or swapped in
+   * @return True if the swap is ready to be completed and was completed 
successfully, false
+   * otherwise.
+   */
+  boolean completeSwapIfPossible(String clusterName, String instanceName);
+
   /**
    * Return if instance is ready for preparing joining cluster. The instance 
should have no current state,
    * no pending message and tagged with operation that exclude the instance 
from Helix assignment.
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 8e8f9fa9b..9dd517384 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -46,10 +46,12 @@ import org.apache.helix.common.caches.InstanceMessagesCache;
 import org.apache.helix.common.caches.PropertyCache;
 import org.apache.helix.common.caches.TaskCurrentStateCache;
 import org.apache.helix.common.controllers.ControlContextProvider;
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.controller.LogUtil;
 import 
org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.ClusterTopologyConfig;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
@@ -116,6 +118,8 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
   private Map<String, Map<String, String>> _idealStateRuleMap;
   private final Map<String, Map<String, Set<String>>> 
_disabledInstanceForPartitionMap = new HashMap<>();
   private final Set<String> _disabledInstanceSet = new HashSet<>();
+  private final Map<String, String> _swapOutInstanceNameToSwapInInstanceName = 
new HashMap<>();
+  private final Set<String> _enabledLiveSwapInInstanceNames = new HashSet<>();
   private final Map<String, MonitoredAbnormalResolver> 
_abnormalStateResolverMap = new HashMap<>();
   private final Set<String> _timedOutInstanceDuringMaintenance = new 
HashSet<>();
   private Map<String, LiveInstance> _liveInstanceExcludeTimedOutForMaintenance 
= new HashMap<>();
@@ -437,6 +441,8 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
 
     updateIdealRuleMap(getClusterConfig());
     updateDisabledInstances(getInstanceConfigMap().values(), 
getClusterConfig());
+    updateSwappingInstances(getInstanceConfigMap().values(), 
getEnabledLiveInstances(),
+        getClusterConfig());
 
     return refreshedTypes;
   }
@@ -471,6 +477,8 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
     refreshAbnormalStateResolverMap(_clusterConfig);
     updateIdealRuleMap(_clusterConfig);
     updateDisabledInstances(getInstanceConfigMap().values(), _clusterConfig);
+    updateSwappingInstances(getInstanceConfigMap().values(), 
getEnabledLiveInstances(),
+        _clusterConfig);
   }
 
   @Override
@@ -617,6 +625,24 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
     return Collections.unmodifiableSet(_disabledInstanceSet);
   }
 
+  /**
+   * Get all swapping instance pairs.
+   *
+   * @return a map of SWAP_OUT instanceNames and their corresponding SWAP_IN 
instanceNames.
+   */
+  public Map<String, String> getSwapOutToSwapInInstancePairs() {
+    return 
Collections.unmodifiableMap(_swapOutInstanceNameToSwapInInstanceName);
+  }
+
+  /**
+   * Get all the enabled and live SWAP_IN instances.
+   *
+   * @return a set of SWAP_IN instanceNames that have a corresponding SWAP_OUT 
instance.
+   */
+  public Set<String> getEnabledLiveSwapInInstanceNames() {
+    return Collections.unmodifiableSet(_enabledLiveSwapInInstanceNames);
+  }
+
   public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
     
_liveInstanceCache.setPropertyMap(HelixProperty.convertListToMap(liveInstances));
     _updateInstanceOfflineTime = true;
@@ -750,6 +776,8 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
   public void setInstanceConfigMap(Map<String, InstanceConfig> 
instanceConfigMap) {
     _instanceConfigCache.setPropertyMap(instanceConfigMap);
     updateDisabledInstances(instanceConfigMap.values(), getClusterConfig());
+    updateSwappingInstances(instanceConfigMap.values(), 
getEnabledLiveInstances(),
+        getClusterConfig());
   }
 
   /**
@@ -858,6 +886,49 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
     }
   }
 
+  private void updateSwappingInstances(Collection<InstanceConfig> 
instanceConfigs,
+      Set<String> liveEnabledInstances, ClusterConfig clusterConfig) {
+    _swapOutInstanceNameToSwapInInstanceName.clear();
+    _enabledLiveSwapInInstanceNames.clear();
+
+    if (clusterConfig == null) {
+      logger.warn("Skip refreshing swapping instances because clusterConfig is 
null.");
+      return;
+    }
+
+    ClusterTopologyConfig clusterTopologyConfig =
+        ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
+
+    Map<String, String> swapOutLogicalIdsByInstanceName = new HashMap<>();
+    Map<String, String> swapInInstancesByLogicalId = new HashMap<>();
+    instanceConfigs.forEach(instanceConfig -> {
+      if (instanceConfig == null) {
+        return;
+      }
+      if (instanceConfig.getInstanceOperation()
+          .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) {
+        swapOutLogicalIdsByInstanceName.put(instanceConfig.getInstanceName(),
+            
instanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType()));
+      }
+      if (instanceConfig.getInstanceOperation()
+          .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
+        swapInInstancesByLogicalId.put(
+            
instanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType()),
+            instanceConfig.getInstanceName());
+      }
+    });
+
+    swapOutLogicalIdsByInstanceName.forEach((swapOutInstanceName, value) -> {
+      String swapInInstanceName = swapInInstancesByLogicalId.get(value);
+      if (swapInInstanceName != null) {
+        _swapOutInstanceNameToSwapInInstanceName.put(swapOutInstanceName, 
swapInInstanceName);
+        if (liveEnabledInstances.contains(swapInInstanceName)) {
+          _enabledLiveSwapInInstanceNames.add(swapInInstanceName);
+        }
+      }
+    });
+  }
+
   /*
    * Check if the instance is timed-out during maintenance mode. An instance 
is timed-out if it has
    * been offline for longer than the user defined timeout window.
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index 9cd0f71cd..442ddfb02 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -31,17 +31,17 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
-import java.util.stream.Collectors;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.api.config.StateTransitionThrottleConfig;
+import org.apache.helix.constants.InstanceConstants;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import 
org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
 import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
 import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ClusterTopologyConfig;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
@@ -56,7 +56,8 @@ import org.slf4j.LoggerFactory;
  */
 public class DelayedAutoRebalancer extends 
AbstractRebalancer<ResourceControllerDataProvider> {
   private static final Logger LOG = 
LoggerFactory.getLogger(DelayedAutoRebalancer.class);
-   public static final Set<String> 
INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT = ImmutableSet.of("EVACUATE", 
"SWAP_IN");
+  public static ImmutableSet<String> 
INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT =
+      ImmutableSet.of(InstanceConstants.InstanceOperation.EVACUATE.name());
 
   @Override
   public IdealState computeNewIdealState(String resourceName,
@@ -113,9 +114,16 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer<ResourceController
       allNodes = clusterData.getAllInstances();
     }
 
+    Set<String> allNodesDeduped = 
DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
+        ClusterTopologyConfig.createFromClusterConfig(clusterConfig),
+        clusterData.getInstanceConfigMap(), allNodes);
+    // Remove the non-selected instances with duplicate logicalIds from 
liveEnabledNodes
+    // This ensures the same duplicate instance is kept in both 
allNodesDeduped and liveEnabledNodes
+    liveEnabledNodes.retainAll(allNodesDeduped);
+
     long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, 
clusterConfig);
     Set<String> activeNodes = DelayedRebalanceUtil
-        .getActiveNodes(allNodes, currentIdealState, liveEnabledNodes,
+        .getActiveNodes(allNodesDeduped, currentIdealState, liveEnabledNodes,
             clusterData.getInstanceOfflineTimeMap(), 
clusterData.getLiveInstances().keySet(),
             clusterData.getInstanceConfigMap(), delay, clusterConfig);
     if (delayRebalanceEnabled) {
@@ -127,11 +135,11 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer<ResourceController
           clusterConfig, _manager);
     }
 
-    if (allNodes.isEmpty() || activeNodes.isEmpty()) {
+    if (allNodesDeduped.isEmpty() || activeNodes.isEmpty()) {
       LOG.error(String.format(
           "No instances or active instances available for resource %s, "
               + "allInstances: %s, liveInstances: %s, activeInstances: %s",
-          resourceName, allNodes, liveEnabledNodes, activeNodes));
+          resourceName, allNodesDeduped, liveEnabledNodes, activeNodes));
       return generateNewIdealState(resourceName, currentIdealState,
           emptyMapping(currentIdealState));
     }
@@ -157,41 +165,58 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer<ResourceController
         getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), 
allPartitions, resourceName,
             stateCountMap, maxPartition);
 
-    // sort node lists to ensure consistent preferred assignments
-    List<String> allNodeList = new ArrayList<>(allNodes);
-    // We will not assign partition to instances with evacuation and wap-out 
tag.
+    List<String> allNodeList = new ArrayList<>(allNodesDeduped);
+
     // TODO: Currently we have 2 groups of instances and compute preference 
list twice and merge.
     // Eventually we want to have exclusive groups of instance for different 
instance tag.
-    List<String> liveEnabledAssignableNodeList = 
filterOutOnOperationInstances(clusterData.getInstanceConfigMap(),
-        liveEnabledNodes);
+    List<String> liveEnabledAssignableNodeList = new ArrayList<>(
+        // We will not assign partitions to instances with EVACUATE 
InstanceOperation.
+        
DelayedRebalanceUtil.filterOutEvacuatingInstances(clusterData.getInstanceConfigMap(),
+            liveEnabledNodes));
+    // sort node lists to ensure consistent preferred assignments
     Collections.sort(allNodeList);
     Collections.sort(liveEnabledAssignableNodeList);
 
-    ZNRecord newIdealMapping = _rebalanceStrategy
-        .computePartitionAssignment(allNodeList, 
liveEnabledAssignableNodeList, currentMapping, clusterData);
+    ZNRecord newIdealMapping =
+        _rebalanceStrategy.computePartitionAssignment(allNodeList, 
liveEnabledAssignableNodeList,
+            currentMapping, clusterData);
     ZNRecord finalMapping = newIdealMapping;
 
     if (DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, 
clusterConfig)
-        || liveEnabledAssignableNodeList.size()!= activeNodes.size()) {
+        || liveEnabledAssignableNodeList.size() != activeNodes.size()) {
       List<String> activeNodeList = new ArrayList<>(activeNodes);
       Collections.sort(activeNodeList);
       int minActiveReplicas = DelayedRebalanceUtil.getMinActiveReplica(
           ResourceConfig.mergeIdealStateWithResourceConfig(resourceConfig, 
currentIdealState),
           currentIdealState, replicaCount);
 
-      ZNRecord newActiveMapping = _rebalanceStrategy
-          .computePartitionAssignment(allNodeList, activeNodeList, 
currentMapping, clusterData);
+      ZNRecord newActiveMapping =
+          _rebalanceStrategy.computePartitionAssignment(allNodeList, 
activeNodeList, currentMapping,
+              clusterData);
       finalMapping = getFinalDelayedMapping(currentIdealState, 
newIdealMapping, newActiveMapping,
           liveEnabledNodes, replicaCount, minActiveReplicas);
     }
 
     finalMapping.getListFields().putAll(userDefinedPreferenceList);
 
+    // 1. Get all SWAP_OUT instances and corresponding SWAP_IN instance pairs 
in the cluster.
+    Map<String, String> swapOutToSwapInInstancePairs =
+        clusterData.getSwapOutToSwapInInstancePairs();
+    // 2. Get all enabled and live SWAP_IN instances in the cluster.
+    Set<String> enabledLiveSwapInInstances = 
clusterData.getEnabledLiveSwapInInstanceNames();
+    // 3. For each SWAP_OUT instance in any of the preferenceLists, add the 
corresponding SWAP_IN instance to the end.
+    // Skipping this when there are no SWAP_IN instances ready (enabled and live) will reduce computation time when there is not an active
+    // swap occurring.
+    if (!clusterData.getEnabledLiveSwapInInstanceNames().isEmpty()) {
+      
DelayedRebalanceUtil.addSwapInInstanceToPreferenceListsIfSwapOutInstanceExists(finalMapping,
+          swapOutToSwapInInstancePairs, enabledLiveSwapInInstances);
+    }
+
     LOG.debug("currentMapping: {}", currentMapping);
     LOG.debug("stateCountMap: {}", stateCountMap);
     LOG.debug("liveEnabledNodes: {}", liveEnabledNodes);
     LOG.debug("activeNodes: {}", activeNodes);
-    LOG.debug("allNodes: {}", allNodes);
+    LOG.debug("allNodes: {}", allNodesDeduped);
     LOG.debug("maxPartition: {}", maxPartition);
     LOG.debug("newIdealMapping: {}", newIdealMapping);
     LOG.debug("finalMapping: {}", finalMapping);
@@ -201,14 +226,6 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer<ResourceController
     return idealState;
   }
 
-  private static List<String> filterOutOnOperationInstances(Map<String, 
InstanceConfig> instanceConfigMap,
-      Set<String> nodes) {
-    return nodes.stream()
-        .filter(
-            instance -> 
!INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(instanceConfigMap.get(instance).getInstanceOperation()))
-        .collect(Collectors.toList());
-  }
-
   private IdealState generateNewIdealState(String resourceName, IdealState 
currentIdealState,
       ZNRecord newMapping) {
     IdealState newIdealState = new IdealState(resourceName);
@@ -376,7 +393,7 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer<ResourceController
     // if preference list is not empty, and we do have new intanceToAdd, we
     // should check if it has capacity to hold the partition.
     boolean isWaged = WagedValidationUtil.isWagedEnabled(idealState) && cache 
!= null;
-    if (isWaged && !isPreferenceListEmpty && instanceToAdd.size() > 0) {
+    if (isWaged && !isPreferenceListEmpty && !instanceToAdd.isEmpty()) {
       // check instanceToAdd instance appears in combinedPreferenceList
       for (String instance : instanceToAdd) {
         if (combinedPreferenceList.contains(instance)) {
@@ -409,7 +426,11 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer<ResourceController
         bestPossibleStateMap, preferenceList, combinedPreferenceList)) {
       for (int i = 0; i < combinedPreferenceList.size() - numReplicas; i++) {
         String instanceToDrop = 
combinedPreferenceList.get(combinedPreferenceList.size() - i - 1);
-        bestPossibleStateMap.put(instanceToDrop, 
HelixDefinedState.DROPPED.name());
+        // We do not want to drop a SWAP_IN node if it is at the end of the 
preferenceList,
+        // because partitions are actively being added on this node to prepare 
for SWAP completion.
+        if (cache == null || 
!cache.getEnabledLiveSwapInInstanceNames().contains(instanceToDrop)) {
+          bestPossibleStateMap.put(instanceToDrop, 
HelixDefinedState.DROPPED.name());
+        }
       }
     }
 
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
index 58bad164a..c7066d053 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
@@ -34,12 +34,14 @@ import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
 import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ClusterTopologyConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.util.InstanceValidationUtil;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -139,6 +141,92 @@ public class DelayedRebalanceUtil {
         .collect(Collectors.toSet());
   }
 
+  /**
+   * Filter out instances with duplicate logical IDs. If there are duplicates, 
the instance with
+   * InstanceOperation SWAP_OUT will be chosen over the instance with SWAP_IN. 
SWAP_IN is not
+   * assignable. If there are duplicates with one node having no 
InstanceOperation and the other
+   * having SWAP_OUT, the node with no InstanceOperation will be chosen. This 
signifies SWAP
+   * completion, therefore making the node assignable.
+   * TODO: Eventually when we refactor DataProvider to have 
getAssignableInstances() and
+   * TODO: getAssignableEnabledLiveInstances() this will not need to be called 
in the rebalancer.
+   * @param clusterTopologyConfig the cluster topology configuration
+   * @param instanceConfigMap     the map of instance name to corresponding 
InstanceConfig
+   * @param instances             the set of instances to filter out duplicate 
logicalIDs for
+   * @return the set of instances with duplicate logicalIDs filtered out, 
there will only be one
+   * instance per logicalID
+   */
+  public static Set<String> filterOutInstancesWithDuplicateLogicalIds(
+      ClusterTopologyConfig clusterTopologyConfig, Map<String, InstanceConfig> 
instanceConfigMap,
+      Set<String> instances) {
+    Set<String> filteredNodes = new HashSet<>();
+    Map<String, String> filteredInstancesByLogicalId = new HashMap<>();
+
+    instances.forEach(node -> {
+      InstanceConfig thisInstanceConfig = instanceConfigMap.get(node);
+      if (thisInstanceConfig == null) {
+        return;
+      }
+      String thisLogicalId =
+          
thisInstanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType());
+
+      if (filteredInstancesByLogicalId.containsKey(thisLogicalId)) {
+        InstanceConfig filteredDuplicateInstanceConfig =
+            
instanceConfigMap.get(filteredInstancesByLogicalId.get(thisLogicalId));
+        if ((filteredDuplicateInstanceConfig.getInstanceOperation()
+            .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())
+            && thisInstanceConfig.getInstanceOperation()
+            .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name()))
+            || thisInstanceConfig.getInstanceOperation().isEmpty()) {
+          // If the already filtered instance is SWAP_IN and this instance is 
in SWAP_OUT, then replace the filtered
+          // instance with this instance. If this instance has no 
InstanceOperation, then replace the filtered instance
+          // with this instance. This is the case where the SWAP_IN node has 
been marked as complete or SWAP_IN exists and
+          // SWAP_OUT does not. There can never be a case where both have no 
InstanceOperation set.
+          
filteredNodes.remove(filteredInstancesByLogicalId.get(thisLogicalId));
+          filteredNodes.add(node);
+          filteredInstancesByLogicalId.put(thisLogicalId, node);
+        }
+      } else {
+        filteredNodes.add(node);
+        filteredInstancesByLogicalId.put(thisLogicalId, node);
+      }
+    });
+
+    return filteredNodes;
+  }
+
+  /**
+   * Look through the provided mapping and add corresponding SWAP_IN node if a 
SWAP_OUT node exists
+   * in the partition's preference list.
+   *
+   * @param mapping                      the mapping to be updated (IdealState 
ZNRecord)
+   * @param swapOutToSwapInInstancePairs the map of SWAP_OUT to SWAP_IN 
instances
+   */
+  public static void 
addSwapInInstanceToPreferenceListsIfSwapOutInstanceExists(ZNRecord mapping,
+      Map<String, String> swapOutToSwapInInstancePairs, Set<String> 
enabledLiveSwapInInstances) {
+    Map<String, List<String>> preferenceListsByPartition = 
mapping.getListFields();
+    for (String partition : preferenceListsByPartition.keySet()) {
+      List<String> preferenceList = preferenceListsByPartition.get(partition);
+      if (preferenceList == null) {
+        continue;
+      }
+      List<String> newInstancesToAdd = new ArrayList<>();
+      for (String instanceName : preferenceList) {
+        if (swapOutToSwapInInstancePairs.containsKey(instanceName)
+            && enabledLiveSwapInInstances.contains(
+            swapOutToSwapInInstancePairs.get(instanceName))) {
+          String swapInInstanceName = 
swapOutToSwapInInstancePairs.get(instanceName);
+          if (!preferenceList.contains(swapInInstanceName) && 
!newInstancesToAdd.contains(
+              swapInInstanceName)) {
+            newInstancesToAdd.add(swapInInstanceName);
+          }
+        }
+      }
+      if (!newInstancesToAdd.isEmpty()) {
+        preferenceList.addAll(newInstancesToAdd);
+      }
+    }
+  }
+
   /**
    * Return the time when an offline or disabled instance should be treated as 
inactive. Return -1
    * if it is inactive now or forced to be rebalanced by an on-demand 
rebalance.
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java
index 6130e5c52..6c199bc1b 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java
@@ -30,10 +30,12 @@ import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.controller.changedetector.ResourceChangeDetector;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
 import org.apache.helix.controller.rebalancer.util.WagedRebalanceUtil;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
 import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.ClusterTopologyConfig;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.monitoring.metrics.MetricCollector;
@@ -163,8 +165,14 @@ class GlobalRebalanceRunner implements AutoCloseable {
         _assignmentManager.getBaselineAssignment(_assignmentMetadataStore, 
currentStateOutput, resourceMap.keySet());
     ClusterModel clusterModel;
     try {
-      clusterModel =
-          ClusterModelProvider.generateClusterModelForBaseline(clusterData, 
resourceMap, clusterData.getAllInstances(),
+      clusterModel = 
ClusterModelProvider.generateClusterModelForBaseline(clusterData, resourceMap,
+          // Dedupe and select correct node if there is more than one with the 
same logical id.
+          // We should be calculating a new baseline only after a swap 
operation is complete and the SWAP_IN node is selected
+          // by deduping. This ensures that adding the SWAP_IN node to the 
cluster does not cause new baseline to be calculated
+          // with both the SWAP_OUT and SWAP_IN node.
+          DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
+              
ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()),
+              clusterData.getInstanceConfigMap(), 
clusterData.getAllInstances()),
               clusterChanges, currentBaseline);
     } catch (Exception ex) {
       throw new HelixRebalanceException("Failed to generate cluster model for 
global rebalance.",
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index ad3c7b20d..8f71f4e6d 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -47,8 +47,8 @@ import 
org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ClusterTopologyConfig;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
@@ -302,8 +302,17 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
       ResourceControllerDataProvider clusterData, Map<String, Resource> 
resourceMap,
       final CurrentStateOutput currentStateOutput, RebalanceAlgorithm 
algorithm)
       throws HelixRebalanceException {
-    Set<String> activeNodes = DelayedRebalanceUtil
-        .getActiveNodes(clusterData.getAllInstances(), 
clusterData.getEnabledLiveInstances(),
+
+    Set<String> allNodesDeduped = 
DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
+        
ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()),
+        clusterData.getInstanceConfigMap(), clusterData.getAllInstances());
+    // Remove the non-selected instances with duplicate logicalIds from 
liveEnabledNodes
+    // This ensures the same duplicate instance is kept in both 
allNodesDeduped and liveEnabledNodes
+    Set<String> liveEnabledNodesDeduped = 
clusterData.getEnabledLiveInstances();
+    liveEnabledNodesDeduped.retainAll(allNodesDeduped);
+
+    Set<String> activeNodes =
+        DelayedRebalanceUtil.getActiveNodes(allNodesDeduped, 
liveEnabledNodesDeduped,
             clusterData.getInstanceOfflineTimeMap(), 
clusterData.getLiveInstances().keySet(),
             clusterData.getInstanceConfigMap(), 
clusterData.getClusterConfig());
 
@@ -359,6 +368,20 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
         // Sort the preference list according to state priority.
         newIdealState.setPreferenceLists(
             getPreferenceLists(assignments.get(resourceName), 
statePriorityMap));
+
+        // 1. Get all SWAP_OUT instances and corresponding SWAP_IN instance 
pairs in the cluster.
+        Map<String, String> swapOutToSwapInInstancePairs =
+            clusterData.getSwapOutToSwapInInstancePairs();
+        // 2. Get all enabled and live SWAP_IN instances in the cluster.
+        Set<String> enabledLiveSwapInInstances = 
clusterData.getEnabledLiveSwapInInstanceNames();
+        // 3. For each SWAP_OUT instance in any of the preferenceLists, add 
the corresponding SWAP_IN instance to the end.
+        // Skipping this when there are no SWAP_IN instances ready (enabled and live) will reduce computation time when there is not an active
+        // swap occurring.
+        if (!clusterData.getEnabledLiveSwapInInstanceNames().isEmpty()) {
+          
DelayedRebalanceUtil.addSwapInInstanceToPreferenceListsIfSwapOutInstanceExists(
+              newIdealState.getRecord(), swapOutToSwapInInstancePairs, 
enabledLiveSwapInInstances);
+        }
+
         // Note the state mapping in the new assignment won't directly 
propagate to the map fields.
         // The rebalancer will calculate for the final state mapping 
considering the current states.
         finalIdealStateMap.put(resourceName, newIdealState);
@@ -398,7 +421,14 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
       RebalanceAlgorithm algorithm) throws HelixRebalanceException {
     // the "real" live nodes at the time
     // TODO: this is a hacky way to filter our on operation instance. We 
should consider redesign `getEnabledLiveInstances()`.
-    final Set<String> enabledLiveInstances = 
filterOutOnOperationInstances(clusterData.getInstanceConfigMap(), 
clusterData.getEnabledLiveInstances());
+    final Set<String> allNodesDeduped = 
DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
+        
ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()),
+        clusterData.getInstanceConfigMap(), clusterData.getAllInstances());
+    final Set<String> enabledLiveInstances = 
clusterData.getEnabledLiveInstances();
+    // Remove the non-selected instances with duplicate logicalIds from 
liveEnabledNodes
+    // This ensures the same duplicate instance is kept in both 
allNodesDeduped and liveEnabledNodes
+    enabledLiveInstances.retainAll(allNodesDeduped);
+
     if (activeNodes.equals(enabledLiveInstances) || 
!requireRebalanceOverwrite(clusterData, currentResourceAssignment)) {
       // no need for additional process, return the current resource assignment
       return currentResourceAssignment;
@@ -427,14 +457,6 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
     }
   }
 
-  private static Set<String> filterOutOnOperationInstances(Map<String, 
InstanceConfig> instanceConfigMap,
-      Set<String> nodes) {
-    return nodes.stream()
-        .filter(
-            instance -> 
!DelayedAutoRebalancer.INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(instanceConfigMap.get(instance).getInstanceOperation()))
-        .collect(Collectors.toSet());
-  }
-
   /**
    * Emergency rebalance is scheduled to quickly handle urgent cases like 
reassigning partitions from inactive nodes
    * and addressing for partitions failing to meet minActiveReplicas.
@@ -619,8 +641,15 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
     
bestPossibleAssignment.values().parallelStream().forEach((resourceAssignment -> 
{
       String resourceName = resourceAssignment.getResourceName();
       IdealState currentIdealState = clusterData.getIdealState(resourceName);
-      Set<String> enabledLiveInstances =
-          filterOutOnOperationInstances(clusterData.getInstanceConfigMap(), 
clusterData.getEnabledLiveInstances());
+
+      Set<String> allNodesDeduped = 
DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
+          
ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()),
+          clusterData.getInstanceConfigMap(), clusterData.getAllInstances());
+      Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+      // Remove the non-selected instances with duplicate logicalIds from 
liveEnabledNodes
+      // This ensures the same duplicate instance is kept in both 
allNodesDeduped and liveEnabledNodes
+      enabledLiveInstances.retainAll(allNodesDeduped);
+
       int numReplica = 
currentIdealState.getReplicaCount(enabledLiveInstances.size());
       int minActiveReplica = 
DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
           
.mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
index 5dbeb5c38..77d56302c 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
@@ -153,7 +153,7 @@ class ConstraintBasedAlgorithm implements 
RebalanceAlgorithm {
             int idleScore1 = busyInstances.contains(instanceName1) ? 0 : 1;
             int idleScore2 = busyInstances.contains(instanceName2) ? 0 : 1;
             return idleScore1 != idleScore2 ? (idleScore1 - idleScore2)
-                : -instanceName1.compareTo(instanceName2);
+                : -nodeEntry1.getKey().compareTo(nodeEntry2.getKey());
           } else {
             return scoreCompareResult;
           }
@@ -193,7 +193,7 @@ class ConstraintBasedAlgorithm implements 
RebalanceAlgorithm {
           .containsKey(replica.getResourceName());
       _isInBaselineAssignment =
           
clusterModel.getContext().getBaselineAssignment().containsKey(replica.getResourceName());
-      _replicaHash = Objects.hash(replica.toString(), 
clusterModel.getAssignableNodes().keySet());
+      _replicaHash = Objects.hash(replica.toString(), 
clusterModel.getAssignableLogicalIds());
       computeScore(overallClusterRemainingCapacityMap);
     }
 
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index e29052a0d..b4e8c3c71 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -34,6 +34,7 @@ import org.apache.helix.HelixException;
 import org.apache.helix.controller.rebalancer.topology.Topology;
 import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
 import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ClusterTopologyConfig;
 import org.apache.helix.model.InstanceConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +49,7 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
 
   // Immutable Instance Properties
   private final String _instanceName;
+  private final String _logicaId;
   private final String _faultZone;
   // maximum number of the partitions that can be assigned to the instance.
   private final int _maxPartition;
@@ -72,8 +74,12 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
    * ResourceConfig could
    * subject to change. If the assumption is no longer true, this function 
should become private.
    */
-  AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig, 
String instanceName) {
+  AssignableNode(ClusterConfig clusterConfig, ClusterTopologyConfig 
clusterTopologyConfig,
+      InstanceConfig instanceConfig, String instanceName) {
     _instanceName = instanceName;
+    _logicaId = clusterTopologyConfig != null ? instanceConfig.getLogicalId(
+        clusterTopologyConfig.getEndNodeType())
+            : instanceName;
     Map<String, Integer> instanceCapacity = 
fetchInstanceCapacity(clusterConfig, instanceConfig);
     _faultZone = computeFaultZone(clusterConfig, instanceConfig);
     _instanceTags = ImmutableSet.copyOf(instanceConfig.getTags());
@@ -86,6 +92,10 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
     _currentAssignedReplicaMap = new HashMap<>();
   }
 
+  AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig, 
String instanceName) {
+    this(clusterConfig, null, instanceConfig, instanceName);
+  }
+
   /**
    * This function should only be used to assign a set of new partitions that 
are not allocated on
    * this node. It's because the any exception could occur at the middle of 
batch assignment and the
@@ -272,6 +282,10 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
     return _instanceName;
   }
 
+  public String getLogicalId() {
+    return _logicaId;
+  }
+
   public Set<String> getInstanceTags() {
     return _instanceTags;
   }
@@ -368,7 +382,7 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
 
   @Override
   public int compareTo(AssignableNode o) {
-    return _instanceName.compareTo(o.getInstanceName());
+    return _logicaId.compareTo(o.getLogicalId());
   }
 
   @Override
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
index 42eaabaf9..7ef503e01 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
@@ -37,6 +37,7 @@ public class ClusterModel {
   // Note that the identical replicas are deduped in the index.
   private final Map<String, Map<String, AssignableReplica>> 
_assignableReplicaIndex;
   private final Map<String, AssignableNode> _assignableNodeMap;
+  private final Set<String> _assignableNodeLogicalIds;
 
   /**
    * @param clusterContext         The initialized cluster context.
@@ -60,6 +61,9 @@ public class ClusterModel {
 
     _assignableNodeMap = assignableNodes.parallelStream()
         .collect(Collectors.toMap(AssignableNode::getInstanceName, node -> 
node));
+    _assignableNodeLogicalIds =
+        assignableNodes.parallelStream().map(AssignableNode::getLogicalId)
+            .collect(Collectors.toSet());
   }
 
   public ClusterContext getContext() {
@@ -70,6 +74,10 @@ public class ClusterModel {
     return _assignableNodeMap;
   }
 
+  public Set<String> getAssignableLogicalIds() {
+    return _assignableNodeLogicalIds;
+  }
+
   public Map<String, Set<AssignableReplica>> getAssignableReplicaMap() {
     return _assignableReplicaMap;
   }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index dffaec3e0..3f1673210 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -34,6 +34,7 @@ import org.apache.helix.HelixException;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
 import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ClusterTopologyConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Partition;
@@ -530,9 +531,12 @@ public class ClusterModelProvider {
    */
   private static Set<AssignableNode> getAllAssignableNodes(ClusterConfig 
clusterConfig,
       Map<String, InstanceConfig> instanceConfigMap, Set<String> 
activeInstances) {
+    ClusterTopologyConfig clusterTopologyConfig =
+        ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
     return activeInstances.parallelStream()
         .filter(instanceConfigMap::containsKey).map(
-            instanceName -> new AssignableNode(clusterConfig, 
instanceConfigMap.get(instanceName),
+            instanceName -> new AssignableNode(clusterConfig, 
clusterTopologyConfig,
+                instanceConfigMap.get(instanceName),
                 instanceName)).collect(Collectors.toSet());
   }
 
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index ebbcf64d0..34bd56487 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -39,6 +39,10 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import javax.annotation.Nullable;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ConfigAccessor;
@@ -47,7 +51,6 @@ import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixException;
-import org.apache.helix.HelixProperty;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyPathBuilder;
@@ -67,6 +70,7 @@ import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.ClusterStatus;
+import org.apache.helix.model.ClusterTopologyConfig;
 import org.apache.helix.model.ConstraintItem;
 import org.apache.helix.model.ControllerHistory;
 import org.apache.helix.model.CurrentState;
@@ -86,6 +90,7 @@ import org.apache.helix.model.ParticipantHistory;
 import org.apache.helix.model.PauseSignal;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.tools.DefaultIdealStateCalculator;
 import org.apache.helix.util.ConfigStringUtil;
@@ -114,6 +119,8 @@ public class ZKHelixAdmin implements HelixAdmin {
   public static final String CONNECTION_TIMEOUT = "helixAdmin.timeOutInSec";
   private static final String MAINTENANCE_ZNODE_ID = "maintenance";
   private static final int DEFAULT_SUPERCLUSTER_REPLICA = 3;
+  private static final ImmutableSet<String> 
ALLOWED_INSTANCE_OPERATIONS_FOR_ADD_INSTANCE =
+      ImmutableSet.of("", InstanceConstants.InstanceOperation.SWAP_IN.name());
 
   private final RealmAwareZkClient _zkClient;
   private final ConfigAccessor _configAccessor;
@@ -197,6 +204,108 @@ public class ZKHelixAdmin implements HelixAdmin {
       throw new HelixException("Node " + nodeId + " already exists in cluster 
" + clusterName);
     }
 
+    if (!ALLOWED_INSTANCE_OPERATIONS_FOR_ADD_INSTANCE.contains(
+        instanceConfig.getInstanceOperation())) {
+      throw new HelixException(
+          "Instance can only be added if InstanceOperation is set to one of" + 
"the following: "
+              + ALLOWED_INSTANCE_OPERATIONS_FOR_ADD_INSTANCE + " This 
instance: " + nodeId
+              + " has InstanceOperation set to " + 
instanceConfig.getInstanceOperation());
+    }
+
+    // Get the topology key used to determine the logicalId of a node.
+    ClusterConfig clusterConfig = 
_configAccessor.getClusterConfig(clusterName);
+    ClusterTopologyConfig clusterTopologyConfig =
+        ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
+    String logicalIdKey = clusterTopologyConfig.getEndNodeType();
+    String faultZoneKey = clusterTopologyConfig.getFaultZoneType();
+    String toAddInstanceLogicalId = instanceConfig.getLogicalId(logicalIdKey);
+
+    HelixConfigScope instanceConfigScope =
+        new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT,
+            clusterName).build();
+    List<String> existingInstanceIds = getConfigKeys(instanceConfigScope);
+    List<InstanceConfig> foundInstanceConfigsWithMatchingLogicalId =
+        existingInstanceIds.parallelStream()
+            .map(existingInstanceId -> getInstanceConfig(clusterName, 
existingInstanceId)).filter(
+                existingInstanceConfig -> 
existingInstanceConfig.getLogicalId(logicalIdKey)
+                    
.equals(toAddInstanceLogicalId)).collect(Collectors.toList());
+
+    if (foundInstanceConfigsWithMatchingLogicalId.size() >= 2) {
+      // If the length is 2, we cannot add an instance with the same logicalId 
as an existing instance
+      // regardless of InstanceOperation.
+      throw new HelixException(
+          "There can only be 2 instances with the same logicalId in a cluster. 
"
+              + "Existing instances: " + 
foundInstanceConfigsWithMatchingLogicalId.get(0)
+              .getInstanceName() + " and " + 
foundInstanceConfigsWithMatchingLogicalId.get(1)
+              .getInstanceName() + " already have the same logicalId: " + 
toAddInstanceLogicalId
+              + "; therefore, " + nodeId + " cannot be added to the cluster.");
+    } else if (foundInstanceConfigsWithMatchingLogicalId.size() == 1) {
+      // If there is only one instance with the same logicalId, we can infer 
that the intended behaviour
+      // is to SWAP_IN.
+
+      // If the InstanceOperation is unset, we will set it to SWAP_IN.
+      if (!instanceConfig.getInstanceOperation()
+          .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
+        
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.SWAP_IN);
+      }
+
+      // If the existing instance with the same logicalId does not have 
InstanceOperation set to SWAP_OUT and this instance
+      // is attempting to join as enabled, we cannot add this instance.
+      if (instanceConfig.getInstanceEnabled() && 
!foundInstanceConfigsWithMatchingLogicalId.get(0)
+          .getInstanceOperation()
+          .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) {
+        throw new HelixException(
+            "Instance can only be added if the exising instance sharing the 
same logicalId has InstanceOperation"
+                + " set to " + 
InstanceConstants.InstanceOperation.SWAP_OUT.name()
+                + " and this instance has InstanceOperation set to "
+                + InstanceConstants.InstanceOperation.SWAP_IN.name() + ". " + 
"Existing instance: "
+                + 
foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceName()
+                + " has InstanceOperation: "
+                + 
foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceOperation()
+                + " and this instance: " + nodeId + " has InstanceOperation: "
+                + instanceConfig.getInstanceOperation());
+      }
+
+      // If the existing instance with the same logicalId is not in the same 
FAULT_ZONE as this instance, we cannot
+      // add this instance.
+      if (!foundInstanceConfigsWithMatchingLogicalId.get(0).getDomainAsMap()
+          .containsKey(faultZoneKey) || 
!instanceConfig.getDomainAsMap().containsKey(faultZoneKey)
+          || 
!foundInstanceConfigsWithMatchingLogicalId.get(0).getDomainAsMap().get(faultZoneKey)
+          .equals(instanceConfig.getDomainAsMap().get(faultZoneKey))) {
+        throw new HelixException(
+            "Instance can only be added if the SWAP_OUT instance sharing the 
same logicalId is in the same FAULT_ZONE"
+                + " as this instance. " + "Existing instance: "
+                + 
foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceName()
+                + " has FAULT_ZONE_TYPE: " + 
foundInstanceConfigsWithMatchingLogicalId.get(0)
+                .getDomainAsMap().get(faultZoneKey) + " and this instance: " + 
nodeId
+                + " has FAULT_ZONE_TYPE: " + 
instanceConfig.getDomainAsMap().get(faultZoneKey));
+      }
+
+      Map<String, Integer> foundInstanceCapacityMap =
+          
foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceCapacityMap().isEmpty()
+              ? clusterConfig.getDefaultInstanceCapacityMap()
+              : 
foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceCapacityMap();
+      Map<String, Integer> instanceCapacityMap = 
instanceConfig.getInstanceCapacityMap().isEmpty()
+          ? clusterConfig.getDefaultInstanceCapacityMap() : 
instanceConfig.getInstanceCapacityMap();
+      // If the instance does not have the same capacity, we cannot add this 
instance.
+      if (!new EqualsBuilder().append(foundInstanceCapacityMap, 
instanceCapacityMap).isEquals()) {
+        throw new HelixException(
+            "Instance can only be added if the SWAP_OUT instance sharing the 
same logicalId has the same capacity"
+                + " as this instance. " + "Existing instance: "
+                + 
foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceName()
+                + " has capacity: " + foundInstanceCapacityMap + " and this 
instance: " + nodeId
+                + " has capacity: " + instanceCapacityMap);
+      }
+    } else if (!instanceConfig.getInstanceOperation().isEmpty()) {
+      // If there are no instances with the same logicalId, we can only add 
this instance if InstanceOperation
+      // is unset because it is a new instance.
+      throw new HelixException(
+          "There is no instance with logicalId: " + toAddInstanceLogicalId + " 
in cluster: "
+              + clusterName + "; therefore, " + nodeId
+              + " cannot join cluster with InstanceOperation set to "
+              + instanceConfig.getInstanceOperation() + ".");
+    }
+
     ZKUtil.createChildren(_zkClient, instanceConfigsPath, 
instanceConfig.getRecord());
 
     
_zkClient.createPersistent(PropertyPathBuilder.instanceMessage(clusterName, 
nodeId), true);
@@ -358,6 +467,21 @@ public class ZKHelixAdmin implements HelixAdmin {
     logger.info("{} instance {} in cluster {}.", enabled ? "Enable" : 
"Disable", instanceName,
         clusterName);
     BaseDataAccessor<ZNRecord> baseAccessor = new 
ZkBaseDataAccessor<>(_zkClient);
+
+    // If enabled is set to true and InstanceOperation is SWAP_IN, we should 
fail if there is not a
+    // matching SWAP_OUT instance.
+    InstanceConfig instanceConfig = getInstanceConfig(clusterName, 
instanceName);
+    if (enabled && instanceConfig.getInstanceOperation()
+        .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
+      InstanceConfig matchingSwapInstance = 
findMatchingSwapInstance(clusterName, instanceConfig);
+      if (matchingSwapInstance == null || 
!matchingSwapInstance.getInstanceOperation()
+          .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) {
+        throw new HelixException("Instance cannot be enabled if 
InstanceOperation is set to "
+            + instanceConfig.getInstanceOperation() + " when there is no 
matching "
+            + InstanceConstants.InstanceOperation.SWAP_OUT.name() + " 
instance.");
+      }
+    }
+
     // Eventually we will have all instances' enable/disable information in 
clusterConfig. Now we
     // update both instanceConfig and clusterConfig in transition period.
     enableSingleInstance(clusterName, instanceName, enabled, baseAccessor, 
disabledType, reason);
@@ -379,11 +503,53 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   // TODO: Name may change in future
   public void setInstanceOperation(String clusterName, String instanceName,
-      InstanceConstants.InstanceOperation instanceOperation) {
+      @Nullable InstanceConstants.InstanceOperation instanceOperation) {
 
     BaseDataAccessor<ZNRecord> baseAccessor = new 
ZkBaseDataAccessor<>(_zkClient);
     String path = PropertyPathBuilder.instanceConfig(clusterName, 
instanceName);
 
+    // InstanceOperation can only be set to SWAP_IN when the instance is added 
to the cluster
+    // or if it is disabled.
+    if (instanceOperation != null && instanceOperation.equals(
+        InstanceConstants.InstanceOperation.SWAP_IN) && 
getInstanceConfig(clusterName,
+        instanceName).getInstanceEnabled()) {
+      throw new HelixException("InstanceOperation should only be set to "
+          + InstanceConstants.InstanceOperation.SWAP_IN.name()
+          + " when an instance joins the cluster for the first time(when "
+          + "creating the InstanceConfig) or is disabled.");
+    }
+
+    // InstanceOperation cannot be set to null if there is an instance with 
the same logicalId in
+    // the cluster which does not have InstanceOperation set to SWAP_IN or 
SWAP_OUT.
+    if (instanceOperation == null) {
+      InstanceConfig instanceConfig = getInstanceConfig(clusterName, 
instanceName);
+      String logicalIdKey = ClusterTopologyConfig.createFromClusterConfig(
+          _configAccessor.getClusterConfig(clusterName)).getEndNodeType();
+
+      HelixConfigScope instanceConfigScope =
+          new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT,
+              clusterName).build();
+      List<String> existingInstanceIds = getConfigKeys(instanceConfigScope);
+      List<InstanceConfig> matchingInstancesWithNonSwappingInstanceOperation =
+          existingInstanceIds.parallelStream()
+              .map(existingInstanceId -> getInstanceConfig(clusterName, 
existingInstanceId)).filter(
+                  existingInstanceConfig ->
+                      
!existingInstanceConfig.getInstanceName().equals(instanceName)
+                          && existingInstanceConfig.getLogicalId(logicalIdKey)
+                          .equals(instanceConfig.getLogicalId(logicalIdKey))
+                          && !existingInstanceConfig.getInstanceOperation()
+                          
.equals(InstanceConstants.InstanceOperation.SWAP_IN.name())
+                          && !existingInstanceConfig.getInstanceOperation()
+                          
.equals(InstanceConstants.InstanceOperation.SWAP_OUT.name()))
+              .collect(Collectors.toList());
+
+      if (!matchingInstancesWithNonSwappingInstanceOperation.isEmpty()) {
+        throw new HelixException("InstanceOperation cannot be set to null for 
" + instanceName
+            + " if there are other instances with the same logicalId in the 
cluster that do not have"
+            + " InstanceOperation set to SWAP_IN or SWAP_OUT.");
+      }
+    }
+
     if (!baseAccessor.exists(path, 0)) {
       throw new HelixException(
           "Cluster " + clusterName + ", instance: " + instanceName + ", 
instance config does not exist");
@@ -410,16 +576,263 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public boolean isEvacuateFinished(String clusterName, String instanceName) {
-    if (!instanceHasCurrentSateOrMessage(clusterName, instanceName)) {
+    if (!instanceHasCurrentStateOrMessage(clusterName, instanceName)) {
       InstanceConfig config = getInstanceConfig(clusterName, instanceName);
       return config != null && 
config.getInstanceOperation().equals(InstanceConstants.InstanceOperation.EVACUATE.name());
     }
     return false;
   }
 
+  /**
+   * Find the instance that the passed instance is swapping with. If the 
passed instance has
+   * SWAP_OUT instanceOperation, then find the corresponding instance that has 
SWAP_IN
+   * instanceOperation. If the passed instance has SWAP_IN instanceOperation, 
then find the
+   * corresponding instance that has SWAP_OUT instanceOperation.
+   *
+   * @param clusterName    The cluster name
+   * @param instanceConfig The instance to find the swap instance for
+   * @return The swap instance if found, null otherwise.
+   */
+  @Nullable
+  private InstanceConfig findMatchingSwapInstance(String clusterName,
+      InstanceConfig instanceConfig) {
+    String logicalIdKey =
+        
ClusterTopologyConfig.createFromClusterConfig(_configAccessor.getClusterConfig(clusterName))
+            .getEndNodeType();
+
+    for (String potentialSwappingInstance : getConfigKeys(
+        new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT,
+            clusterName).build())) {
+      InstanceConfig potentialSwappingInstanceConfig =
+          getInstanceConfig(clusterName, potentialSwappingInstance);
+
+      // Return the instance if it has the same logicalId and the opposite swap InstanceOperation
+      // (SWAP_IN paired with SWAP_OUT, or SWAP_OUT paired with SWAP_IN).
+      if (potentialSwappingInstanceConfig.getLogicalId(logicalIdKey)
+          .equals(instanceConfig.getLogicalId(logicalIdKey)) && (
+          instanceConfig.getInstanceOperation()
+              .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())
+              && potentialSwappingInstanceConfig.getInstanceOperation()
+              .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) || 
(
+          instanceConfig.getInstanceOperation()
+              .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())
+              && potentialSwappingInstanceConfig.getInstanceOperation()
+              .equals(InstanceConstants.InstanceOperation.SWAP_IN.name()))) {
+        return potentialSwappingInstanceConfig;
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Check to see if swapping between two instances is ready to be completed. Checks:
+   * 1. Both instances must be alive.
+   * 2. Both instances must only have one session and not be carrying over from a previous session.
+   * 3. Both instances must have no pending messages.
+   * 4. Both instances cannot have partitions in the ERROR state.
+   * 5. SwapIn instance must have the correct state for all partitions that are currently
+   *    assigned to the SwapOut instance.
+   * TODO: We may want to make this a public API in the future.
+   *
+   * @param clusterName         The cluster name
+   * @param swapOutInstanceName The instance that is being swapped out
+   * @param swapInInstanceName  The instance that is being swapped in
+   * @return True if the swap is ready to be completed, false otherwise.
+   */
+  private boolean canCompleteSwap(String clusterName, String 
swapOutInstanceName,
+      String swapInInstanceName) {
+    BaseDataAccessor<ZNRecord> baseAccessor = new 
ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    // 1. Check that both instances are alive.
+    LiveInstance swapOutLiveInstance =
+        accessor.getProperty(keyBuilder.liveInstance(swapOutInstanceName));
+    LiveInstance swapInLiveInstance =
+        accessor.getProperty(keyBuilder.liveInstance(swapInInstanceName));
+    if (swapOutLiveInstance == null || swapInLiveInstance == null) {
+      logger.warn(
+          "SwapOutInstance {} is {} and SwapInInstance {} is {} for cluster 
{}. Swap will not complete unless both instances are ONLINE.",
+          swapOutInstanceName, swapOutLiveInstance != null ? "ONLINE" : 
"OFFLINE",
+          swapInInstanceName, swapInLiveInstance != null ? "ONLINE" : 
"OFFLINE", clusterName);
+      return false;
+    }
+
+    // 2. Check that both instances only have one session and are not carrying any over.
+    // Count the session ZNodes under each instance's CurrentState folder. If an instance is
+    // carrying over from a previous session, there will be more than one session ZNode.
+    List<String> swapOutSessions = baseAccessor.getChildNames(
+        PropertyPathBuilder.instanceCurrentState(clusterName, 
swapOutInstanceName), 0);
+    List<String> swapInSessions = baseAccessor.getChildNames(
+        PropertyPathBuilder.instanceCurrentState(clusterName, 
swapInInstanceName), 0);
+    if (swapOutSessions.size() > 1 || swapInSessions.size() > 1) {
+      logger.warn(
+          "SwapOutInstance {} is carrying over from prev session and 
SwapInInstance {} is carrying over from prev session for cluster {}."
+              + " Swap will not complete unless both instances have only one 
session.",
+          swapOutInstanceName, swapInInstanceName, clusterName);
+      return false;
+    }
+
+    // 3. Check that the swapOutInstance has no pending messages.
+    List<Message> swapOutMessages =
+        accessor.getChildValues(keyBuilder.messages(swapOutInstanceName), 
true);
+    int swapOutPendingMessageCount = swapOutMessages != null ? 
swapOutMessages.size() : 0;
+    List<Message> swapInMessages =
+        accessor.getChildValues(keyBuilder.messages(swapInInstanceName), true);
+    int swapInPendingMessageCount = swapInMessages != null ? 
swapInMessages.size() : 0;
+    if (swapOutPendingMessageCount > 0 || swapInPendingMessageCount > 0) {
+      logger.warn(
+          "SwapOutInstance {} has {} pending messages and SwapInInstance {} 
has {} pending messages for cluster {}."
+              + " Swap will not complete unless both instances have no pending 
messages.",
+          swapOutInstanceName, swapOutPendingMessageCount, swapInInstanceName,
+          swapInPendingMessageCount, clusterName);
+      return false;
+    }
+
+    // 4. Collect a list of all partitions that have a current state on 
swapOutInstance
+    String swapOutActiveSession = swapOutLiveInstance.getEphemeralOwner();
+    String swapInActiveSession = swapInLiveInstance.getEphemeralOwner();
+
+    // Iterate over all resources with current states on the swapOutInstance
+    List<String> swapOutResources = baseAccessor.getChildNames(
+        PropertyPathBuilder.instanceCurrentState(clusterName, 
swapOutInstanceName,
+            swapOutActiveSession), 0);
+    for (String swapOutResource : swapOutResources) {
+      // Get the topState and secondTopStates for the stateModelDef used by 
the resource.
+      IdealState idealState = 
accessor.getProperty(keyBuilder.idealStates(swapOutResource));
+      StateModelDefinition stateModelDefinition =
+          
accessor.getProperty(keyBuilder.stateModelDef(idealState.getStateModelDefRef()));
+      String topState = stateModelDefinition.getTopState();
+      Set<String> secondTopStates = stateModelDefinition.getSecondTopStates();
+
+      CurrentState swapOutResourceCurrentState = accessor.getProperty(
+          keyBuilder.currentState(swapOutInstanceName, swapOutActiveSession, 
swapOutResource));
+      CurrentState swapInResourceCurrentState = accessor.getProperty(
+          keyBuilder.currentState(swapInInstanceName, swapInActiveSession, 
swapOutResource));
+
+      // Check to make sure swapInInstance has a current state for the resource
+      if (swapInResourceCurrentState == null) {
+        logger.warn(
+            "SwapOutInstance {} has current state for resource {} but 
SwapInInstance {} does not for cluster {}."
+                + " Swap will not complete unless both instances have current 
states for all resources.",
+            swapOutInstanceName, swapOutResource, swapInInstanceName, 
clusterName);
+        return false;
+      }
+
+      // Iterate over all partitions in the swapOutInstance's current state 
for the resource
+      // and ensure that the swapInInstance has the correct state for the 
partition.
+      for (String partitionName : 
swapOutResourceCurrentState.getPartitionStateMap().keySet()) {
+        String swapOutPartitionState = 
swapOutResourceCurrentState.getState(partitionName);
+        String swapInPartitionState = 
swapInResourceCurrentState.getState(partitionName);
+
+        // Neither instance should have any partitions in ERROR state.
+        if (swapOutPartitionState.equals(HelixDefinedState.ERROR.name())
+            || swapInPartitionState.equals(HelixDefinedState.ERROR.name())) {
+          logger.warn(
+              "SwapOutInstance {} has partition {} in state {} and 
SwapInInstance {} has partition {} in state {} for cluster {}."
+                  + " Swap will not complete unless both instances have no 
partitions in ERROR state.",
+              swapOutInstanceName, partitionName, swapOutPartitionState, 
swapInInstanceName,
+              partitionName, swapInPartitionState, clusterName);
+          return false;
+        }
+
+        // When the state of a partition on a swapOut instance is in the 
topState, the state
+        // of the partition on the swapInInstance should also be in the 
topState or a secondTopState.
+        // It should be in a topState only if the state model allows multiple 
replicas in the topState.
+        // In all other cases it should be a secondTopState.
+        if (swapOutPartitionState.equals(topState) && 
!(swapInPartitionState.equals(topState)
+            || secondTopStates.contains(swapInPartitionState))) {
+          logger.warn(
+              "SwapOutInstance {} has partition {} in topState {} but 
SwapInInstance {} has partition {} in state {} for cluster {}."
+                  + " Swap will not complete unless SwapInInstance has 
partition in topState or secondState.",
+              swapOutInstanceName, partitionName, swapOutPartitionState, 
swapInInstanceName,
+              partitionName, swapInPartitionState, clusterName);
+          return false;
+        }
+
+        // When the state of a partition on a swapOut instance is any other 
state, except ERROR, DROPPED or TopState,
+        // the state of the partition on the swapInInstance should be the same.
+        if (!swapOutPartitionState.equals(topState) && 
!swapOutPartitionState.equals(
+            HelixDefinedState.DROPPED.name())
+            && !swapOutPartitionState.equals(swapInPartitionState)) {
+          logger.warn(
+              "SwapOutInstance {} has partition {} in state {} but 
SwapInInstance {} has partition {} in state {} for cluster {}."
+                  + " Swap will not complete unless both instances have 
matching states.",
+              swapOutInstanceName, partitionName, swapOutPartitionState, 
swapInInstanceName,
+              partitionName, swapInPartitionState, clusterName);
+          return false;
+        }
+      }
+    }
+
+    return true;
+  }
+
+  @Override
+  public boolean canCompleteSwap(String clusterName, String instanceName) {
+    InstanceConfig instanceConfig = getInstanceConfig(clusterName, 
instanceName);
+    if (instanceConfig == null) {
+      logger.warn(
+          "Instance {} in cluster {} does not exist. Cannot determine if the 
swap is complete.",
+          instanceName, clusterName);
+      return false;
+    }
+
+    InstanceConfig swapOutInstanceConfig = 
instanceConfig.getInstanceOperation()
+        .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name()) ? 
instanceConfig
+        : findMatchingSwapInstance(clusterName, instanceConfig);
+    InstanceConfig swapInInstanceConfig = instanceConfig.getInstanceOperation()
+        .equals(InstanceConstants.InstanceOperation.SWAP_IN.name()) ? 
instanceConfig
+        : findMatchingSwapInstance(clusterName, instanceConfig);
+    if (swapOutInstanceConfig == null || swapInInstanceConfig == null) {
+      logger.warn(
+          "Instance {} in cluster {} is not swapping with any other instance. 
Cannot determine if the swap is complete.",
+          instanceName, clusterName);
+      return false;
+    }
+
+    // Check if the swap is ready to be completed.
+    return canCompleteSwap(clusterName, 
swapOutInstanceConfig.getInstanceName(),
+        swapInInstanceConfig.getInstanceName());
+  }
+
+  @Override
+  public boolean completeSwapIfPossible(String clusterName, String 
instanceName) {
+    InstanceConfig instanceConfig = getInstanceConfig(clusterName, 
instanceName);
+    if (instanceConfig == null) {
+      logger.warn(
+          "Instance {} in cluster {} does not exist. Cannot determine if the 
swap is complete.",
+          instanceName, clusterName);
+      return false;
+    }
+
+    InstanceConfig swapOutInstanceConfig = 
instanceConfig.getInstanceOperation()
+        .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name()) ? 
instanceConfig
+        : findMatchingSwapInstance(clusterName, instanceConfig);
+    InstanceConfig swapInInstanceConfig = instanceConfig.getInstanceOperation()
+        .equals(InstanceConstants.InstanceOperation.SWAP_IN.name()) ? 
instanceConfig
+        : findMatchingSwapInstance(clusterName, instanceConfig);
+    if (swapOutInstanceConfig == null || swapInInstanceConfig == null) {
+      logger.warn(
+          "Instance {} in cluster {} is not swapping with any other instance. 
Cannot determine if the swap is complete.",
+          instanceName, clusterName);
+      return false;
+    }
+
+    // Check if the swap is ready to be completed. If not, return false.
+    if (!canCompleteSwap(clusterName, swapOutInstanceConfig.getInstanceName(),
+        swapInInstanceConfig.getInstanceName())) {
+      return false;
+    }
+
+    // Complete the swap by removing the InstanceOperation for the SWAP_IN 
node and disabling the SWAP_OUT node.
+    setInstanceOperation(clusterName, swapInInstanceConfig.getInstanceName(), 
null);
+    enableInstance(clusterName, swapOutInstanceConfig.getInstanceName(), 
false);
+
+    return true;
+  }
+
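
For reference, a minimal end-to-end sketch of the admin-side swap workflow built on the APIs above. The ZK address, cluster name, instance names, and DOMAIN string below are illustrative assumptions, and starting the actual participant process for the SWAP_IN node is omitted.

    import org.apache.helix.HelixAdmin;
    import org.apache.helix.constants.InstanceConstants;
    import org.apache.helix.manager.zk.ZKHelixAdmin;
    import org.apache.helix.model.InstanceConfig;

    public class NodeSwapSketch {
      public static void main(String[] args) throws InterruptedException {
        HelixAdmin admin = new ZKHelixAdmin("localhost:2181");   // assumed ZK address
        String cluster = "MyCluster";                            // assumed cluster name

        // 1. Mark the node being replaced as SWAP_OUT.
        admin.setInstanceOperation(cluster, "old_host_12918",
            InstanceConstants.InstanceOperation.SWAP_OUT);

        // 2. Add the replacement node with the same logicalId, FAULT_ZONE, and capacity.
        //    The DOMAIN keys must match the cluster's TOPOLOGY (assumed zone/host/logicalId).
        InstanceConfig swapInConfig = new InstanceConfig("new_host_12918");
        swapInConfig.setDomain("zone=zone_0,host=new_host,logicalId=0");
        swapInConfig.setInstanceOperation(InstanceConstants.InstanceOperation.SWAP_IN);
        admin.addInstance(cluster, swapInConfig);

        // 3. Once the SWAP_IN participant is live and has caught up, complete the swap.
        //    completeSwapIfPossible re-checks readiness, clears the SWAP_IN operation,
        //    and disables the SWAP_OUT node.
        while (!admin.canCompleteSwap(cluster, "old_host_12918")) {
          Thread.sleep(1000L);
        }
        admin.completeSwapIfPossible(cluster, "old_host_12918");
      }
    }
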
   @Override
   public boolean isReadyForPreparingJoiningCluster(String clusterName, String 
instanceName) {
-    if (!instanceHasCurrentSateOrMessage(clusterName, instanceName)) {
+    if (!instanceHasCurrentStateOrMessage(clusterName, instanceName)) {
       InstanceConfig config = getInstanceConfig(clusterName, instanceName);
       return config != null && 
DelayedAutoRebalancer.INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(
           config.getInstanceOperation());
@@ -434,7 +847,7 @@ public class ZKHelixAdmin implements HelixAdmin {
    * @param instanceName
    * @return
    */
-  private boolean instanceHasCurrentSateOrMessage(String clusterName, String 
instanceName) {
+  private boolean instanceHasCurrentStateOrMessage(String clusterName, String 
instanceName) {
     HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
diff --git 
a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java 
b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index b68250dd3..98da7340f 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -702,6 +702,18 @@ public class InstanceConfig extends HelixProperty {
     return _record.getId();
   }
 
+  /**
+   * Get the logicalId of this instance. If it does not exist or is not set,
+   * return the instance name.
+   * @param logicalIdKey the key for the DOMAIN field containing the logicalId
+   * @return the logicalId of this instance
+   */
+  public String getLogicalId(String logicalIdKey) {
+    // TODO: Consider caching the parsed DOMAIN map; parsing the DOMAIN string every time
+    // getLogicalId is called can become expensive if it is called frequently.
+    return getDomainAsMap().getOrDefault(logicalIdKey, getInstanceName());
+  }
+
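
A small illustrative sketch of this fallback behavior; the instance name and DOMAIN string are assumptions.

    import org.apache.helix.model.InstanceConfig;

    public class LogicalIdSketch {
      public static void main(String[] args) {
        InstanceConfig config = new InstanceConfig("instance_12918");
        config.setDomain("zone=zone_0,host=host_0,logicalId=0");

        // The DOMAIN map contains the key, so the mapped value is returned.
        System.out.println(config.getLogicalId("logicalId"));  // prints "0"
        // A key that is not in DOMAIN falls back to the instance name.
        System.out.println(config.getLogicalId("rack"));       // prints "instance_12918"
      }
    }
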
   @Override
   public boolean isValid() {
     // HELIX-65: remove check for hostname/port existence
@@ -772,6 +784,7 @@ public class InstanceConfig extends HelixProperty {
     private int _weight = WEIGHT_NOT_SET;
     private List<String> _tags = new ArrayList<>();
     private boolean _instanceEnabled = HELIX_ENABLED_DEFAULT_VALUE;
+    private InstanceConstants.InstanceOperation _instanceOperation;
     private Map<String, String> _instanceInfoMap;
     private Map<String, Integer> _instanceCapacityMap;
 
@@ -819,6 +832,10 @@ public class InstanceConfig extends HelixProperty {
         instanceConfig.setInstanceEnabled(_instanceEnabled);
       }
 
+      if (_instanceOperation != null) {
+        instanceConfig.setInstanceOperation(_instanceOperation);
+      }
+
       if (_instanceInfoMap != null) {
         instanceConfig.setInstanceInfoMap(_instanceInfoMap);
       }
@@ -890,6 +907,17 @@ public class InstanceConfig extends HelixProperty {
       return this;
     }
 
+    /**
+     * Set the instance operation for this instance
+     *
+     * @param instanceOperation the instance operation.
+     * @return InstanceConfig.Builder
+     */
+    public Builder setInstanceOperation(InstanceConstants.InstanceOperation 
instanceOperation) {
+      _instanceOperation = instanceOperation;
+      return this;
+    }
+
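
A hedged usage sketch of the new Builder option, assuming the Builder's no-arg constructor and build(String instanceName) entry points; other builder fields are omitted.

    import org.apache.helix.constants.InstanceConstants;
    import org.apache.helix.model.InstanceConfig;

    public class SwapInConfigSketch {
      public static void main(String[] args) {
        InstanceConfig swapInConfig = new InstanceConfig.Builder()
            .setInstanceOperation(InstanceConstants.InstanceOperation.SWAP_IN)
            .build("new_host_12918");  // assumed instance name
        System.out.println(swapInConfig.getInstanceOperation());  // SWAP_IN
      }
    }
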
     /**
      * Set the INSTANCE_INFO_MAP for this instance
      * @param instanceInfoMap the instance info map
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index 608a4d3af..000978ef1 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -22,6 +22,7 @@ package org.apache.helix.controller.rebalancer.waged;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -738,7 +739,8 @@ public class TestWagedRebalancer extends 
AbstractTestClusterModel {
     instances.add(offlineInstance);
     when(clusterData.getAllInstances()).thenReturn(instances);
     when(clusterData.getEnabledInstances()).thenReturn(instances);
-    
when(clusterData.getEnabledLiveInstances()).thenReturn(ImmutableSet.of(instance0,
 instance1, instance2));
+    when(clusterData.getEnabledLiveInstances()).thenReturn(
+        new HashSet<>(Arrays.asList(instance0, instance1, instance2)));
     Map<String, Long> instanceOfflineTimeMap = new HashMap<>();
     instanceOfflineTimeMap.put(offlineInstance, System.currentTimeMillis() + 
Integer.MAX_VALUE);
     
when(clusterData.getInstanceOfflineTimeMap()).thenReturn(instanceOfflineTimeMap);
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index 10cd662cb..9ccc14fdf 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -10,12 +10,14 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableSet;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixRollbackException;
 import org.apache.helix.NotificationContext;
-import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.constants.InstanceConstants;
@@ -30,8 +32,10 @@ import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
@@ -51,15 +55,28 @@ public class TestInstanceOperation extends ZkTestBase {
 
   protected final String CLASS_NAME = getShortClassName();
   protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  private final String TEST_CAPACITY_KEY = "TestCapacityKey";
+  private final int TEST_CAPACITY_VALUE = 100;
+  protected static final String ZONE = "zone";
+  protected static final String HOST = "host";
+  protected static final String LOGICAL_ID = "logicalId";
+  protected static final String TOPOLOGY = String.format("%s/%s/%s", ZONE, 
HOST, LOGICAL_ID);
+
+  protected static final ImmutableSet<String> SECONDARY_STATE_SET =
+      ImmutableSet.of("SLAVE", "STANDBY");
+  protected static final ImmutableSet<String> ACCEPTABLE_STATE_SET =
+      ImmutableSet.of("MASTER", "LEADER", "SLAVE", "STANDBY");
   private int REPLICA = 3;
   protected ClusterControllerManager _controller;
   List<MockParticipantManager> _participants = new ArrayList<>();
+  private List<String> _originalParticipantNames = new ArrayList<>();
   List<String> _participantNames = new ArrayList<>();
   private Set<String> _allDBs = new HashSet<>();
   private ZkHelixClusterVerifier _clusterVerifier;
   private ConfigAccessor _configAccessor;
   private long _stateModelDelay = 3L;
 
+  private final long DEFAULT_RESOURCE_DELAY_TIME = 1800000L;
   private HelixAdmin _admin;
   protected AssignmentMetadataStore _assignmentMetadataStore;
   HelixDataAccessor _dataAccessor;
@@ -72,6 +89,7 @@ public class TestInstanceOperation extends ZkTestBase {
 
     for (int i = 0; i < NUM_NODE; i++) {
       String participantName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _originalParticipantNames.add(participantName);
       addParticipant(participantName);
     }
 
@@ -88,24 +106,88 @@ public class TestInstanceOperation extends ZkTestBase {
     _configAccessor = new ConfigAccessor(_gZkClient);
     _dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
 
+    setupClusterConfig();
+
+    createTestDBs(DEFAULT_RESOURCE_DELAY_TIME);
+
+    setUpWagedBaseline();
+
+    _admin = new ZKHelixAdmin(_gZkClient);
+  }
+
+  private void setupClusterConfig() {
+    _stateModelDelay = 3L;
     ClusterConfig clusterConfig = 
_configAccessor.getClusterConfig(CLUSTER_NAME);
     clusterConfig.stateTransitionCancelEnabled(true);
     clusterConfig.setDelayRebalaceEnabled(true);
     clusterConfig.setRebalanceDelayTime(1800000L);
     _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
 
-    createTestDBs(1800000L);
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+  }
 
-    setUpWagedBaseline();
+  private void enabledTopologyAwareRebalance() {
+    ClusterConfig clusterConfig = 
_configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setTopology(TOPOLOGY);
+    clusterConfig.setFaultZoneType(ZONE);
+    clusterConfig.setTopologyAwareEnabled(true);
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
 
-    _admin = new ZKHelixAdmin(_gZkClient);
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+  }
+
+  private void disableTopologyAwareRebalance() {
+    ClusterConfig clusterConfig = 
_configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setTopologyAwareEnabled(false);
+    clusterConfig.setTopology(null);
+    clusterConfig.setFaultZoneType(null);
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+  }
+
+  private void resetInstances() {
+    // Disable and drop any participants that are not in the original 
participant list.
+    Set<String> droppedParticipants = new HashSet<>();
+    for (int i = 0; i < _participants.size(); i++) {
+      String participantName = _participantNames.get(i);
+      if (!_originalParticipantNames.contains(participantName)) {
+        _participants.get(i).syncStop();
+        _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, 
participantName, false);
+        _gSetupTool.getClusterManagementTool()
+            .dropInstance(CLUSTER_NAME, 
_gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, 
participantName));
+        droppedParticipants.add(participantName);
+      }
+    }
+
+    // Remove the dropped instance from _participants and _participantNames
+    _participantNames.removeIf(droppedParticipants::contains);
+    _participants.removeIf(p -> 
droppedParticipants.contains(p.getInstanceName()));
+
+    for (int i = 0; i < _participants.size(); i++) {
+      // If instance is not connected to ZK, replace it
+      if (!_participants.get(i).isConnected()) {
+        // Drop bad instance from the cluster.
+        _gSetupTool.getClusterManagementTool()
+            .dropInstance(CLUSTER_NAME, 
_gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, 
_participantNames.get(i)));
+        _participants.set(i, createParticipant(_participantNames.get(i), 
Integer.toString(i),
+            "zone_" + i, null, true, -1));
+        _participants.get(i).syncStart();
+        continue;
+      }
+      _gSetupTool.getClusterManagementTool()
+          .setInstanceOperation(CLUSTER_NAME, _participantNames.get(i), null);
+      _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, 
_participantNames.get(i), true);
+    }
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
   }
 
   @Test
   public void testEvacuate() throws Exception {
     System.out.println("START TestInstanceOperation.testEvacuate() at " + new 
Date(System.currentTimeMillis()));
     // EV should contain all participants, check resources one by one
-    Map<String, ExternalView> assignment = getEV();
+    Map<String, ExternalView> assignment = getEVs();
     for (String resource : _allDBs) {
       
Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
     }
@@ -118,7 +200,7 @@ public class TestInstanceOperation extends ZkTestBase {
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     // New ev should contain all instances but the evacuated one
-    assignment = getEV();
+    assignment = getEVs();
     List<String> currentActiveInstances =
         _participantNames.stream().filter(n -> 
!n.equals(instanceToEvacuate)).collect(Collectors.toList());
     for (String resource : _allDBs) {
@@ -143,7 +225,7 @@ public class TestInstanceOperation extends ZkTestBase {
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     // EV should contain all participants, check resources one by one
-    Map<String, ExternalView> assignment = getEV();
+    Map<String, ExternalView> assignment = getEVs();
     for (String resource : _allDBs) {
       
Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
       validateAssignmentInEv(assignment.get(resource));
@@ -159,7 +241,7 @@ public class TestInstanceOperation extends ZkTestBase {
         .enableInstance(CLUSTER_NAME, mockNewInstance, false);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
     //ev should contain all instances but the disabled one
-    Map<String, ExternalView> assignment = getEV();
+    Map<String, ExternalView> assignment = getEVs();
     List<String> currentActiveInstances =
         _participantNames.stream().filter(n -> 
!n.equals(mockNewInstance)).collect(Collectors.toList());
     for (String resource : _allDBs) {
@@ -175,7 +257,7 @@ public class TestInstanceOperation extends ZkTestBase {
     _gSetupTool.getClusterManagementTool()
         .enableInstance(CLUSTER_NAME, mockNewInstance, true);
     //ev should be the same
-    assignment = getEV();
+    assignment = getEVs();
     currentActiveInstances =
         _participantNames.stream().filter(n -> 
!n.equals(mockNewInstance)).collect(Collectors.toList());
     for (String resource : _allDBs) {
@@ -193,7 +275,7 @@ public class TestInstanceOperation extends ZkTestBase {
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     // EV should contain all participants, check resources one by one
-     assignment = getEV();
+    assignment = getEVs();
     for (String resource : _allDBs) {
       
Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
       validateAssignmentInEv(assignment.get(resource));
@@ -234,7 +316,7 @@ public class TestInstanceOperation extends ZkTestBase {
     // sleep a bit so ST messages can start executing
     Thread.sleep(Math.abs(_stateModelDelay / 100));
     // before we cancel, check current EV
-    Map<String, ExternalView> assignment = getEV();
+    Map<String, ExternalView> assignment = getEVs();
     for (String resource : _allDBs) {
       // check every replica has >= 3 partitions and a top state partition
       validateAssignmentInEv(assignment.get(resource));
@@ -244,7 +326,7 @@ public class TestInstanceOperation extends ZkTestBase {
     _gSetupTool.getClusterManagementTool()
         .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);
 
-    assignment = getEV();
+    assignment = getEVs();
     for (String resource : _allDBs) {
       // check every replica has >= 3 active replicas, even before cluster 
converge
       validateAssignmentInEv(assignment.get(resource));
@@ -254,7 +336,7 @@ public class TestInstanceOperation extends ZkTestBase {
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     // EV should contain all participants, check resources one by one
-    assignment = getEV();
+    assignment = getEVs();
     for (String resource : _allDBs) {
       
Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
       // check every replica has >= 3 active replicas again
@@ -283,7 +365,7 @@ public class TestInstanceOperation extends ZkTestBase {
     _gSetupTool.getClusterManagementTool()
         .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);
     // check every replica has >= 3 active replicas, even before cluster 
converge
-    Map<String, ExternalView> assignment = getEV();
+    Map<String, ExternalView> assignment = getEVs();
     for (String resource : _allDBs) {
       validateAssignmentInEv(assignment.get(resource));
     }
@@ -291,7 +373,7 @@ public class TestInstanceOperation extends ZkTestBase {
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     // EV should contain all participants, check resources one by one
-    assignment = getEV();
+    assignment = getEVs();
     for (String resource : _allDBs) {
       
Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
       // check every replica has >= 3 active replicas
@@ -309,7 +391,7 @@ public class TestInstanceOperation extends ZkTestBase {
     addParticipant(PARTICIPANT_PREFIX + "_" + (START_PORT + NUM_NODE));
 
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
-    Map<String, ExternalView> assignment = getEV();
+    Map<String, ExternalView> assignment = getEVs();
     for (String resource : _allDBs) {
       
Assert.assertFalse(getParticipantsInEv(assignment.get(resource)).contains(_participantNames.get(NUM_NODE)));
     }
@@ -332,7 +414,7 @@ public class TestInstanceOperation extends ZkTestBase {
 
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
-    assignment = getEV();
+    assignment = getEVs();
     List<String> currentActiveInstances =
         _participantNames.stream().filter(n -> 
!n.equals(instanceToEvacuate)).collect(Collectors.toList());
     for (String resource : _allDBs) {
@@ -342,6 +424,8 @@ public class TestInstanceOperation extends ZkTestBase {
       
Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances));
     }
     Assert.assertTrue(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, 
instanceToEvacuate));
+
+    _stateModelDelay = 3L;
   }
 
   @Test(dependsOnMethods = "testMarkEvacuationAfterEMM")
@@ -356,7 +440,7 @@ public class TestInstanceOperation extends ZkTestBase {
 
     Map<String, ExternalView> assignment;
     // EV should contain all participants, check resources one by one
-    assignment = getEV();
+    assignment = getEVs();
     for (String resource : _allDBs) {
       TestHelper.verify(() -> {
         ExternalView ev = assignment.get(resource);
@@ -379,13 +463,686 @@ public class TestInstanceOperation extends ZkTestBase {
       }, 30000);
     }
 
-    _participants.get(1).syncStart();
-    _participants.get(2).syncStart();
+    resetInstances();
+    dropTestDBs(ImmutableSet.of("TEST_DB3_DELAYED_CRUSHED", 
"TEST_DB4_DELAYED_WAGED"));
   }
 
+  @Test(expectedExceptions = HelixException.class, dependsOnMethods = 
"testEvacuationWithOfflineInstancesInCluster")
+  public void testAddingNodeWithSwapOutInstanceOperation() throws Exception {
+    System.out.println(
+        "START 
TestInstanceOperation.testAddingNodeWithSwapOutInstanceOperation() at " + new 
Date(
+            System.currentTimeMillis()));
+
+    enabledTopologyAwareRebalance();
+    resetInstances();
+
+    // Set instance's InstanceOperation to SWAP_OUT
+    String instanceToSwapOutName = _participants.get(0).getInstanceName();
+    InstanceConfig instanceToSwapOutInstanceConfig = 
_gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, 
instanceToSwapOutName,
+        InstanceConstants.InstanceOperation.SWAP_OUT);
+
+    // Attempt to add an instance with InstanceOperation set to SWAP_OUT; this is expected to throw.
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+        InstanceConstants.InstanceOperation.SWAP_OUT, true, -1);
+  }
 
-  private void addParticipant(String participantName) {
-    _gSetupTool.addInstanceToCluster(CLUSTER_NAME, participantName);
+  @Test(expectedExceptions = HelixException.class, dependsOnMethods = 
"testAddingNodeWithSwapOutInstanceOperation")
+  public void testAddingNodeWithSwapOutNodeInstanceOperationUnset() throws 
Exception {
+    System.out.println(
+        "START 
TestInstanceOperation.testAddingNodeWithSwapOutNodeInstanceOperationUnset() at "
+            + new Date(System.currentTimeMillis()));
+
+    resetInstances();
+
+    // Set instance's InstanceOperation to null
+    String instanceToSwapOutName = _participants.get(0).getInstanceName();
+    InstanceConfig instanceToSwapOutInstanceConfig = 
_gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+    _gSetupTool.getClusterManagementTool()
+        .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, null);
+
+    // Add instance with InstanceOperation set to SWAP_IN
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+        InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+  }
+
+  @Test(expectedExceptions = HelixException.class, dependsOnMethods = 
"testAddingNodeWithSwapOutNodeInstanceOperationUnset")
+  public void testNodeSwapWithNoSwapOutNode() throws Exception {
+    System.out.println("START 
TestInstanceOperation.testNodeSwapWithNoSwapOutNode() at " + new Date(
+        System.currentTimeMillis()));
+
+    resetInstances();
+
+    // Add new instance with InstanceOperation set to SWAP_IN
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    addParticipant(instanceToSwapInName, "1000", "zone_1000",
+        InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+  }
+
+  @Test(dependsOnMethods = "testNodeSwapWithNoSwapOutNode")
+  public void testNodeSwapSwapInNodeNoInstanceOperationEnabled() throws 
Exception {
+    System.out.println(
+        "START 
TestInstanceOperation.testNodeSwapSwapInNodeNoInstanceOperationEnabled() at "
+            + new Date(System.currentTimeMillis()));
+
+    resetInstances();
+
+    // Set instance's InstanceOperation to SWAP_OUT
+    String instanceToSwapOutName = _participants.get(0).getInstanceName();
+    InstanceConfig instanceToSwapOutInstanceConfig = 
_gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, 
instanceToSwapOutName,
+        InstanceConstants.InstanceOperation.SWAP_OUT);
+
+    // Add an instance with the same logicalId and InstanceOperation unset.
+    // This should work because adding an instance with InstanceOperation unset will
+    // automatically set its InstanceOperation to SWAP_IN.
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, 
true, -1);
+  }
+
+  @Test(expectedExceptions = HelixException.class, dependsOnMethods = 
"testNodeSwapSwapInNodeNoInstanceOperationEnabled")
+  public void testNodeSwapSwapInNodeWithAlreadySwappingPair() throws Exception 
{
+    System.out.println(
+        "START 
TestInstanceOperation.testNodeSwapSwapInNodeWithAlreadySwappingPair() at "
+            + new Date(System.currentTimeMillis()));
+
+    resetInstances();
+
+    // Set instance's InstanceOperation to SWAP_OUT
+    String instanceToSwapOutName = _participants.get(0).getInstanceName();
+    InstanceConfig instanceToSwapOutInstanceConfig = 
_gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, 
instanceToSwapOutName,
+        InstanceConstants.InstanceOperation.SWAP_OUT);
+
+    // Add instance with InstanceOperation set to SWAP_IN
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+        InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+
+    // Add another instance with InstanceOperation set to SWAP_IN with same 
logicalId as previously
+    // added SWAP_IN instance.
+    String secondInstanceToSwapInName =
+        PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size());
+    addParticipant(secondInstanceToSwapInName,
+        instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+        InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+  }
+
+  @Test(expectedExceptions = HelixException.class, dependsOnMethods = 
"testNodeSwapSwapInNodeWithAlreadySwappingPair")
+  public void testNodeSwapNoTopologySetup() throws Exception {
+    System.out.println("START 
TestInstanceOperation.testNodeSwapNoTopologySetup() at " + new Date(
+        System.currentTimeMillis()));
+    disableTopologyAwareRebalance();
+    resetInstances();
+
+    // Set instance's InstanceOperation to SWAP_OUT
+    String instanceToSwapOutName = _participants.get(0).getInstanceName();
+    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, 
instanceToSwapOutName,
+        InstanceConstants.InstanceOperation.SWAP_OUT);
+
+    // Add instance with InstanceOperation set to SWAP_IN
+    // There should be an error that no SWAP_OUT instance exists for this logicalId, because
+    // Helix cannot determine which topology key to use for the logicalId when TOPOLOGY is not set.
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    InstanceConfig instanceToSwapOutInstanceConfig = 
_gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+    addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+        InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+  }
+
+  @Test(expectedExceptions = HelixException.class, dependsOnMethods = 
"testNodeSwapNoTopologySetup")
+  public void testNodeSwapWrongFaultZone() throws Exception {
+    System.out.println("START 
TestInstanceOperation.testNodeSwapWrongFaultZone() at " + new Date(
+        System.currentTimeMillis()));
+    // Re-enable topology aware rebalancing and set TOPOLOGY.
+    enabledTopologyAwareRebalance();
+    resetInstances();
+
+    // Set instance's InstanceOperation to SWAP_OUT
+    String instanceToSwapOutName = _participants.get(0).getInstanceName();
+    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, 
instanceToSwapOutName,
+        InstanceConstants.InstanceOperation.SWAP_OUT);
+
+    // Add instance with InstanceOperation set to SWAP_IN
+    // There should be an error because SWAP_IN instance must be in the same 
FAULT_ZONE as the SWAP_OUT instance.
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    InstanceConfig instanceToSwapOutInstanceConfig = 
_gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+    addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE) + "1",
+        InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+  }
+
+  @Test(expectedExceptions = HelixException.class, dependsOnMethods = 
"testNodeSwapWrongFaultZone")
+  public void testNodeSwapWrongCapacity() throws Exception {
+    System.out.println("START 
TestInstanceOperation.testNodeSwapWrongCapacity() at " + new Date(
+        System.currentTimeMillis()));
+    resetInstances();
+
+    // Set instance's InstanceOperation to SWAP_OUT
+    String instanceToSwapOutName = _participants.get(0).getInstanceName();
+    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, 
instanceToSwapOutName,
+        InstanceConstants.InstanceOperation.SWAP_OUT);
+
+    // Add instance with InstanceOperation set to SWAP_IN
+    // There should be an error because the SWAP_IN instance must have the same capacity as the SWAP_OUT node.
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    InstanceConfig instanceToSwapOutInstanceConfig = 
_gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+    addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+        InstanceConstants.InstanceOperation.SWAP_IN, true, TEST_CAPACITY_VALUE 
- 10);
+  }
+
+  @Test(dependsOnMethods = "testNodeSwapWrongCapacity")
+  public void testNodeSwap() throws Exception {
+    System.out.println(
+        "START TestInstanceOperation.testNodeSwap() at " + new 
Date(System.currentTimeMillis()));
+    resetInstances();
+
+    // Store original EV
+    Map<String, ExternalView> originalEVs = getEVs();
+
+    Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>();
+
+    // Set instance's InstanceOperation to SWAP_OUT
+    String instanceToSwapOutName = _participants.get(0).getInstanceName();
+    InstanceConfig instanceToSwapOutInstanceConfig = 
_gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, 
instanceToSwapOutName,
+        InstanceConstants.InstanceOperation.SWAP_OUT);
+
+    // Validate that the assignment has not changed since setting the 
InstanceOperation to SWAP_OUT
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
+        Collections.emptySet(), Collections.emptySet());
+
+    // Add instance with InstanceOperation set to SWAP_IN
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, 
instanceToSwapInName);
+    addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+        InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+
+    // Validate that the partitions on the SWAP_OUT instance do not change after setting the InstanceOperation to SWAP_OUT
+    // and adding the SWAP_IN instance to the cluster.
+    // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT 
instance
+    // but none of them are in a top state.
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
+        Set.of(instanceToSwapInName), Collections.emptySet());
+
+    // Assert canCompleteSwap returns true
+    Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+        .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
+    // Assert completeSwapIfPossible is true
+    Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+        .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName));
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // Assert that SWAP_OUT instance is disabled and has no partitions 
assigned to it.
+    Assert.assertFalse(_gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, 
instanceToSwapOutName).getInstanceEnabled());
+    Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), 
instanceToSwapOutName).size(),
+        0);
+
+    // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT 
instance had before
+    // swap was completed.
+    validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
+        Collections.emptySet(), Set.of(instanceToSwapInName));
+  }
+
+  @Test(dependsOnMethods = "testNodeSwap")
+  public void testNodeSwapSwapInNodeNoInstanceOperationDisabled() throws 
Exception {
+    System.out.println(
+        "START 
TestInstanceOperation.testNodeSwapSwapInNodeNoInstanceOperationDisabled() at "
+            + new Date(System.currentTimeMillis()));
+
+    resetInstances();
+
+    // Store original EVs
+    Map<String, ExternalView> originalEVs = getEVs();
+
+    Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>();
+
+    // Set instance's InstanceOperation to SWAP_OUT
+    String instanceToSwapOutName = _participants.get(0).getInstanceName();
+    InstanceConfig instanceToSwapOutInstanceConfig = 
_gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, 
instanceToSwapOutName,
+        InstanceConstants.InstanceOperation.SWAP_OUT);
+
+    // Validate that the assignment has not changed since setting the 
InstanceOperation to SWAP_OUT
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
+        Collections.emptySet(), Collections.emptySet());
+
+    // Add instance with InstanceOperation unset, should automatically be set 
to SWAP_IN
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, 
instanceToSwapInName);
+    addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, 
false, -1);
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
+        Collections.emptySet(), Collections.emptySet());
+
+    // Enable the SWAP_IN instance, so it can start being assigned replicas
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, 
instanceToSwapInName, true);
+
+    // Validate that the partitions on the SWAP_OUT instance do not change after setting the InstanceOperation to SWAP_OUT
+    // and adding the SWAP_IN instance to the cluster.
+    // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT 
instance
+    // but none of them are in a top state.
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
+        Set.of(instanceToSwapInName), Collections.emptySet());
+
+    // Assert canCompleteSwap returns true
+    Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+        .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
+    // Assert completeSwapIfPossible is true
+    Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+        .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName));
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // Assert that SWAP_OUT instance is disabled and has no partitions 
assigned to it.
+    Assert.assertFalse(_gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, 
instanceToSwapOutName).getInstanceEnabled());
+    Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), 
instanceToSwapOutName).size(),
+        0);
+
+    // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT 
instance had before
+    // swap was completed.
+    validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
+        Collections.emptySet(), Set.of(instanceToSwapInName));
+  }
+
+  @Test(dependsOnMethods = "testNodeSwapSwapInNodeNoInstanceOperationDisabled")
+  public void testNodeSwapCancelSwapWhenReadyToComplete() throws Exception {
+    System.out.println(
+        "START 
TestInstanceOperation.testNodeSwapCancelSwapWhenReadyToComplete() at " + new 
Date(
+            System.currentTimeMillis()));
+
+    resetInstances();
+
+    // Store original EVs
+    Map<String, ExternalView> originalEVs = getEVs();
+
+    Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>();
+
+    // Set instance's InstanceOperation to SWAP_OUT
+    String instanceToSwapOutName = _participants.get(0).getInstanceName();
+    InstanceConfig instanceToSwapOutInstanceConfig = 
_gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, 
instanceToSwapOutName,
+        InstanceConstants.InstanceOperation.SWAP_OUT);
+
+    // Validate that the assignment has not changed since setting the 
InstanceOperation to SWAP_OUT
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
+        Collections.emptySet(), Collections.emptySet());
+
+    // Add instance with InstanceOperation set to SWAP_IN
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, 
instanceToSwapInName);
+    addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+        InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+
+    // Validate that partitions on the SWAP_OUT instance do not change after setting the
+    // InstanceOperation to SWAP_OUT and adding the SWAP_IN instance to the cluster.
+    // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance
+    // but none of them are in a top state.
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
+        Set.of(instanceToSwapInName), Collections.emptySet());
+
+    // Assert canCompleteSwap is true
+    Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+        .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
+
+    // Cancel the swap by disabling the SWAP_IN instance and removing the SWAP_OUT
+    // InstanceOperation from the SWAP_OUT instance.
+    _gSetupTool.getClusterManagementTool()
+        .enableInstance(CLUSTER_NAME, instanceToSwapInName, false);
+
+    // Wait for cluster to converge.
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // Validate there are no partitions on the SWAP_IN instance.
+    Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), 
instanceToSwapInName).size(), 0);
+
+    // Validate that the SWAP_OUT instance has the same partitions as it had 
before.
+    validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
+        Collections.emptySet(), Collections.emptySet());
+
+    _gSetupTool.getClusterManagementTool()
+        .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, null);
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // Validate there are no partitions on the SWAP_IN instance.
+    Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), 
instanceToSwapInName).size(), 0);
+
+    // Validate that the SWAP_OUT instance has the same partitions as it had 
before.
+    validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
+        Collections.emptySet(), Collections.emptySet());
+  }
+
+  @Test(dependsOnMethods = "testNodeSwapCancelSwapWhenReadyToComplete")
+  public void testNodeSwapAfterEMM() throws Exception {
+    System.out.println("START TestInstanceOperation.testNodeSwapAfterEMM() at 
" + new Date(
+        System.currentTimeMillis()));
+
+    resetInstances();
+
+    // Store original EVs
+    Map<String, ExternalView> originalEVs = getEVs();
+
+    Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>();
+
+    // Put the cluster in maintenance mode.
+    _gSetupTool.getClusterManagementTool()
+        .manuallyEnableMaintenanceMode(CLUSTER_NAME, true, null, null);
+
+    // Set instance's InstanceOperation to SWAP_OUT
+    String instanceToSwapOutName = _participants.get(0).getInstanceName();
+    InstanceConfig instanceToSwapOutInstanceConfig = 
_gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, 
instanceToSwapOutName,
+        InstanceConstants.InstanceOperation.SWAP_OUT);
+
+    // Validate that the assignment has not changed since setting the 
InstanceOperation to SWAP_OUT
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
+        Collections.emptySet(), Collections.emptySet());
+
+    // Add instance with InstanceOperation set to SWAP_IN
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, 
instanceToSwapInName);
+    addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+        InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+
+    // Validate that the assignment has not changed since adding the SWAP_IN node.
+    // During MM, the cluster should not compute a new assignment.
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
+        Collections.emptySet(), Collections.emptySet());
+
+    // Remove the cluster from maintenance mode. Now the swap will begin.
+    _gSetupTool.getClusterManagementTool()
+        .manuallyEnableMaintenanceMode(CLUSTER_NAME, false, null, null);
+
+    // Validate that partitions on the SWAP_OUT instance do not change after exiting MM.
+    // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance
+    // but none of them are in a top state.
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
+        Set.of(instanceToSwapInName), Collections.emptySet());
+
+    // Assert canCompleteSwap is true
+    Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+        .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
+    // Assert completeSwapIfPossible is true
+    Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+        .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName));
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // Assert that SWAP_OUT instance is disabled and has no partitions 
assigned to it.
+    Assert.assertFalse(_gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, 
instanceToSwapOutName).getInstanceEnabled());
+    Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), 
instanceToSwapOutName).size(),
+        0);
+
+    // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance
+    // had before the swap was completed.
+    validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
+        Collections.emptySet(), Set.of(instanceToSwapInName));
+  }
+
+  @Test(dependsOnMethods = "testNodeSwapAfterEMM")
+  public void testNodeSwapWithSwapOutInstanceDisabled() throws Exception {
+    System.out.println(
+        "START TestInstanceOperation.testNodeSwapWithSwapOutInstanceDisabled() 
at " + new Date(
+            System.currentTimeMillis()));
+
+    resetInstances();
+
+    // Store original EVs
+    Map<String, ExternalView> originalEVs = getEVs();
+
+    // Set instance's InstanceOperation to SWAP_OUT
+    String instanceToSwapOutName = _participants.get(0).getInstanceName();
+    InstanceConfig instanceToSwapOutInstanceConfig = 
_gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, 
instanceToSwapOutName,
+        InstanceConstants.InstanceOperation.SWAP_OUT);
+
+    Set<String> swapOutInstanceOriginalPartitions =
+        getPartitionsAndStatesOnInstance(originalEVs, 
instanceToSwapOutName).keySet();
+
+    // Disable the SWAP_OUT instance.
+    _gSetupTool.getClusterManagementTool()
+        .enableInstance(CLUSTER_NAME, instanceToSwapOutName, false);
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // Validate that the SWAP_OUT instance has all partitions in OFFLINE state
+    Set<String> swapOutInstanceOfflineStates =
+        new HashSet<>(getPartitionsAndStatesOnInstance(getEVs(), 
instanceToSwapOutName).values());
+    Assert.assertEquals(swapOutInstanceOfflineStates.size(), 1);
+    Assert.assertTrue(swapOutInstanceOfflineStates.contains("OFFLINE"));
+
+    // Add instance with InstanceOperation set to SWAP_IN
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+        InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // Validate that the SWAP_IN instance has the same partitions as the SWAP_OUT
+    // instance, all in a second top state.
+    Map<String, String> swapInInstancePartitionsAndStates =
+        getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapInName);
+    Assert.assertTrue(
+        
swapInInstancePartitionsAndStates.keySet().containsAll(swapOutInstanceOriginalPartitions));
+    Set<String> swapInInstanceStates = new 
HashSet<>(swapInInstancePartitionsAndStates.values());
+    swapInInstanceStates.removeAll(SECONDARY_STATE_SET);
+    Assert.assertEquals(swapInInstanceStates.size(), 0);
+
+    // Assert canCompleteSwap is false because the SWAP_OUT instance is disabled.
+    Assert.assertFalse(_gSetupTool.getClusterManagementTool()
+        .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
+
+    // Enable the SWAP_OUT instance.
+    _gSetupTool.getClusterManagementTool()
+        .enableInstance(CLUSTER_NAME, instanceToSwapOutName, true);
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // Assert completeSwapIfPossible is true
+    Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+        .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName));
+
+    // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance
+    // originally had. Validate that they are in a second top state, because initially
+    // disabling the SWAP_OUT instance caused all top states to be handed off to the next
+    // replica in the preference list.
+    swapInInstancePartitionsAndStates =
+        getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapInName);
+    Assert.assertTrue(
+        
swapInInstancePartitionsAndStates.keySet().containsAll(swapOutInstanceOriginalPartitions));
+    swapInInstanceStates = new 
HashSet<>(swapInInstancePartitionsAndStates.values());
+    swapInInstanceStates.removeAll(SECONDARY_STATE_SET);
+    Assert.assertEquals(swapInInstanceStates.size(), 0);
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // Assert that SWAP_OUT instance is disabled and has no partitions 
assigned to it.
+    Assert.assertFalse(_gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, 
instanceToSwapOutName).getInstanceEnabled());
+    Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), 
instanceToSwapOutName).size(),
+        0);
+  }
+
+  @Test(expectedExceptions = HelixException.class, dependsOnMethods = 
"testNodeSwapWithSwapOutInstanceDisabled")
+  public void testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() {
+    System.out.println(
+        "START 
TestInstanceOperation.testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() at "
+            + new Date(System.currentTimeMillis()));
+    resetInstances();
+
+    // Get the SWAP_OUT instance.
+    String instanceToSwapOutName = _participants.get(0).getInstanceName();
+    InstanceConfig instanceToSwapOutInstanceConfig = 
_gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+
+    // Add the SWAP_IN instance in the enabled state before the SWAP_OUT instance's
+    // InstanceOperation has been set.
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, 
true, -1);
+  }
+
+  @Test(expectedExceptions = HelixException.class, dependsOnMethods = 
"testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet")
+  public void testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() {
+    System.out.println(
+        "START 
TestInstanceOperation.testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() at "
+            + new Date(System.currentTimeMillis()));
+    resetInstances();
+
+    // Get the SWAP_OUT instance.
+    String instanceToSwapOutName = _participants.get(0).getInstanceName();
+    InstanceConfig instanceToSwapOutInstanceConfig = 
_gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+
+    // Add instance with InstanceOperation set to SWAP_IN
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, 
false, -1);
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // Enable the SWAP_IN instance before the SWAP_OUT instance's InstanceOperation has been set.
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, 
instanceToSwapInName, true);
+  }
+
+  @Test(expectedExceptions = HelixException.class, dependsOnMethods = 
"testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet")
+  public void testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut() {
+    System.out.println(
+        "START 
TestInstanceOperation.testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut()
 at "
+            + new Date(System.currentTimeMillis()));
+    resetInstances();
+
+    // Get the SWAP_OUT instance.
+    String instanceToSwapOutName = _participants.get(0).getInstanceName();
+    InstanceConfig instanceToSwapOutInstanceConfig = 
_gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+
+    // Add instance with InstanceOperation set to SWAP_IN
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, 
false, -1);
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // Try to remove the InstanceOperation from the SWAP_IN instance before it has been set
+    // on the SWAP_OUT instance. This should throw an exception because two instances with
+    // the same logicalId can never both have their InstanceOperation unset.
+    _gSetupTool.getClusterManagementTool()
+        .setInstanceOperation(CLUSTER_NAME, instanceToSwapInName, null);
+  }
+
+  @Test(dependsOnMethods = 
"testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut")
+  public void testNodeSwapAddSwapInFirst() {
+    System.out.println("START 
TestInstanceOperation.testNodeSwapAddSwapInFirst() at " + new Date(
+        System.currentTimeMillis()));
+    resetInstances();
+
+    // Store original EVs
+    Map<String, ExternalView> originalEVs = getEVs();
+
+    Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>();
+
+    // Get the SWAP_OUT instance.
+    String instanceToSwapOutName = _participants.get(0).getInstanceName();
+    InstanceConfig instanceToSwapOutInstanceConfig = 
_gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+
+    // Add instance with InstanceOperation set to SWAP_IN
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
_participants.size());
+    swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, 
instanceToSwapInName);
+    addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, 
false, -1);
+
+    // Validate that the assignment has not changed since adding the disabled SWAP_IN instance.
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
+        Collections.emptySet(), Collections.emptySet());
+
+    // After the SWAP_IN instance is added, we set the InstanceOperation to 
SWAP_OUT
+    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, 
instanceToSwapOutName,
+        InstanceConstants.InstanceOperation.SWAP_OUT);
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // Enable the SWAP_IN instance to begin the swap operation.
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, 
instanceToSwapInName, true);
+
+    // Validate that partitions on the SWAP_OUT instance do not change after setting the
+    // InstanceOperation to SWAP_OUT and enabling the SWAP_IN instance.
+    // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance
+    // but none of them are in a top state.
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
+        Set.of(instanceToSwapInName), Collections.emptySet());
+
+    // Assert canCompleteSwap is true
+    Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+        .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
+    // Assert completeSwapIfPossible is true
+    Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+        .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName));
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // Assert that SWAP_OUT instance is disabled and has no partitions 
assigned to it.
+    Assert.assertFalse(_gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, 
instanceToSwapOutName).getInstanceEnabled());
+    Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), 
instanceToSwapOutName).size(),
+        0);
+
+    // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT 
instance had before
+    // swap was completed.
+    validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
+        Collections.emptySet(), Set.of(instanceToSwapInName));
+  }
+
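+  // Builds an InstanceConfig with the given logicalId, zone, InstanceOperation, enabled flag,
+  // and optional capacity (skipped when negative), registers the instance with the cluster,
+  // and returns a MockParticipantManager for it without starting it.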
+  private MockParticipantManager createParticipant(String participantName, 
String logicalId, String zone,
+      InstanceConstants.InstanceOperation instanceOperation, boolean enabled, 
int capacity) {
+    InstanceConfig config = new InstanceConfig.Builder().setDomain(
+            String.format("%s=%s, %s=%s, %s=%s", ZONE, zone, HOST, 
participantName, LOGICAL_ID,
+                
logicalId)).setInstanceEnabled(enabled).setInstanceOperation(instanceOperation)
+        .build(participantName);
+    if (capacity >= 0) {
+      config.setInstanceCapacityMap(Map.of(TEST_CAPACITY_KEY, capacity));
+    }
+    _gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, config);
 
     // start dummy participants
     MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, 
CLUSTER_NAME, participantName);
@@ -393,12 +1150,24 @@ public class TestInstanceOperation extends ZkTestBase {
     // Using a delayed state model
     StDelayMSStateModelFactory delayFactory = new StDelayMSStateModelFactory();
     stateMachine.registerStateModelFactory("MasterSlave", delayFactory);
+    return participant;
+  }
+
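+  // Creates the participant, starts it, and records it in the test's participant lists.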
+  private void addParticipant(String participantName, String logicalId, String 
zone,
+      InstanceConstants.InstanceOperation instanceOperation, boolean enabled, 
int capacity) {
+    MockParticipantManager participant = createParticipant(participantName, 
logicalId, zone,
+        instanceOperation, enabled, capacity);
 
     participant.syncStart();
     _participants.add(participant);
     _participantNames.add(participantName);
   }
 
+  private void addParticipant(String participantName) {
+    addParticipant(participantName, Integer.toString(_participants.size()),
+        "zone_" + _participants.size(), null, true, -1);
+  }
+
    private void createTestDBs(long delayTime) throws InterruptedException {
      createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB0_CRUSHED",
          BuiltInStateModelDefinitions.LeaderStandby.name(), PARTITIONS, 
REPLICA, REPLICA - 1, -1,
@@ -415,7 +1184,15 @@ public class TestInstanceOperation extends ZkTestBase {
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
   }
 
-  private Map<String, ExternalView> getEV() {
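+  // Drops each given resource from the cluster and waits for the cluster to converge.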
+  private void dropTestDBs(Set<String> dbs) {
+    for (String db : dbs) {
+      _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, db);
+      _allDBs.remove(db);
+    }
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+  }
+
+  private Map<String, ExternalView> getEVs() {
     Map<String, ExternalView> externalViews = new HashMap<String, 
ExternalView>();
     for (String db : _allDBs) {
       ExternalView ev = 
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, 
db);
@@ -450,7 +1227,65 @@ public class TestInstanceOperation extends ZkTestBase {
     return assignedParticipants;
   }
 
-  // verify that each partition has >=REPLICA (3 in this case) replicas
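+  // Returns a map of partition name to state for every partition hosted on the given
+  // instance across the provided ExternalViews.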
+  private Map<String, String> getPartitionsAndStatesOnInstance(Map<String, 
ExternalView> evs,
+      String instanceName) {
+    Map<String, String> instancePartitions = new HashMap<>();
+    for (String resourceEV : evs.keySet()) {
+      for (String partition : evs.get(resourceEV).getPartitionSet()) {
+        if 
(evs.get(resourceEV).getStateMap(partition).containsKey(instanceName)) {
+          instancePartitions.put(partition,
+              evs.get(resourceEV).getStateMap(partition).get(instanceName));
+        }
+      }
+    }
+
+    return instancePartitions;
+  }
+
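+  // Verifies a resource's ExternalView against its original EV: in-flight SWAP_IN instances
+  // are expected to hold each swapped partition in the swapOutInstance's state (or a second
+  // top state if the swapOutInstance holds the top state), while completed SWAP_IN instances
+  // fully replace their swapOutInstance in the state map.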
+  private void validateEVCorrect(ExternalView actual, ExternalView original,
+      Map<String, String> swapOutInstancesToSwapInInstances, Set<String> 
inFlightSwapInInstances,
+      Set<String> completedSwapInInstanceNames) {
+    Assert.assertEquals(actual.getPartitionSet(), original.getPartitionSet());
+    IdealState is = _gSetupTool.getClusterManagementTool()
+        .getResourceIdealState(CLUSTER_NAME, original.getResourceName());
+    StateModelDefinition stateModelDef = _gSetupTool.getClusterManagementTool()
+        .getStateModelDef(CLUSTER_NAME, is.getStateModelDefRef());
+    for (String partition : actual.getPartitionSet()) {
+      Map<String, String> expectedStateMap = new 
HashMap<>(original.getStateMap(partition));
+      for (String swapOutInstance : 
swapOutInstancesToSwapInInstances.keySet()) {
+        if (expectedStateMap.containsKey(swapOutInstance) && 
inFlightSwapInInstances.contains(
+            swapOutInstancesToSwapInInstances.get(swapOutInstance))) {
+          // If the corresponding swapInInstance is in-flight, add it to the expectedStateMap
+          // with the same state as the swapOutInstance, or with a second top state if the
+          // swapOutInstance holds the top state.
+          
expectedStateMap.put(swapOutInstancesToSwapInInstances.get(swapOutInstance),
+              
expectedStateMap.get(swapOutInstance).equals(stateModelDef.getTopState())
+                  ? (String) stateModelDef.getSecondTopStates().toArray()[0]
+                  : expectedStateMap.get(swapOutInstance));
+        } else if (expectedStateMap.containsKey(swapOutInstance)
+            && completedSwapInInstanceNames.contains(
+            swapOutInstancesToSwapInInstances.get(swapOutInstance))) {
+          // If the corresponding swapInInstance is completed, add it to the 
expectedStateMap
+          // with the same state as the swapOutInstance.
+          
expectedStateMap.put(swapOutInstancesToSwapInInstances.get(swapOutInstance),
+              expectedStateMap.get(swapOutInstance));
+          expectedStateMap.remove(swapOutInstance);
+        }
+      }
+      Assert.assertEquals(actual.getStateMap(partition), expectedStateMap, 
"Error for partition " + partition
+          + " in resource " + actual.getResourceName());
+    }
+  }
+
+  private void validateEVsCorrect(Map<String, ExternalView> actuals,
+      Map<String, ExternalView> originals, Map<String, String> 
swapOutInstancesToSwapInInstances,
+      Set<String> inFlightSwapInInstances, Set<String> 
completedSwapInInstanceNames) {
+    Assert.assertEquals(actuals.keySet(), originals.keySet());
+    for (String resource : actuals.keySet()) {
+      validateEVCorrect(actuals.get(resource), originals.get(resource),
+          swapOutInstancesToSwapInInstances, inFlightSwapInInstances, 
completedSwapInInstanceNames);
+    }
+  }
 
   private void validateAssignmentInEv(ExternalView ev) {
     validateAssignmentInEv(ev, REPLICA);
@@ -460,10 +1295,7 @@ public class TestInstanceOperation extends ZkTestBase {
     Set<String> partitionSet = ev.getPartitionSet();
     for (String partition : partitionSet) {
       AtomicInteger activeReplicaCount = new AtomicInteger();
-      ev.getStateMap(partition)
-          .values()
-          .stream()
-          .filter(v -> v.equals("MASTER") || v.equals("LEADER") || 
v.equals("SLAVE") || v.equals("FOLLOWER") || v.equals("STANDBY"))
+      
ev.getStateMap(partition).values().stream().filter(ACCEPTABLE_STATE_SET::contains)
           .forEach(v -> activeReplicaCount.getAndIncrement());
       Assert.assertTrue(activeReplicaCount.get() >=expectedNumber);
     }
@@ -486,10 +1318,10 @@ public class TestInstanceOperation extends ZkTestBase {
 
     // Set test instance capacity and partition weights
     ClusterConfig clusterConfig = 
_dataAccessor.getProperty(_dataAccessor.keyBuilder().clusterConfig());
-    String testCapacityKey = "TestCapacityKey";
-    
clusterConfig.setInstanceCapacityKeys(Collections.singletonList(testCapacityKey));
-    
clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap(testCapacityKey,
 100));
-    
clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(testCapacityKey,
 1));
+    
clusterConfig.setInstanceCapacityKeys(Collections.singletonList(TEST_CAPACITY_KEY));
+    clusterConfig.setDefaultInstanceCapacityMap(
+        Collections.singletonMap(TEST_CAPACITY_KEY, TEST_CAPACITY_VALUE));
+    
clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(TEST_CAPACITY_KEY,
 1));
     _dataAccessor.setProperty(_dataAccessor.keyBuilder().clusterConfig(), 
clusterConfig);
   }
 
diff --git 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java 
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index e1ffbb646..59decd98e 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -898,24 +898,28 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
       InstanceConfig instanceConfig = new InstanceConfig(instanceName);
       instanceConfig.setHostName(hostname);
       instanceConfig.setPort(port);
-      if (i == 40) {
-        instanceConfig.setDomain(String
-            .format("invaliddomain=%s,zone=%s,rack=%s,host=%s", "mygroup" + i 
% 2, "myzone" + i % 4,
-                "myrack" + i % 4, hostname));
-      } else if (i == 41) {
-        instanceConfig.setDomain("invaliddomain");
-      } else {
-        String domain = String
-            .format("group=%s,zone=%s,rack=%s,host=%s", "mygroup" + i % 2, 
"myzone" + i % 4,
-                "myrack" + i % 4, hostname);
-        instanceConfig.setDomain(domain);
-      }
+
+      String domain =
+          String.format("group=%s,zone=%s,rack=%s,host=%s", "mygroup" + i % 2, 
"myzone" + i % 4,
+              "myrack" + i % 4, hostname);
+      instanceConfig.setDomain(domain);
+
       LiveInstance liveInstance = new LiveInstance(instanceName);
       liveInstance.setSessionId(UUID.randomUUID().toString());
       liveInstance.setHelixVersion(UUID.randomUUID().toString());
       accessor.setProperty(keyBuilder.liveInstance(instanceName), 
liveInstance);
       admin.addInstance(clusterName, instanceConfig);
       admin.enableInstance(clusterName, instanceName, true);
+
+      if (i == 40) {
+        instanceConfig.setDomain(
+            String.format("invaliddomain=%s,zone=%s,rack=%s,host=%s", 
"mygroup" + i % 2,
+                "myzone" + i % 4, "myrack" + i % 4, hostname));
+        admin.setInstanceConfig(clusterName, instanceName, instanceConfig);
+      } else if (i == 41) {
+        instanceConfig.setDomain("invaliddomain");
+        admin.setInstanceConfig(clusterName, instanceName, instanceConfig);
+      }
     }
 
     ClusterTopology clusterTopology = admin.getClusterTopology(clusterName);
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java 
b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index 512a7b4db..aa93bc9c8 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -556,6 +556,16 @@ public class MockHelixAdmin implements HelixAdmin {
     return false;
   }
 
+  @Override
+  public boolean canCompleteSwap(String clusterName, String instancesNames) {
+    return false;
+  }
+
+  @Override
+  public boolean completeSwapIfPossible(String clusterName, String 
instanceName) {
+    return false;
+  }
+
   @Override
   public boolean isReadyForPreparingJoiningCluster(String clusterName, String 
instancesNames) {
     return false;
diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
 
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
index 48b467eaa..64fbaff41 100644
--- 
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
+++ 
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
@@ -86,6 +86,8 @@ public class AbstractResource {
     getInstance,
     getAllInstances,
     setInstanceOperation, // TODO: Name is just a place holder, may change in 
future
+    canCompleteSwap,
+    completeSwapIfPossible,
     onDemandRebalance
   }
 
diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
 
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
index efc3ce652..b920f66ce 100644
--- 
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
+++ 
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
@@ -427,52 +427,63 @@ public class PerInstanceAccessor extends 
AbstractHelixResource {
           }
           admin.resetPartition(clusterId, instanceName,
               node.get(PerInstanceProperties.resource.name()).textValue(),
-              (List<String>) OBJECT_MAPPER
-                  
.readValue(node.get(PerInstanceProperties.partitions.name()).toString(),
-                    OBJECT_MAPPER.getTypeFactory()
-                        .constructCollectionType(List.class, String.class)));
-        break;
-      case setInstanceOperation:
-         admin.setInstanceOperation(clusterId, instanceName, state);
-         break;
-      case addInstanceTag:
-        if (!validInstance(node, instanceName)) {
-          return badRequest("Instance names are not match!");
-        }
-        for (String tag : (List<String>) OBJECT_MAPPER
-            
.readValue(node.get(PerInstanceProperties.instanceTags.name()).toString(),
-                
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, 
String.class))) {
-          admin.addInstanceTag(clusterId, instanceName, tag);
-        }
-        break;
-      case removeInstanceTag:
-        if (!validInstance(node, instanceName)) {
-          return badRequest("Instance names are not match!");
-        }
-        for (String tag : (List<String>) OBJECT_MAPPER
-            
.readValue(node.get(PerInstanceProperties.instanceTags.name()).toString(),
-                
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, 
String.class))) {
-          admin.removeInstanceTag(clusterId, instanceName, tag);
-        }
-        break;
-      case enablePartitions:
-        admin.enablePartition(true, clusterId, instanceName,
-            node.get(PerInstanceProperties.resource.name()).textValue(),
-            (List<String>) OBJECT_MAPPER
-                
.readValue(node.get(PerInstanceProperties.partitions.name()).toString(),
-                    OBJECT_MAPPER.getTypeFactory()
-                        .constructCollectionType(List.class, String.class)));
-        break;
-      case disablePartitions:
-        admin.enablePartition(false, clusterId, instanceName,
-            node.get(PerInstanceProperties.resource.name()).textValue(),
-            (List<String>) OBJECT_MAPPER
-                
.readValue(node.get(PerInstanceProperties.partitions.name()).toString(),
-                    
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, 
String.class)));
-        break;
-      default:
-        LOG.error("Unsupported command :" + command);
-        return badRequest("Unsupported command :" + command);
+              (List<String>) OBJECT_MAPPER.readValue(
+                  node.get(PerInstanceProperties.partitions.name()).toString(),
+                  OBJECT_MAPPER.getTypeFactory()
+                      .constructCollectionType(List.class, String.class)));
+          break;
+        case setInstanceOperation:
+          admin.setInstanceOperation(clusterId, instanceName, state);
+          break;
+        case canCompleteSwap:
+          if (!admin.canCompleteSwap(clusterId, instanceName)) {
+            return badRequest("Swap is not ready to be completed!");
+          }
+          break;
+        case completeSwapIfPossible:
+          if (!admin.completeSwapIfPossible(clusterId, instanceName)) {
+            return badRequest("Swap is not ready to be completed!");
+          }
+          break;
+        case addInstanceTag:
+          if (!validInstance(node, instanceName)) {
+            return badRequest("Instance names are not match!");
+          }
+          for (String tag : (List<String>) OBJECT_MAPPER.readValue(
+              node.get(PerInstanceProperties.instanceTags.name()).toString(),
+              
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, 
String.class))) {
+            admin.addInstanceTag(clusterId, instanceName, tag);
+          }
+          break;
+        case removeInstanceTag:
+          if (!validInstance(node, instanceName)) {
+            return badRequest("Instance names are not match!");
+          }
+          for (String tag : (List<String>) OBJECT_MAPPER.readValue(
+              node.get(PerInstanceProperties.instanceTags.name()).toString(),
+              
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, 
String.class))) {
+            admin.removeInstanceTag(clusterId, instanceName, tag);
+          }
+          break;
+        case enablePartitions:
+          admin.enablePartition(true, clusterId, instanceName,
+              node.get(PerInstanceProperties.resource.name()).textValue(),
+              (List<String>) OBJECT_MAPPER.readValue(
+                  node.get(PerInstanceProperties.partitions.name()).toString(),
+                  OBJECT_MAPPER.getTypeFactory()
+                      .constructCollectionType(List.class, String.class)));
+          break;
+        case disablePartitions:
+          admin.enablePartition(false, clusterId, instanceName,
+              node.get(PerInstanceProperties.resource.name()).textValue(),
+              (List<String>) OBJECT_MAPPER.readValue(
+                  node.get(PerInstanceProperties.partitions.name()).toString(),
+                  OBJECT_MAPPER.getTypeFactory()
+                      .constructCollectionType(List.class, String.class)));
+          break;
+        default:
+          LOG.error("Unsupported command :" + command);
+          return badRequest("Unsupported command :" + command);
       }
     } catch (Exception e) {
       LOG.error("Failed in updating instance : " + instanceName, e);
