This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch ApplicationClusterManager
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/ApplicationClusterManager by
this push:
new 7613cabf0 HelixAdmin APIs and pipeline changes to support Helix Node
Swap (#2661)
7613cabf0 is described below
commit 7613cabf007a8983895843e2520ddd28b0ff67dc
Author: Zachary Pinto <[email protected]>
AuthorDate: Fri Nov 3 15:12:14 2023 -0700
HelixAdmin APIs and pipeline changes to support Helix Node Swap (#2661)
Add ability for up to 2 nodes with the same logicalId to be added to the
cluster at the same time when a SWAP is happening.
During all partition assignment for WAGED and DelayedAutoRebalancer, we
select just one instance for each logicalId. Achieves n -> n+1 for all replicas
on SWAP_OUT node and back to n when SWAP is marked complete, making it cancelable.
Adding and updating Helix Admin APIs to support swap operation:
setInstanceOperation
addInstance
canCompleteSwap
completeSwapIfPossible
* Refactor sanity checks for HelixAdmin swap APIs.
* Helix Node Swap pipeline changes and integration tests.
* Fix integration tests to properly restore stopped MockParticipant so
following tests are not affected.
* Add comments and docstrings.
* Fix tests to clean up after themselves.
* Optimize duplicate logicalId filtering to only be called on allNodes and
then used to remove duplicate logicalIds from enabledLiveNodes.
* Add handling for clusterConfig == null in updateSwappingInstances and fix
AssignableNode to check for clusterTopologyConfig when attempting to get
logicalId.
* Fix integ tests.
* Fix testGetDomainInformation since we no longer allow an instance to join
the cluster with an invalid DOMAIN field.
* Add checks to ensure that the SWAP_IN instance has a matching FAULT_ZONE
and matching INSTANCE_CAPACITY_MAP to SWAP_OUT node.
* Rename canSwapBeCompleted to canCompleteSwap.
* Add sanity checks to allow SWAP_IN node to join the cluster in disabled
state before SWAP_OUT node has instance operation set.
* Fix print in test case.
* Add canCompleteSwap to PerInstanceAccessor and fix formatting.
* Fix flaky node swap after completion by making sure replica hash is
computed with logicalIds instead of instanceNames.
---
.../src/main/java/org/apache/helix/HelixAdmin.java | 33 +-
.../dataproviders/BaseControllerDataProvider.java | 71 ++
.../rebalancer/DelayedAutoRebalancer.java | 75 +-
.../rebalancer/util/DelayedRebalanceUtil.java | 88 ++
.../rebalancer/waged/GlobalRebalanceRunner.java | 12 +-
.../rebalancer/waged/WagedRebalancer.java | 57 +-
.../constraints/ConstraintBasedAlgorithm.java | 4 +-
.../rebalancer/waged/model/AssignableNode.java | 18 +-
.../rebalancer/waged/model/ClusterModel.java | 8 +
.../waged/model/ClusterModelProvider.java | 6 +-
.../org/apache/helix/manager/zk/ZKHelixAdmin.java | 423 +++++++++-
.../org/apache/helix/model/InstanceConfig.java | 28 +
.../rebalancer/waged/TestWagedRebalancer.java | 4 +-
.../rebalancer/TestInstanceOperation.java | 896 ++++++++++++++++++++-
.../apache/helix/manager/zk/TestZkHelixAdmin.java | 28 +-
.../java/org/apache/helix/mock/MockHelixAdmin.java | 10 +
.../rest/server/resources/AbstractResource.java | 2 +
.../resources/helix/PerInstanceAccessor.java | 103 +--
18 files changed, 1720 insertions(+), 146 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 085a987b1..f53b886e2 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -23,6 +23,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
+
import org.apache.helix.api.status.ClusterManagementMode;
import org.apache.helix.api.status.ClusterManagementModeRequest;
import org.apache.helix.api.topology.ClusterTopology;
@@ -302,8 +304,15 @@ public interface HelixAdmin {
*/
void enableInstance(String clusterName, List<String> instances, boolean
enabled);
- void setInstanceOperation(String clusterName, String instance,
- InstanceConstants.InstanceOperation instanceOperation);
+ /**
+ * Set the instanceOperation field.
+ *
+ * @param clusterName The cluster name
+ * @param instanceName The instance name
+ * @param instanceOperation The instance operation
+ */
+ void setInstanceOperation(String clusterName, String instanceName,
+ @Nullable InstanceConstants.InstanceOperation instanceOperation);
/**
* Disable or enable a resource
@@ -747,6 +756,26 @@ public interface HelixAdmin {
*/
boolean isEvacuateFinished(String clusterName, String instancesNames);
+ /**
+ * Check to see if swapping between two instances can be completed. Either
the swapOut or
+ * swapIn instance can be passed in.
+ * @param clusterName The cluster name
+ * @param instanceName The instance that is being swapped out or swapped in
+ * @return True if the swap is ready to be completed, false otherwise.
+ */
+ boolean canCompleteSwap(String clusterName, String instanceName);
+
+ /**
+ * Check to see if swapping between two instances is ready to be completed
and complete it if
+ * possible. Either the swapOut or swapIn instance can be passed in.
+ *
+ * @param clusterName The cluster name
+ * @param instanceName The instance that is being swapped out or swapped in
+ * @return True if the swap is ready to be completed and was completed
successfully, false
+ * otherwise.
+ */
+ boolean completeSwapIfPossible(String clusterName, String instanceName);
+
/**
* Return if instance is ready for preparing joining cluster. The instance
should have no current state,
* no pending message and tagged with operation that exclude the instance
from Helix assignment.
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 8e8f9fa9b..9dd517384 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -46,10 +46,12 @@ import org.apache.helix.common.caches.InstanceMessagesCache;
import org.apache.helix.common.caches.PropertyCache;
import org.apache.helix.common.caches.TaskCurrentStateCache;
import org.apache.helix.common.controllers.ControlContextProvider;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.LogUtil;
import
org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
@@ -116,6 +118,8 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
private Map<String, Map<String, String>> _idealStateRuleMap;
private final Map<String, Map<String, Set<String>>>
_disabledInstanceForPartitionMap = new HashMap<>();
private final Set<String> _disabledInstanceSet = new HashSet<>();
+ private final Map<String, String> _swapOutInstanceNameToSwapInInstanceName =
new HashMap<>();
+ private final Set<String> _enabledLiveSwapInInstanceNames = new HashSet<>();
private final Map<String, MonitoredAbnormalResolver>
_abnormalStateResolverMap = new HashMap<>();
private final Set<String> _timedOutInstanceDuringMaintenance = new
HashSet<>();
private Map<String, LiveInstance> _liveInstanceExcludeTimedOutForMaintenance
= new HashMap<>();
@@ -437,6 +441,8 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
updateIdealRuleMap(getClusterConfig());
updateDisabledInstances(getInstanceConfigMap().values(),
getClusterConfig());
+ updateSwappingInstances(getInstanceConfigMap().values(),
getEnabledLiveInstances(),
+ getClusterConfig());
return refreshedTypes;
}
@@ -471,6 +477,8 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
refreshAbnormalStateResolverMap(_clusterConfig);
updateIdealRuleMap(_clusterConfig);
updateDisabledInstances(getInstanceConfigMap().values(), _clusterConfig);
+ updateSwappingInstances(getInstanceConfigMap().values(),
getEnabledLiveInstances(),
+ _clusterConfig);
}
@Override
@@ -617,6 +625,24 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
return Collections.unmodifiableSet(_disabledInstanceSet);
}
+ /**
+ * Get all swapping instance pairs.
+ *
+ * @return a map of SWAP_OUT instanceNames and their corresponding SWAP_IN
instanceNames.
+ */
+ public Map<String, String> getSwapOutToSwapInInstancePairs() {
+ return
Collections.unmodifiableMap(_swapOutInstanceNameToSwapInInstanceName);
+ }
+
+ /**
+ * Get all the enabled and live SWAP_IN instances.
+ *
+ * @return a set of SWAP_IN instanceNames that have a corresponding SWAP_OUT
instance.
+ */
+ public Set<String> getEnabledLiveSwapInInstanceNames() {
+ return Collections.unmodifiableSet(_enabledLiveSwapInInstanceNames);
+ }
+
public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
_liveInstanceCache.setPropertyMap(HelixProperty.convertListToMap(liveInstances));
_updateInstanceOfflineTime = true;
@@ -750,6 +776,8 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
public void setInstanceConfigMap(Map<String, InstanceConfig>
instanceConfigMap) {
_instanceConfigCache.setPropertyMap(instanceConfigMap);
updateDisabledInstances(instanceConfigMap.values(), getClusterConfig());
+ updateSwappingInstances(instanceConfigMap.values(),
getEnabledLiveInstances(),
+ getClusterConfig());
}
/**
@@ -858,6 +886,49 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
}
}
+ private void updateSwappingInstances(Collection<InstanceConfig>
instanceConfigs,
+ Set<String> liveEnabledInstances, ClusterConfig clusterConfig) {
+ _swapOutInstanceNameToSwapInInstanceName.clear();
+ _enabledLiveSwapInInstanceNames.clear();
+
+ if (clusterConfig == null) {
+ logger.warn("Skip refreshing swapping instances because clusterConfig is
null.");
+ return;
+ }
+
+ ClusterTopologyConfig clusterTopologyConfig =
+ ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
+
+ Map<String, String> swapOutLogicalIdsByInstanceName = new HashMap<>();
+ Map<String, String> swapInInstancesByLogicalId = new HashMap<>();
+ instanceConfigs.forEach(instanceConfig -> {
+ if (instanceConfig == null) {
+ return;
+ }
+ if (instanceConfig.getInstanceOperation()
+ .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) {
+ swapOutLogicalIdsByInstanceName.put(instanceConfig.getInstanceName(),
+
instanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType()));
+ }
+ if (instanceConfig.getInstanceOperation()
+ .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
+ swapInInstancesByLogicalId.put(
+
instanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType()),
+ instanceConfig.getInstanceName());
+ }
+ });
+
+ swapOutLogicalIdsByInstanceName.forEach((swapOutInstanceName, value) -> {
+ String swapInInstanceName = swapInInstancesByLogicalId.get(value);
+ if (swapInInstanceName != null) {
+ _swapOutInstanceNameToSwapInInstanceName.put(swapOutInstanceName,
swapInInstanceName);
+ if (liveEnabledInstances.contains(swapInInstanceName)) {
+ _enabledLiveSwapInInstanceNames.add(swapInInstanceName);
+ }
+ }
+ });
+ }
+
/*
* Check if the instance is timed-out during maintenance mode. An instance
is timed-out if it has
* been offline for longer than the user defined timeout window.
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index 9cd0f71cd..442ddfb02 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -31,17 +31,17 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.api.config.StateTransitionThrottleConfig;
+import org.apache.helix.constants.InstanceConstants;
import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import
org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
@@ -56,7 +56,8 @@ import org.slf4j.LoggerFactory;
*/
public class DelayedAutoRebalancer extends
AbstractRebalancer<ResourceControllerDataProvider> {
private static final Logger LOG =
LoggerFactory.getLogger(DelayedAutoRebalancer.class);
- public static final Set<String>
INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT = ImmutableSet.of("EVACUATE",
"SWAP_IN");
+ public static ImmutableSet<String>
INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT =
+ ImmutableSet.of(InstanceConstants.InstanceOperation.EVACUATE.name());
@Override
public IdealState computeNewIdealState(String resourceName,
@@ -113,9 +114,16 @@ public class DelayedAutoRebalancer extends
AbstractRebalancer<ResourceController
allNodes = clusterData.getAllInstances();
}
+ Set<String> allNodesDeduped =
DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
+ ClusterTopologyConfig.createFromClusterConfig(clusterConfig),
+ clusterData.getInstanceConfigMap(), allNodes);
+ // Remove the non-selected instances with duplicate logicalIds from
liveEnabledNodes
+ // This ensures the same duplicate instance is kept in both
allNodesDeduped and liveEnabledNodes
+ liveEnabledNodes.retainAll(allNodesDeduped);
+
long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState,
clusterConfig);
Set<String> activeNodes = DelayedRebalanceUtil
- .getActiveNodes(allNodes, currentIdealState, liveEnabledNodes,
+ .getActiveNodes(allNodesDeduped, currentIdealState, liveEnabledNodes,
clusterData.getInstanceOfflineTimeMap(),
clusterData.getLiveInstances().keySet(),
clusterData.getInstanceConfigMap(), delay, clusterConfig);
if (delayRebalanceEnabled) {
@@ -127,11 +135,11 @@ public class DelayedAutoRebalancer extends
AbstractRebalancer<ResourceController
clusterConfig, _manager);
}
- if (allNodes.isEmpty() || activeNodes.isEmpty()) {
+ if (allNodesDeduped.isEmpty() || activeNodes.isEmpty()) {
LOG.error(String.format(
"No instances or active instances available for resource %s, "
+ "allInstances: %s, liveInstances: %s, activeInstances: %s",
- resourceName, allNodes, liveEnabledNodes, activeNodes));
+ resourceName, allNodesDeduped, liveEnabledNodes, activeNodes));
return generateNewIdealState(resourceName, currentIdealState,
emptyMapping(currentIdealState));
}
@@ -157,41 +165,58 @@ public class DelayedAutoRebalancer extends
AbstractRebalancer<ResourceController
getRebalanceStrategy(currentIdealState.getRebalanceStrategy(),
allPartitions, resourceName,
stateCountMap, maxPartition);
- // sort node lists to ensure consistent preferred assignments
- List<String> allNodeList = new ArrayList<>(allNodes);
- // We will not assign partition to instances with evacuation and wap-out
tag.
+ List<String> allNodeList = new ArrayList<>(allNodesDeduped);
+
// TODO: Currently we have 2 groups of instances and compute preference
list twice and merge.
// Eventually we want to have exclusive groups of instance for different
instance tag.
- List<String> liveEnabledAssignableNodeList =
filterOutOnOperationInstances(clusterData.getInstanceConfigMap(),
- liveEnabledNodes);
+ List<String> liveEnabledAssignableNodeList = new ArrayList<>(
+ // We will not assign partitions to instances with EVACUATE
InstanceOperation.
+
DelayedRebalanceUtil.filterOutEvacuatingInstances(clusterData.getInstanceConfigMap(),
+ liveEnabledNodes));
+ // sort node lists to ensure consistent preferred assignments
Collections.sort(allNodeList);
Collections.sort(liveEnabledAssignableNodeList);
- ZNRecord newIdealMapping = _rebalanceStrategy
- .computePartitionAssignment(allNodeList,
liveEnabledAssignableNodeList, currentMapping, clusterData);
+ ZNRecord newIdealMapping =
+ _rebalanceStrategy.computePartitionAssignment(allNodeList,
liveEnabledAssignableNodeList,
+ currentMapping, clusterData);
ZNRecord finalMapping = newIdealMapping;
if (DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState,
clusterConfig)
- || liveEnabledAssignableNodeList.size()!= activeNodes.size()) {
+ || liveEnabledAssignableNodeList.size() != activeNodes.size()) {
List<String> activeNodeList = new ArrayList<>(activeNodes);
Collections.sort(activeNodeList);
int minActiveReplicas = DelayedRebalanceUtil.getMinActiveReplica(
ResourceConfig.mergeIdealStateWithResourceConfig(resourceConfig,
currentIdealState),
currentIdealState, replicaCount);
- ZNRecord newActiveMapping = _rebalanceStrategy
- .computePartitionAssignment(allNodeList, activeNodeList,
currentMapping, clusterData);
+ ZNRecord newActiveMapping =
+ _rebalanceStrategy.computePartitionAssignment(allNodeList,
activeNodeList, currentMapping,
+ clusterData);
finalMapping = getFinalDelayedMapping(currentIdealState,
newIdealMapping, newActiveMapping,
liveEnabledNodes, replicaCount, minActiveReplicas);
}
finalMapping.getListFields().putAll(userDefinedPreferenceList);
+ // 1. Get all SWAP_OUT instances and corresponding SWAP_IN instance pairs
in the cluster.
+ Map<String, String> swapOutToSwapInInstancePairs =
+ clusterData.getSwapOutToSwapInInstancePairs();
+ // 2. Get all enabled and live SWAP_IN instances in the cluster.
+ Set<String> enabledLiveSwapInInstances =
clusterData.getEnabledLiveSwapInInstanceNames();
+ // 3. For each SWAP_OUT instance in any of the preferenceLists, add the
corresponding SWAP_IN instance to the end.
+ // Skipping this when there are not SWAP_IN instances ready(enabled and
live) will reduce computation time when there is not an active
+ // swap occurring.
+ if (!clusterData.getEnabledLiveSwapInInstanceNames().isEmpty()) {
+
DelayedRebalanceUtil.addSwapInInstanceToPreferenceListsIfSwapOutInstanceExists(finalMapping,
+ swapOutToSwapInInstancePairs, enabledLiveSwapInInstances);
+ }
+
LOG.debug("currentMapping: {}", currentMapping);
LOG.debug("stateCountMap: {}", stateCountMap);
LOG.debug("liveEnabledNodes: {}", liveEnabledNodes);
LOG.debug("activeNodes: {}", activeNodes);
- LOG.debug("allNodes: {}", allNodes);
+ LOG.debug("allNodes: {}", allNodesDeduped);
LOG.debug("maxPartition: {}", maxPartition);
LOG.debug("newIdealMapping: {}", newIdealMapping);
LOG.debug("finalMapping: {}", finalMapping);
@@ -201,14 +226,6 @@ public class DelayedAutoRebalancer extends
AbstractRebalancer<ResourceController
return idealState;
}
- private static List<String> filterOutOnOperationInstances(Map<String,
InstanceConfig> instanceConfigMap,
- Set<String> nodes) {
- return nodes.stream()
- .filter(
- instance ->
!INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(instanceConfigMap.get(instance).getInstanceOperation()))
- .collect(Collectors.toList());
- }
-
private IdealState generateNewIdealState(String resourceName, IdealState
currentIdealState,
ZNRecord newMapping) {
IdealState newIdealState = new IdealState(resourceName);
@@ -376,7 +393,7 @@ public class DelayedAutoRebalancer extends
AbstractRebalancer<ResourceController
// if preference list is not empty, and we do have new intanceToAdd, we
// should check if it has capacity to hold the partition.
boolean isWaged = WagedValidationUtil.isWagedEnabled(idealState) && cache
!= null;
- if (isWaged && !isPreferenceListEmpty && instanceToAdd.size() > 0) {
+ if (isWaged && !isPreferenceListEmpty && !instanceToAdd.isEmpty()) {
// check instanceToAdd instance appears in combinedPreferenceList
for (String instance : instanceToAdd) {
if (combinedPreferenceList.contains(instance)) {
@@ -409,7 +426,11 @@ public class DelayedAutoRebalancer extends
AbstractRebalancer<ResourceController
bestPossibleStateMap, preferenceList, combinedPreferenceList)) {
for (int i = 0; i < combinedPreferenceList.size() - numReplicas; i++) {
String instanceToDrop =
combinedPreferenceList.get(combinedPreferenceList.size() - i - 1);
- bestPossibleStateMap.put(instanceToDrop,
HelixDefinedState.DROPPED.name());
+ // We do not want to drop a SWAP_IN node if it is at the end of the
preferenceList,
+ // because partitions are actively being added on this node to prepare
for SWAP completion.
+ if (cache == null ||
!cache.getEnabledLiveSwapInInstanceNames().contains(instanceToDrop)) {
+ bestPossibleStateMap.put(instanceToDrop,
HelixDefinedState.DROPPED.name());
+ }
}
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
index 58bad164a..c7066d053 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
@@ -34,12 +34,14 @@ import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Partition;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.util.InstanceValidationUtil;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -139,6 +141,92 @@ public class DelayedRebalanceUtil {
.collect(Collectors.toSet());
}
+ /**
+ * Filter out instances with duplicate logical IDs. If there are duplicates,
the instance with
+ * InstanceOperation SWAP_OUT will be chosen over the instance with SWAP_IN.
SWAP_IN is not
+ * assignable. If there are duplicates with one node having no
InstanceOperation and the other
+ * having SWAP_OUT, the node with no InstanceOperation will be chosen. This
signifies SWAP
+ * completion, therefore making the node assignable.
+ * TODO: Eventually when we refactor DataProvider to have
getAssignableInstances() and
+ * TODO: getAssignableEnabledLiveInstances() this will not need to be called
in the rebalancer.
+ * @param clusterTopologyConfig the cluster topology configuration
+ * @param instanceConfigMap the map of instance name to corresponding
InstanceConfig
+ * @param instances the set of instances to filter out duplicate
logicalIDs for
+ * @return the set of instances with duplicate logicalIDs filtered out,
there will only be one
+ * instance per logicalID
+ */
+ public static Set<String> filterOutInstancesWithDuplicateLogicalIds(
+ ClusterTopologyConfig clusterTopologyConfig, Map<String, InstanceConfig>
instanceConfigMap,
+ Set<String> instances) {
+ Set<String> filteredNodes = new HashSet<>();
+ Map<String, String> filteredInstancesByLogicalId = new HashMap<>();
+
+ instances.forEach(node -> {
+ InstanceConfig thisInstanceConfig = instanceConfigMap.get(node);
+ if (thisInstanceConfig == null) {
+ return;
+ }
+ String thisLogicalId =
+
thisInstanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType());
+
+ if (filteredInstancesByLogicalId.containsKey(thisLogicalId)) {
+ InstanceConfig filteredDuplicateInstanceConfig =
+
instanceConfigMap.get(filteredInstancesByLogicalId.get(thisLogicalId));
+ if ((filteredDuplicateInstanceConfig.getInstanceOperation()
+ .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())
+ && thisInstanceConfig.getInstanceOperation()
+ .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name()))
+ || thisInstanceConfig.getInstanceOperation().isEmpty()) {
+ // If the already filtered instance is SWAP_IN and this instance is
in SWAP_OUT, then replace the filtered
+ // instance with this instance. If this instance has no
InstanceOperation, then replace the filtered instance
+ // with this instance. This is the case where the SWAP_IN node has
been marked as complete or SWAP_IN exists and
+ // SWAP_OUT does not. There can never be a case where both have no
InstanceOperation set.
+
filteredNodes.remove(filteredInstancesByLogicalId.get(thisLogicalId));
+ filteredNodes.add(node);
+ filteredInstancesByLogicalId.put(thisLogicalId, node);
+ }
+ } else {
+ filteredNodes.add(node);
+ filteredInstancesByLogicalId.put(thisLogicalId, node);
+ }
+ });
+
+ return filteredNodes;
+ }
+
+ /**
+ * Look through the provided mapping and add corresponding SWAP_IN node if a
SWAP_OUT node exists
+ * in the partition's preference list.
+ *
+ * @param mapping the mapping to be updated (IdealState
ZNRecord)
+ * @param swapOutToSwapInInstancePairs the map of SWAP_OUT to SWAP_IN
instances
+ */
+ public static void
addSwapInInstanceToPreferenceListsIfSwapOutInstanceExists(ZNRecord mapping,
+ Map<String, String> swapOutToSwapInInstancePairs, Set<String>
enabledLiveSwapInInstances) {
+ Map<String, List<String>> preferenceListsByPartition =
mapping.getListFields();
+ for (String partition : preferenceListsByPartition.keySet()) {
+ List<String> preferenceList = preferenceListsByPartition.get(partition);
+ if (preferenceList == null) {
+ continue;
+ }
+ List<String> newInstancesToAdd = new ArrayList<>();
+ for (String instanceName : preferenceList) {
+ if (swapOutToSwapInInstancePairs.containsKey(instanceName)
+ && enabledLiveSwapInInstances.contains(
+ swapOutToSwapInInstancePairs.get(instanceName))) {
+ String swapInInstanceName =
swapOutToSwapInInstancePairs.get(instanceName);
+ if (!preferenceList.contains(swapInInstanceName) &&
!newInstancesToAdd.contains(
+ swapInInstanceName)) {
+ newInstancesToAdd.add(swapInInstanceName);
+ }
+ }
+ }
+ if (!newInstancesToAdd.isEmpty()) {
+ preferenceList.addAll(newInstancesToAdd);
+ }
+ }
+ }
+
/**
* Return the time when an offline or disabled instance should be treated as
inactive. Return -1
* if it is inactive now or forced to be rebalanced by an on-demand
rebalance.
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java
index 6130e5c52..6c199bc1b 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java
@@ -30,10 +30,12 @@ import org.apache.helix.HelixConstants;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.changedetector.ResourceChangeDetector;
import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
import org.apache.helix.controller.rebalancer.util.WagedRebalanceUtil;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.monitoring.metrics.MetricCollector;
@@ -163,8 +165,14 @@ class GlobalRebalanceRunner implements AutoCloseable {
_assignmentManager.getBaselineAssignment(_assignmentMetadataStore,
currentStateOutput, resourceMap.keySet());
ClusterModel clusterModel;
try {
- clusterModel =
- ClusterModelProvider.generateClusterModelForBaseline(clusterData,
resourceMap, clusterData.getAllInstances(),
+ clusterModel =
ClusterModelProvider.generateClusterModelForBaseline(clusterData, resourceMap,
+ // Dedupe and select correct node if there is more than one with the
same logical id.
+ // We should be calculating a new baseline only after a swap
operation is complete and the SWAP_IN node is selected
+ // by deduping. This ensures that adding the SWAP_IN node to the
cluster does not cause new baseline to be calculated
+ // with both the SWAP_OUT and SWAP_IN node.
+ DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
+
ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()),
+ clusterData.getInstanceConfigMap(),
clusterData.getAllInstances()),
clusterChanges, currentBaseline);
} catch (Exception ex) {
throw new HelixRebalanceException("Failed to generate cluster model for
global rebalance.",
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index ad3c7b20d..8f71f4e6d 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -47,8 +47,8 @@ import
org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
@@ -302,8 +302,17 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
ResourceControllerDataProvider clusterData, Map<String, Resource>
resourceMap,
final CurrentStateOutput currentStateOutput, RebalanceAlgorithm
algorithm)
throws HelixRebalanceException {
- Set<String> activeNodes = DelayedRebalanceUtil
- .getActiveNodes(clusterData.getAllInstances(),
clusterData.getEnabledLiveInstances(),
+
+ Set<String> allNodesDeduped =
DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
+
ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()),
+ clusterData.getInstanceConfigMap(), clusterData.getAllInstances());
+ // Remove the non-selected instances with duplicate logicalIds from
liveEnabledNodes
+ // This ensures the same duplicate instance is kept in both
allNodesDeduped and liveEnabledNodes
+ Set<String> liveEnabledNodesDeduped =
clusterData.getEnabledLiveInstances();
+ liveEnabledNodesDeduped.retainAll(allNodesDeduped);
+
+ Set<String> activeNodes =
+ DelayedRebalanceUtil.getActiveNodes(allNodesDeduped,
liveEnabledNodesDeduped,
clusterData.getInstanceOfflineTimeMap(),
clusterData.getLiveInstances().keySet(),
clusterData.getInstanceConfigMap(),
clusterData.getClusterConfig());
@@ -359,6 +368,20 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
// Sort the preference list according to state priority.
newIdealState.setPreferenceLists(
getPreferenceLists(assignments.get(resourceName),
statePriorityMap));
+
+ // 1. Get all SWAP_OUT instances and corresponding SWAP_IN instance
pairs in the cluster.
+ Map<String, String> swapOutToSwapInInstancePairs =
+ clusterData.getSwapOutToSwapInInstancePairs();
+ // 2. Get all enabled and live SWAP_IN instances in the cluster.
+ Set<String> enabledLiveSwapInInstances =
clusterData.getEnabledLiveSwapInInstanceNames();
+ // 3. For each SWAP_OUT instance in any of the preferenceLists, add
the corresponding SWAP_IN instance to the end.
+ // Skipping this when there are not SWAP_IN instances ready(enabled
and live) will reduce computation time when there is not an active
+ // swap occurring.
+ if (!clusterData.getEnabledLiveSwapInInstanceNames().isEmpty()) {
+
DelayedRebalanceUtil.addSwapInInstanceToPreferenceListsIfSwapOutInstanceExists(
+ newIdealState.getRecord(), swapOutToSwapInInstancePairs,
enabledLiveSwapInInstances);
+ }
+
// Note the state mapping in the new assignment won't directly
propagate to the map fields.
// The rebalancer will calculate for the final state mapping
considering the current states.
finalIdealStateMap.put(resourceName, newIdealState);
@@ -398,7 +421,14 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
RebalanceAlgorithm algorithm) throws HelixRebalanceException {
// the "real" live nodes at the time
// TODO: this is a hacky way to filter our on operation instance. We
should consider redesign `getEnabledLiveInstances()`.
- final Set<String> enabledLiveInstances =
filterOutOnOperationInstances(clusterData.getInstanceConfigMap(),
clusterData.getEnabledLiveInstances());
+ final Set<String> allNodesDeduped =
DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
+
ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()),
+ clusterData.getInstanceConfigMap(), clusterData.getAllInstances());
+ final Set<String> enabledLiveInstances =
clusterData.getEnabledLiveInstances();
+ // Remove the non-selected instances with duplicate logicalIds from
liveEnabledNodes
+ // This ensures the same duplicate instance is kept in both
allNodesDeduped and liveEnabledNodes
+ enabledLiveInstances.retainAll(allNodesDeduped);
+
if (activeNodes.equals(enabledLiveInstances) ||
!requireRebalanceOverwrite(clusterData, currentResourceAssignment)) {
// no need for additional process, return the current resource assignment
return currentResourceAssignment;
@@ -427,14 +457,6 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
}
}
- private static Set<String> filterOutOnOperationInstances(Map<String,
InstanceConfig> instanceConfigMap,
- Set<String> nodes) {
- return nodes.stream()
- .filter(
- instance ->
!DelayedAutoRebalancer.INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(instanceConfigMap.get(instance).getInstanceOperation()))
- .collect(Collectors.toSet());
- }
-
/**
* Emergency rebalance is scheduled to quickly handle urgent cases like
reassigning partitions from inactive nodes
* and addressing for partitions failing to meet minActiveReplicas.
@@ -619,8 +641,15 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
bestPossibleAssignment.values().parallelStream().forEach((resourceAssignment ->
{
String resourceName = resourceAssignment.getResourceName();
IdealState currentIdealState = clusterData.getIdealState(resourceName);
- Set<String> enabledLiveInstances =
- filterOutOnOperationInstances(clusterData.getInstanceConfigMap(),
clusterData.getEnabledLiveInstances());
+
+ Set<String> allNodesDeduped =
DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
+
ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()),
+ clusterData.getInstanceConfigMap(), clusterData.getAllInstances());
+ Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+ // Remove the non-selected instances with duplicate logicalIds from
liveEnabledNodes
+ // This ensures the same duplicate instance is kept in both
allNodesDeduped and liveEnabledNodes
+ enabledLiveInstances.retainAll(allNodesDeduped);
+
int numReplica =
currentIdealState.getReplicaCount(enabledLiveInstances.size());
int minActiveReplica =
DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
.mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
index 5dbeb5c38..77d56302c 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
@@ -153,7 +153,7 @@ class ConstraintBasedAlgorithm implements
RebalanceAlgorithm {
int idleScore1 = busyInstances.contains(instanceName1) ? 0 : 1;
int idleScore2 = busyInstances.contains(instanceName2) ? 0 : 1;
return idleScore1 != idleScore2 ? (idleScore1 - idleScore2)
- : -instanceName1.compareTo(instanceName2);
+ : -nodeEntry1.getKey().compareTo(nodeEntry2.getKey());
} else {
return scoreCompareResult;
}
@@ -193,7 +193,7 @@ class ConstraintBasedAlgorithm implements
RebalanceAlgorithm {
.containsKey(replica.getResourceName());
_isInBaselineAssignment =
clusterModel.getContext().getBaselineAssignment().containsKey(replica.getResourceName());
- _replicaHash = Objects.hash(replica.toString(),
clusterModel.getAssignableNodes().keySet());
+ _replicaHash = Objects.hash(replica.toString(),
clusterModel.getAssignableLogicalIds());
computeScore(overallClusterRemainingCapacityMap);
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index e29052a0d..b4e8c3c71 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -34,6 +34,7 @@ import org.apache.helix.HelixException;
import org.apache.helix.controller.rebalancer.topology.Topology;
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.InstanceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +49,7 @@ public class AssignableNode implements
Comparable<AssignableNode> {
// Immutable Instance Properties
private final String _instanceName;
+ private final String _logicaId;
private final String _faultZone;
// maximum number of the partitions that can be assigned to the instance.
private final int _maxPartition;
@@ -72,8 +74,12 @@ public class AssignableNode implements
Comparable<AssignableNode> {
* ResourceConfig could
* subject to change. If the assumption is no longer true, this function
should become private.
*/
- AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig,
String instanceName) {
+ AssignableNode(ClusterConfig clusterConfig, ClusterTopologyConfig
clusterTopologyConfig,
+ InstanceConfig instanceConfig, String instanceName) {
_instanceName = instanceName;
+ _logicaId = clusterTopologyConfig != null ? instanceConfig.getLogicalId(
+ clusterTopologyConfig.getEndNodeType())
+ : instanceName;
Map<String, Integer> instanceCapacity =
fetchInstanceCapacity(clusterConfig, instanceConfig);
_faultZone = computeFaultZone(clusterConfig, instanceConfig);
_instanceTags = ImmutableSet.copyOf(instanceConfig.getTags());
@@ -86,6 +92,10 @@ public class AssignableNode implements
Comparable<AssignableNode> {
_currentAssignedReplicaMap = new HashMap<>();
}
+ AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig,
String instanceName) {
+ this(clusterConfig, null, instanceConfig, instanceName);
+ }
+
/**
* This function should only be used to assign a set of new partitions that
are not allocated on
* this node. It's because the any exception could occur at the middle of
batch assignment and the
@@ -272,6 +282,10 @@ public class AssignableNode implements
Comparable<AssignableNode> {
return _instanceName;
}
+ public String getLogicalId() {
+ return _logicaId;
+ }
+
public Set<String> getInstanceTags() {
return _instanceTags;
}
@@ -368,7 +382,7 @@ public class AssignableNode implements
Comparable<AssignableNode> {
@Override
public int compareTo(AssignableNode o) {
- return _instanceName.compareTo(o.getInstanceName());
+ return _logicaId.compareTo(o.getLogicalId());
}
@Override
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
index 42eaabaf9..7ef503e01 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
@@ -37,6 +37,7 @@ public class ClusterModel {
// Note that the identical replicas are deduped in the index.
private final Map<String, Map<String, AssignableReplica>>
_assignableReplicaIndex;
private final Map<String, AssignableNode> _assignableNodeMap;
+ private final Set<String> _assignableNodeLogicalIds;
/**
* @param clusterContext The initialized cluster context.
@@ -60,6 +61,9 @@ public class ClusterModel {
_assignableNodeMap = assignableNodes.parallelStream()
.collect(Collectors.toMap(AssignableNode::getInstanceName, node ->
node));
+ _assignableNodeLogicalIds =
+ assignableNodes.parallelStream().map(AssignableNode::getLogicalId)
+ .collect(Collectors.toSet());
}
public ClusterContext getContext() {
@@ -70,6 +74,10 @@ public class ClusterModel {
return _assignableNodeMap;
}
+ public Set<String> getAssignableLogicalIds() {
+ return _assignableNodeLogicalIds;
+ }
+
public Map<String, Set<AssignableReplica>> getAssignableReplicaMap() {
return _assignableReplicaMap;
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index dffaec3e0..3f1673210 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -34,6 +34,7 @@ import org.apache.helix.HelixException;
import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Partition;
@@ -530,9 +531,12 @@ public class ClusterModelProvider {
*/
private static Set<AssignableNode> getAllAssignableNodes(ClusterConfig
clusterConfig,
Map<String, InstanceConfig> instanceConfigMap, Set<String>
activeInstances) {
+ ClusterTopologyConfig clusterTopologyConfig =
+ ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
return activeInstances.parallelStream()
.filter(instanceConfigMap::containsKey).map(
- instanceName -> new AssignableNode(clusterConfig,
instanceConfigMap.get(instanceName),
+ instanceName -> new AssignableNode(clusterConfig,
clusterTopologyConfig,
+ instanceConfigMap.get(instanceName),
instanceName)).collect(Collectors.toSet());
}
diff --git
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index ebbcf64d0..34bd56487 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -39,6 +39,10 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigAccessor;
@@ -47,7 +51,6 @@ import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixException;
-import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyPathBuilder;
@@ -67,6 +70,7 @@ import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.ClusterStatus;
+import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.ConstraintItem;
import org.apache.helix.model.ControllerHistory;
import org.apache.helix.model.CurrentState;
@@ -86,6 +90,7 @@ import org.apache.helix.model.ParticipantHistory;
import org.apache.helix.model.PauseSignal;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.tools.DefaultIdealStateCalculator;
import org.apache.helix.util.ConfigStringUtil;
@@ -114,6 +119,8 @@ public class ZKHelixAdmin implements HelixAdmin {
public static final String CONNECTION_TIMEOUT = "helixAdmin.timeOutInSec";
private static final String MAINTENANCE_ZNODE_ID = "maintenance";
private static final int DEFAULT_SUPERCLUSTER_REPLICA = 3;
+ private static final ImmutableSet<String>
ALLOWED_INSTANCE_OPERATIONS_FOR_ADD_INSTANCE =
+ ImmutableSet.of("", InstanceConstants.InstanceOperation.SWAP_IN.name());
private final RealmAwareZkClient _zkClient;
private final ConfigAccessor _configAccessor;
@@ -197,6 +204,108 @@ public class ZKHelixAdmin implements HelixAdmin {
throw new HelixException("Node " + nodeId + " already exists in cluster
" + clusterName);
}
+ if (!ALLOWED_INSTANCE_OPERATIONS_FOR_ADD_INSTANCE.contains(
+ instanceConfig.getInstanceOperation())) {
+ throw new HelixException(
+ "Instance can only be added if InstanceOperation is set to one of" +
"the following: "
+ + ALLOWED_INSTANCE_OPERATIONS_FOR_ADD_INSTANCE + " This
instance: " + nodeId
+ + " has InstanceOperation set to " +
instanceConfig.getInstanceOperation());
+ }
+
+ // Get the topology key used to determine the logicalId of a node.
+ ClusterConfig clusterConfig =
_configAccessor.getClusterConfig(clusterName);
+ ClusterTopologyConfig clusterTopologyConfig =
+ ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
+ String logicalIdKey = clusterTopologyConfig.getEndNodeType();
+ String faultZoneKey = clusterTopologyConfig.getFaultZoneType();
+ String toAddInstanceLogicalId = instanceConfig.getLogicalId(logicalIdKey);
+
+ HelixConfigScope instanceConfigScope =
+ new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT,
+ clusterName).build();
+ List<String> existingInstanceIds = getConfigKeys(instanceConfigScope);
+ List<InstanceConfig> foundInstanceConfigsWithMatchingLogicalId =
+ existingInstanceIds.parallelStream()
+ .map(existingInstanceId -> getInstanceConfig(clusterName,
existingInstanceId)).filter(
+ existingInstanceConfig ->
existingInstanceConfig.getLogicalId(logicalIdKey)
+
.equals(toAddInstanceLogicalId)).collect(Collectors.toList());
+
+ if (foundInstanceConfigsWithMatchingLogicalId.size() >= 2) {
+ // If the length is 2, we cannot add an instance with the same logicalId
as an existing instance
+ // regardless of InstanceOperation.
+ throw new HelixException(
+ "There can only be 2 instances with the same logicalId in a cluster.
"
+ + "Existing instances: " +
foundInstanceConfigsWithMatchingLogicalId.get(0)
+ .getInstanceName() + " and " +
foundInstanceConfigsWithMatchingLogicalId.get(1)
+ .getInstanceName() + " already have the same logicalId: " +
toAddInstanceLogicalId
+ + "; therefore, " + nodeId + " cannot be added to the cluster.");
+ } else if (foundInstanceConfigsWithMatchingLogicalId.size() == 1) {
+ // If there is only one instance with the same logicalId, we can infer
that the intended behaviour
+ // is to SWAP_IN.
+
+ // If the InstanceOperation is unset, we will set it to SWAP_IN.
+ if (!instanceConfig.getInstanceOperation()
+ .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
+
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.SWAP_IN);
+ }
+
+ // If the existing instance with the same logicalId does not have
InstanceOperation set to SWAP_OUT and this instance
+ // is attempting to join as enabled, we cannot add this instance.
+ if (instanceConfig.getInstanceEnabled() &&
!foundInstanceConfigsWithMatchingLogicalId.get(0)
+ .getInstanceOperation()
+ .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) {
+ throw new HelixException(
+ "Instance can only be added if the exising instance sharing the
same logicalId has InstanceOperation"
+ + " set to " +
InstanceConstants.InstanceOperation.SWAP_OUT.name()
+ + " and this instance has InstanceOperation set to "
+ + InstanceConstants.InstanceOperation.SWAP_IN.name() + ". " +
"Existing instance: "
+ +
foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceName()
+ + " has InstanceOperation: "
+ +
foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceOperation()
+ + " and this instance: " + nodeId + " has InstanceOperation: "
+ + instanceConfig.getInstanceOperation());
+ }
+
+ // If the existing instance with the same logicalId is not in the same
FAULT_ZONE as this instance, we cannot
+ // add this instance.
+ if (!foundInstanceConfigsWithMatchingLogicalId.get(0).getDomainAsMap()
+ .containsKey(faultZoneKey) ||
!instanceConfig.getDomainAsMap().containsKey(faultZoneKey)
+ ||
!foundInstanceConfigsWithMatchingLogicalId.get(0).getDomainAsMap().get(faultZoneKey)
+ .equals(instanceConfig.getDomainAsMap().get(faultZoneKey))) {
+ throw new HelixException(
+ "Instance can only be added if the SWAP_OUT instance sharing the
same logicalId is in the same FAULT_ZONE"
+ + " as this instance. " + "Existing instance: "
+ +
foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceName()
+ + " has FAULT_ZONE_TYPE: " +
foundInstanceConfigsWithMatchingLogicalId.get(0)
+ .getDomainAsMap().get(faultZoneKey) + " and this instance: " +
nodeId
+ + " has FAULT_ZONE_TYPE: " +
instanceConfig.getDomainAsMap().get(faultZoneKey));
+ }
+
+ Map<String, Integer> foundInstanceCapacityMap =
+
foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceCapacityMap().isEmpty()
+ ? clusterConfig.getDefaultInstanceCapacityMap()
+ :
foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceCapacityMap();
+ Map<String, Integer> instanceCapacityMap =
instanceConfig.getInstanceCapacityMap().isEmpty()
+ ? clusterConfig.getDefaultInstanceCapacityMap() :
instanceConfig.getInstanceCapacityMap();
+ // If the instance does not have the same capacity, we cannot add this
instance.
+ if (!new EqualsBuilder().append(foundInstanceCapacityMap,
instanceCapacityMap).isEquals()) {
+ throw new HelixException(
+ "Instance can only be added if the SWAP_OUT instance sharing the
same logicalId has the same capacity"
+ + " as this instance. " + "Existing instance: "
+ +
foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceName()
+ + " has capacity: " + foundInstanceCapacityMap + " and this
instance: " + nodeId
+ + " has capacity: " + instanceCapacityMap);
+ }
+ } else if (!instanceConfig.getInstanceOperation().isEmpty()) {
+ // If there are no instances with the same logicalId, we can only add
this instance if InstanceOperation
+ // is unset because it is a new instance.
+ throw new HelixException(
+ "There is no instance with logicalId: " + toAddInstanceLogicalId + "
in cluster: "
+ + clusterName + "; therefore, " + nodeId
+ + " cannot join cluster with InstanceOperation set to "
+ + instanceConfig.getInstanceOperation() + ".");
+ }
+
ZKUtil.createChildren(_zkClient, instanceConfigsPath,
instanceConfig.getRecord());
_zkClient.createPersistent(PropertyPathBuilder.instanceMessage(clusterName,
nodeId), true);
@@ -358,6 +467,21 @@ public class ZKHelixAdmin implements HelixAdmin {
logger.info("{} instance {} in cluster {}.", enabled ? "Enable" :
"Disable", instanceName,
clusterName);
BaseDataAccessor<ZNRecord> baseAccessor = new
ZkBaseDataAccessor<>(_zkClient);
+
+ // If enabled is set to true and InstanceOperation is SWAP_IN, we should
fail if there is not a
+ // matching SWAP_OUT instance.
+ InstanceConfig instanceConfig = getInstanceConfig(clusterName,
instanceName);
+ if (enabled && instanceConfig.getInstanceOperation()
+ .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
+ InstanceConfig matchingSwapInstance =
findMatchingSwapInstance(clusterName, instanceConfig);
+ if (matchingSwapInstance == null ||
!matchingSwapInstance.getInstanceOperation()
+ .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) {
+ throw new HelixException("Instance cannot be enabled if
InstanceOperation is set to "
+ + instanceConfig.getInstanceOperation() + " when there is no
matching "
+ + InstanceConstants.InstanceOperation.SWAP_OUT.name() + "
instance.");
+ }
+ }
+
// Eventually we will have all instances' enable/disable information in
clusterConfig. Now we
// update both instanceConfig and clusterConfig in transition period.
enableSingleInstance(clusterName, instanceName, enabled, baseAccessor,
disabledType, reason);
@@ -379,11 +503,53 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
// TODO: Name may change in future
public void setInstanceOperation(String clusterName, String instanceName,
- InstanceConstants.InstanceOperation instanceOperation) {
+ @Nullable InstanceConstants.InstanceOperation instanceOperation) {
BaseDataAccessor<ZNRecord> baseAccessor = new
ZkBaseDataAccessor<>(_zkClient);
String path = PropertyPathBuilder.instanceConfig(clusterName,
instanceName);
+ // InstanceOperation can only be set to SWAP_IN when the instance is added
to the cluster
+ // or if it is disabled.
+ if (instanceOperation != null && instanceOperation.equals(
+ InstanceConstants.InstanceOperation.SWAP_IN) &&
getInstanceConfig(clusterName,
+ instanceName).getInstanceEnabled()) {
+ throw new HelixException("InstanceOperation should only be set to "
+ + InstanceConstants.InstanceOperation.SWAP_IN.name()
+ + " when an instance joins the cluster for the first time(when "
+ + "creating the InstanceConfig) or is disabled.");
+ }
+
+ // InstanceOperation cannot be set to null if there is an instance with
the same logicalId in
+ // the cluster which does not have InstanceOperation set to SWAP_IN or
SWAP_OUT.
+ if (instanceOperation == null) {
+ InstanceConfig instanceConfig = getInstanceConfig(clusterName,
instanceName);
+ String logicalIdKey = ClusterTopologyConfig.createFromClusterConfig(
+ _configAccessor.getClusterConfig(clusterName)).getEndNodeType();
+
+ HelixConfigScope instanceConfigScope =
+ new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT,
+ clusterName).build();
+ List<String> existingInstanceIds = getConfigKeys(instanceConfigScope);
+ List<InstanceConfig> matchingInstancesWithNonSwappingInstanceOperation =
+ existingInstanceIds.parallelStream()
+ .map(existingInstanceId -> getInstanceConfig(clusterName,
existingInstanceId)).filter(
+ existingInstanceConfig ->
+
!existingInstanceConfig.getInstanceName().equals(instanceName)
+ && existingInstanceConfig.getLogicalId(logicalIdKey)
+ .equals(instanceConfig.getLogicalId(logicalIdKey))
+ && !existingInstanceConfig.getInstanceOperation()
+
.equals(InstanceConstants.InstanceOperation.SWAP_IN.name())
+ && !existingInstanceConfig.getInstanceOperation()
+
.equals(InstanceConstants.InstanceOperation.SWAP_OUT.name()))
+ .collect(Collectors.toList());
+
+ if (!matchingInstancesWithNonSwappingInstanceOperation.isEmpty()) {
+ throw new HelixException("InstanceOperation cannot be set to null for
" + instanceName
+ + " if there are other instances with the same logicalId in the
cluster that do not have"
+ + " InstanceOperation set to SWAP_IN or SWAP_OUT.");
+ }
+ }
+
if (!baseAccessor.exists(path, 0)) {
throw new HelixException(
"Cluster " + clusterName + ", instance: " + instanceName + ",
instance config does not exist");
@@ -410,16 +576,263 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public boolean isEvacuateFinished(String clusterName, String instanceName) {
- if (!instanceHasCurrentSateOrMessage(clusterName, instanceName)) {
+ if (!instanceHasCurrentStateOrMessage(clusterName, instanceName)) {
InstanceConfig config = getInstanceConfig(clusterName, instanceName);
return config != null &&
config.getInstanceOperation().equals(InstanceConstants.InstanceOperation.EVACUATE.name());
}
return false;
}
+  /**
+   * Find the instance that the passed instance is swapping with. If the passed instance has
+   * SWAP_OUT instanceOperation, then find the corresponding instance that has SWAP_IN
+   * instanceOperation. If the passed instance has SWAP_IN instanceOperation, then find the
+   * corresponding instance that has SWAP_OUT instanceOperation.
+   *
+   * <p>A candidate is returned only when BOTH conditions hold: it has the same logicalId as the
+   * passed instance, and it carries the opposite swap operation. The logicalId check applies to
+   * both swap directions.
+   *
+   * @param clusterName The cluster name
+   * @param instanceConfig The instance to find the swap instance for
+   * @return The swap instance if found, null otherwise (including when the passed instance has
+   *         neither SWAP_IN nor SWAP_OUT set).
+   */
+  @Nullable
+  private InstanceConfig findMatchingSwapInstance(String clusterName,
+      InstanceConfig instanceConfig) {
+    String swapInOperation = InstanceConstants.InstanceOperation.SWAP_IN.name();
+    String swapOutOperation = InstanceConstants.InstanceOperation.SWAP_OUT.name();
+
+    // Work out which operation the counterpart must carry. An instance that is not part of a
+    // swap can never have a counterpart, so bail out before reading any other config.
+    String instanceOperation = instanceConfig.getInstanceOperation();
+    final String expectedCounterpartOperation;
+    if (swapInOperation.equals(instanceOperation)) {
+      expectedCounterpartOperation = swapOutOperation;
+    } else if (swapOutOperation.equals(instanceOperation)) {
+      expectedCounterpartOperation = swapInOperation;
+    } else {
+      return null;
+    }
+
+    String logicalIdKey =
+        ClusterTopologyConfig.createFromClusterConfig(_configAccessor.getClusterConfig(clusterName))
+            .getEndNodeType();
+    // Loop-invariant: the logicalId of the instance we are finding a partner for.
+    String logicalId = instanceConfig.getLogicalId(logicalIdKey);
+
+    for (String potentialSwappingInstance : getConfigKeys(
+        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT,
+            clusterName).build())) {
+      InstanceConfig potentialSwappingInstanceConfig =
+          getInstanceConfig(clusterName, potentialSwappingInstance);
+
+      // Return if there is a matching Instance with the same logicalId and opposite
+      // InstanceOperation swap operation. Both checks are required for either swap direction;
+      // previously `a && (b) || (c)` let the SWAP_OUT -> SWAP_IN direction skip the logicalId
+      // comparison because && binds tighter than ||.
+      if (potentialSwappingInstanceConfig.getLogicalId(logicalIdKey).equals(logicalId)
+          && expectedCounterpartOperation.equals(
+          potentialSwappingInstanceConfig.getInstanceOperation())) {
+        return potentialSwappingInstanceConfig;
+      }
+    }
+
+    return null;
+  }
+
+ /**
+ * Check to see if swapping between two instances is ready to be completed.
Checks: 1. Both
+ * instances must be alive. 2. Both instances must only have one session and
not be carrying over
+ * from a previous session. 3. Both instances must have no pending messages.
4. Both instances
+ * cannot have partitions in the ERROR state 5. SwapIn instance must have
correct state for all
+ * partitions that are currently assigned to the SwapOut instance.
+ * TODO: We may want to make this a public API in the future.
+ *
+ * @param clusterName The cluster name
+ * @param swapOutInstanceName The instance that is being swapped out
+ * @param swapInInstanceName The instance that is being swapped in
+ * @return True if the swap is ready to be completed, false otherwise.
+ */
+ private boolean canCompleteSwap(String clusterName, String
swapOutInstanceName,
+ String swapInInstanceName) {
+ BaseDataAccessor<ZNRecord> baseAccessor = new
ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
baseAccessor);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ // 1. Check that both instances are alive.
+ LiveInstance swapOutLiveInstance =
+ accessor.getProperty(keyBuilder.liveInstance(swapOutInstanceName));
+ LiveInstance swapInLiveInstance =
+ accessor.getProperty(keyBuilder.liveInstance(swapInInstanceName));
+ if (swapOutLiveInstance == null || swapInLiveInstance == null) {
+ logger.warn(
+ "SwapOutInstance {} is {} and SwapInInstance {} is {} for cluster
{}. Swap will not complete unless both instances are ONLINE.",
+ swapOutInstanceName, swapOutLiveInstance != null ? "ONLINE" :
"OFFLINE",
+ swapInInstanceName, swapInLiveInstance != null ? "ONLINE" :
"OFFLINE", clusterName);
+ return false;
+ }
+
+ // 2. Check that both instances only have one session and are not carrying
any over.
+ // count number of sessions under CurrentState folder. If it is carrying
over from prv session,
+ // then there are > 1 session ZNodes.
+ List<String> swapOutSessions = baseAccessor.getChildNames(
+ PropertyPathBuilder.instanceCurrentState(clusterName,
swapOutInstanceName), 0);
+ List<String> swapInSessions = baseAccessor.getChildNames(
+ PropertyPathBuilder.instanceCurrentState(clusterName,
swapInInstanceName), 0);
+ if (swapOutSessions.size() > 1 || swapInSessions.size() > 1) {
+ logger.warn(
+ "SwapOutInstance {} is carrying over from prev session and
SwapInInstance {} is carrying over from prev session for cluster {}."
+ + " Swap will not complete unless both instances have only one
session.",
+ swapOutInstanceName, swapInInstanceName, clusterName);
+ return false;
+ }
+
+ // 3. Check that the swapOutInstance has no pending messages.
+ List<Message> swapOutMessages =
+ accessor.getChildValues(keyBuilder.messages(swapOutInstanceName),
true);
+ int swapOutPendingMessageCount = swapOutMessages != null ?
swapOutMessages.size() : 0;
+ List<Message> swapInMessages =
+ accessor.getChildValues(keyBuilder.messages(swapInInstanceName), true);
+ int swapInPendingMessageCount = swapInMessages != null ?
swapInMessages.size() : 0;
+ if (swapOutPendingMessageCount > 0 || swapInPendingMessageCount > 0) {
+ logger.warn(
+ "SwapOutInstance {} has {} pending messages and SwapInInstance {}
has {} pending messages for cluster {}."
+ + " Swap will not complete unless both instances have no pending
messages.",
+ swapOutInstanceName, swapOutPendingMessageCount, swapInInstanceName,
+ swapInPendingMessageCount, clusterName);
+ return false;
+ }
+
+ // 4. Collect a list of all partitions that have a current state on
swapOutInstance
+ String swapOutActiveSession = swapOutLiveInstance.getEphemeralOwner();
+ String swapInActiveSession = swapInLiveInstance.getEphemeralOwner();
+
+ // Iterate over all resources with current states on the swapOutInstance
+ List<String> swapOutResources = baseAccessor.getChildNames(
+ PropertyPathBuilder.instanceCurrentState(clusterName,
swapOutInstanceName,
+ swapOutActiveSession), 0);
+ for (String swapOutResource : swapOutResources) {
+ // Get the topState and secondTopStates for the stateModelDef used by
the resource.
+ IdealState idealState =
accessor.getProperty(keyBuilder.idealStates(swapOutResource));
+ StateModelDefinition stateModelDefinition =
+
accessor.getProperty(keyBuilder.stateModelDef(idealState.getStateModelDefRef()));
+ String topState = stateModelDefinition.getTopState();
+ Set<String> secondTopStates = stateModelDefinition.getSecondTopStates();
+
+ CurrentState swapOutResourceCurrentState = accessor.getProperty(
+ keyBuilder.currentState(swapOutInstanceName, swapOutActiveSession,
swapOutResource));
+ CurrentState swapInResourceCurrentState = accessor.getProperty(
+ keyBuilder.currentState(swapInInstanceName, swapInActiveSession,
swapOutResource));
+
+ // Check to make sure swapInInstance has a current state for the resource
+ if (swapInResourceCurrentState == null) {
+ logger.warn(
+ "SwapOutInstance {} has current state for resource {} but
SwapInInstance {} does not for cluster {}."
+ + " Swap will not complete unless both instances have current
states for all resources.",
+ swapOutInstanceName, swapOutResource, swapInInstanceName,
clusterName);
+ return false;
+ }
+
+ // Iterate over all partitions in the swapOutInstance's current state
for the resource
+ // and ensure that the swapInInstance has the correct state for the
partition.
+ for (String partitionName :
swapOutResourceCurrentState.getPartitionStateMap().keySet()) {
+ String swapOutPartitionState =
swapOutResourceCurrentState.getState(partitionName);
+ String swapInPartitionState =
swapInResourceCurrentState.getState(partitionName);
+
+ // Neither instance should have any partitions in ERROR state.
+ if (swapOutPartitionState.equals(HelixDefinedState.ERROR.name())
+ || swapInPartitionState.equals(HelixDefinedState.ERROR.name())) {
+ logger.warn(
+ "SwapOutInstance {} has partition {} in state {} and
SwapInInstance {} has partition {} in state {} for cluster {}."
+ + " Swap will not complete unless both instances have no
partitions in ERROR state.",
+ swapOutInstanceName, partitionName, swapOutPartitionState,
swapInInstanceName,
+ partitionName, swapInPartitionState, clusterName);
+ return false;
+ }
+
+ // When the state of a partition on a swapOut instance is in the
topState, the state
+ // of the partition on the swapInInstance should also be in the
topState or a secondTopState.
+ // It should be in a topState only if the state model allows multiple
replicas in the topState.
+ // In all other cases it should be a secondTopState.
+ if (swapOutPartitionState.equals(topState) &&
!(swapInPartitionState.equals(topState)
+ || secondTopStates.contains(swapInPartitionState))) {
+ logger.warn(
+ "SwapOutInstance {} has partition {} in topState {} but
SwapInInstance {} has partition {} in state {} for cluster {}."
+ + " Swap will not complete unless SwapInInstance has
partition in topState or secondState.",
+ swapOutInstanceName, partitionName, swapOutPartitionState,
swapInInstanceName,
+ partitionName, swapInPartitionState, clusterName);
+ return false;
+ }
+
+ // When the state of a partition on a swapOut instance is any other
state, except ERROR, DROPPED or TopState,
+ // the state of the partition on the swapInInstance should be the same.
+ if (!swapOutPartitionState.equals(topState) &&
!swapOutPartitionState.equals(
+ HelixDefinedState.DROPPED.name())
+ && !swapOutPartitionState.equals(swapInPartitionState)) {
+ logger.warn(
+ "SwapOutInstance {} has partition {} in state {} but
SwapInInstance {} has partition {} in state {} for cluster {}."
+ + " Swap will not complete unless both instances have
matching states.",
+ swapOutInstanceName, partitionName, swapOutPartitionState,
swapInInstanceName,
+ partitionName, swapInPartitionState, clusterName);
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+
+ /**
+  * Check whether the swap that the given instance takes part in can be completed.
+  * The instance may be either side of the swap pair; the other side is looked up through
+  * {@code findMatchingSwapInstance}.
+  *
+  * @param clusterName the cluster the instance belongs to
+  * @param instanceName name of the SWAP_OUT or the SWAP_IN instance
+  * @return false if the instance does not exist or either side of the pair cannot be resolved;
+  *         otherwise the result of {@code canCompleteSwap(clusterName, swapOut, swapIn)}
+  */
+ @Override
+ public boolean canCompleteSwap(String clusterName, String instanceName) {
+   InstanceConfig instanceConfig = getInstanceConfig(clusterName, instanceName);
+   if (instanceConfig == null) {
+     logger.warn(
+         "Instance {} in cluster {} does not exist. Cannot determine if the swap is complete.",
+         instanceName, clusterName);
+     return false;
+   }
+
+   InstanceConfig swapOutInstanceConfig = resolveSwapInstance(clusterName, instanceConfig,
+       InstanceConstants.InstanceOperation.SWAP_OUT);
+   InstanceConfig swapInInstanceConfig = resolveSwapInstance(clusterName, instanceConfig,
+       InstanceConstants.InstanceOperation.SWAP_IN);
+   if (swapOutInstanceConfig == null || swapInInstanceConfig == null) {
+     logger.warn(
+         "Instance {} in cluster {} is not swapping with any other instance. Cannot determine if the swap is complete.",
+         instanceName, clusterName);
+     return false;
+   }
+
+   // Check if the swap is ready to be completed.
+   return canCompleteSwap(clusterName, swapOutInstanceConfig.getInstanceName(),
+       swapInInstanceConfig.getInstanceName());
+ }
+
+ /**
+  * Complete the swap that the given instance takes part in, if it is ready to be completed.
+  * Completion clears the InstanceOperation of the SWAP_IN instance and then disables the
+  * SWAP_OUT instance. The two updates are issued one after the other.
+  *
+  * @param clusterName the cluster the instance belongs to
+  * @param instanceName name of the SWAP_OUT or the SWAP_IN instance
+  * @return true if the swap was completed; false if the instance does not exist, the pair
+  *         cannot be resolved, or the swap is not ready to be completed
+  */
+ @Override
+ public boolean completeSwapIfPossible(String clusterName, String instanceName) {
+   InstanceConfig instanceConfig = getInstanceConfig(clusterName, instanceName);
+   if (instanceConfig == null) {
+     logger.warn(
+         "Instance {} in cluster {} does not exist. Cannot determine if the swap is complete.",
+         instanceName, clusterName);
+     return false;
+   }
+
+   InstanceConfig swapOutInstanceConfig = resolveSwapInstance(clusterName, instanceConfig,
+       InstanceConstants.InstanceOperation.SWAP_OUT);
+   InstanceConfig swapInInstanceConfig = resolveSwapInstance(clusterName, instanceConfig,
+       InstanceConstants.InstanceOperation.SWAP_IN);
+   if (swapOutInstanceConfig == null || swapInInstanceConfig == null) {
+     logger.warn(
+         "Instance {} in cluster {} is not swapping with any other instance. Cannot determine if the swap is complete.",
+         instanceName, clusterName);
+     return false;
+   }
+
+   // Check if the swap is ready to be completed. If not, return false.
+   if (!canCompleteSwap(clusterName, swapOutInstanceConfig.getInstanceName(),
+       swapInInstanceConfig.getInstanceName())) {
+     return false;
+   }
+
+   // Complete the swap by removing the InstanceOperation for the SWAP_IN node and disabling the SWAP_OUT node.
+   setInstanceOperation(clusterName, swapInInstanceConfig.getInstanceName(), null);
+   enableInstance(clusterName, swapOutInstanceConfig.getInstanceName(), false);
+
+   return true;
+ }
+
+ /**
+  * Resolve one side of a swap pair: the given instance itself when its InstanceOperation equals
+  * {@code role}, otherwise whatever {@code findMatchingSwapInstance} returns for it.
+  *
+  * @param clusterName the cluster the instance belongs to
+  * @param instanceConfig config of the instance the caller asked about
+  * @param role the side of the pair to resolve (SWAP_OUT or SWAP_IN)
+  * @return the config for the requested side, or null if no matching instance was found
+  */
+ private InstanceConfig resolveSwapInstance(String clusterName, InstanceConfig instanceConfig,
+     InstanceConstants.InstanceOperation role) {
+   // Constant-first comparison so an unset operation cannot cause a NullPointerException.
+   return role.name().equals(instanceConfig.getInstanceOperation()) ? instanceConfig
+       : findMatchingSwapInstance(clusterName, instanceConfig);
+ }
+
@Override
public boolean isReadyForPreparingJoiningCluster(String clusterName, String
instanceName) {
- if (!instanceHasCurrentSateOrMessage(clusterName, instanceName)) {
+ if (!instanceHasCurrentStateOrMessage(clusterName, instanceName)) {
InstanceConfig config = getInstanceConfig(clusterName, instanceName);
return config != null &&
DelayedAutoRebalancer.INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(
config.getInstanceOperation());
@@ -434,7 +847,7 @@ public class ZKHelixAdmin implements HelixAdmin {
* @param instanceName
* @return
*/
- private boolean instanceHasCurrentSateOrMessage(String clusterName, String
instanceName) {
+ private boolean instanceHasCurrentStateOrMessage(String clusterName, String
instanceName) {
HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
diff --git
a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index b68250dd3..98da7340f 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -702,6 +702,18 @@ public class InstanceConfig extends HelixProperty {
return _record.getId();
}
+ /**
+ * Get the logicalId of this instance. If it does not exist or is not set,
+ * return the instance name.
+ * @param logicalIdKey the key for the DOMAIN field containing the logicalId
+ * @return the logicalId of this instance
+ */
+ public String getLogicalId(String logicalIdKey) {
+ // TODO: Consider caching DomainMap, parsing the DOMAIN string every time
+ // getLogicalId is called can become expensive if called too frequently.
+ return getDomainAsMap().getOrDefault(logicalIdKey, getInstanceName());
+ }
+
@Override
public boolean isValid() {
// HELIX-65: remove check for hostname/port existence
@@ -772,6 +784,7 @@ public class InstanceConfig extends HelixProperty {
private int _weight = WEIGHT_NOT_SET;
private List<String> _tags = new ArrayList<>();
private boolean _instanceEnabled = HELIX_ENABLED_DEFAULT_VALUE;
+ private InstanceConstants.InstanceOperation _instanceOperation;
private Map<String, String> _instanceInfoMap;
private Map<String, Integer> _instanceCapacityMap;
@@ -819,6 +832,10 @@ public class InstanceConfig extends HelixProperty {
instanceConfig.setInstanceEnabled(_instanceEnabled);
}
+ if (_instanceOperation != null) {
+ instanceConfig.setInstanceOperation(_instanceOperation);
+ }
+
if (_instanceInfoMap != null) {
instanceConfig.setInstanceInfoMap(_instanceInfoMap);
}
@@ -890,6 +907,17 @@ public class InstanceConfig extends HelixProperty {
return this;
}
+ /**
+ * Set the instance operation for this instance
+ *
+ * @param instanceOperation the instance operation.
+ * @return InstanceConfig.Builder
+ */
+ public Builder setInstanceOperation(InstanceConstants.InstanceOperation
instanceOperation) {
+ _instanceOperation = instanceOperation;
+ return this;
+ }
+
/**
* Set the INSTANCE_INFO_MAP for this instance
* @param instanceInfoMap the instance info map
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index 608a4d3af..000978ef1 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -22,6 +22,7 @@ package org.apache.helix.controller.rebalancer.waged;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -738,7 +739,8 @@ public class TestWagedRebalancer extends
AbstractTestClusterModel {
instances.add(offlineInstance);
when(clusterData.getAllInstances()).thenReturn(instances);
when(clusterData.getEnabledInstances()).thenReturn(instances);
-
when(clusterData.getEnabledLiveInstances()).thenReturn(ImmutableSet.of(instance0,
instance1, instance2));
+ when(clusterData.getEnabledLiveInstances()).thenReturn(
+ new HashSet<>(Arrays.asList(instance0, instance1, instance2)));
Map<String, Long> instanceOfflineTimeMap = new HashMap<>();
instanceOfflineTimeMap.put(offlineInstance, System.currentTimeMillis() +
Integer.MAX_VALUE);
when(clusterData.getInstanceOfflineTimeMap()).thenReturn(instanceOfflineTimeMap);
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index 10cd662cb..9ccc14fdf 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -10,12 +10,14 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableSet;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
import org.apache.helix.HelixRollbackException;
import org.apache.helix.NotificationContext;
-import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.constants.InstanceConstants;
@@ -30,8 +32,10 @@ import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Message;
import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
@@ -51,15 +55,28 @@ public class TestInstanceOperation extends ZkTestBase {
protected final String CLASS_NAME = getShortClassName();
protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+ private final String TEST_CAPACITY_KEY = "TestCapacityKey";
+ private final int TEST_CAPACITY_VALUE = 100;
+ protected static final String ZONE = "zone";
+ protected static final String HOST = "host";
+ protected static final String LOGICAL_ID = "logicalId";
+ protected static final String TOPOLOGY = String.format("%s/%s/%s", ZONE,
HOST, LOGICAL_ID);
+
+ protected static final ImmutableSet<String> SECONDARY_STATE_SET =
+ ImmutableSet.of("SLAVE", "STANDBY");
+ protected static final ImmutableSet<String> ACCEPTABLE_STATE_SET =
+ ImmutableSet.of("MASTER", "LEADER", "SLAVE", "STANDBY");
private int REPLICA = 3;
protected ClusterControllerManager _controller;
List<MockParticipantManager> _participants = new ArrayList<>();
+ private List<String> _originalParticipantNames = new ArrayList<>();
List<String> _participantNames = new ArrayList<>();
private Set<String> _allDBs = new HashSet<>();
private ZkHelixClusterVerifier _clusterVerifier;
private ConfigAccessor _configAccessor;
private long _stateModelDelay = 3L;
+ private final long DEFAULT_RESOURCE_DELAY_TIME = 1800000L;
private HelixAdmin _admin;
protected AssignmentMetadataStore _assignmentMetadataStore;
HelixDataAccessor _dataAccessor;
@@ -72,6 +89,7 @@ public class TestInstanceOperation extends ZkTestBase {
for (int i = 0; i < NUM_NODE; i++) {
String participantName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ _originalParticipantNames.add(participantName);
addParticipant(participantName);
}
@@ -88,24 +106,88 @@ public class TestInstanceOperation extends ZkTestBase {
_configAccessor = new ConfigAccessor(_gZkClient);
_dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+ setupClusterConfig();
+
+ createTestDBs(DEFAULT_RESOURCE_DELAY_TIME);
+
+ setUpWagedBaseline();
+
+ _admin = new ZKHelixAdmin(_gZkClient);
+ }
+
+ private void setupClusterConfig() {
+ _stateModelDelay = 3L;
ClusterConfig clusterConfig =
_configAccessor.getClusterConfig(CLUSTER_NAME);
clusterConfig.stateTransitionCancelEnabled(true);
clusterConfig.setDelayRebalaceEnabled(true);
clusterConfig.setRebalanceDelayTime(1800000L);
_configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
- createTestDBs(1800000L);
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ }
- setUpWagedBaseline();
+ private void enabledTopologyAwareRebalance() {
+ ClusterConfig clusterConfig =
_configAccessor.getClusterConfig(CLUSTER_NAME);
+ clusterConfig.setTopology(TOPOLOGY);
+ clusterConfig.setFaultZoneType(ZONE);
+ clusterConfig.setTopologyAwareEnabled(true);
+ _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
- _admin = new ZKHelixAdmin(_gZkClient);
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ }
+
+ /**
+  * Turn topology-aware rebalancing off for the test cluster, clear the TOPOLOGY and fault zone
+  * type settings, and assert that the cluster verifier passes afterwards.
+  */
+ private void disableTopologyAwareRebalance() {
+   ClusterConfig config = _configAccessor.getClusterConfig(CLUSTER_NAME);
+   config.setTopologyAwareEnabled(false);
+   config.setTopology(null);
+   config.setFaultZoneType(null);
+   _configAccessor.setClusterConfig(CLUSTER_NAME, config);
+
+   boolean converged = _clusterVerifier.verifyByPolling();
+   Assert.assertTrue(converged);
+ }
+
+ /**
+  * Bring cluster membership back to the original participant list.
+  * Participants that are not in the original list are stopped, disabled and dropped.
+  * Of the remaining ones, a disconnected participant is dropped, re-created and started, while a
+  * connected participant has its InstanceOperation cleared and is re-enabled.
+  * Finally asserts that the cluster verifier passes.
+  */
+ private void resetInstances() {
+   // Stop, disable and drop every participant that is not in the original participant list.
+   Set<String> removedNames = new HashSet<>();
+   for (int idx = 0; idx < _participants.size(); idx++) {
+     String name = _participantNames.get(idx);
+     if (_originalParticipantNames.contains(name)) {
+       continue;
+     }
+     _participants.get(idx).syncStop();
+     _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, name, false);
+     _gSetupTool.getClusterManagementTool().dropInstance(CLUSTER_NAME,
+         _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, name));
+     removedNames.add(name);
+   }
+
+   // Forget the dropped instances in both bookkeeping lists.
+   _participantNames.removeIf(removedNames::contains);
+   _participants.removeIf(participant -> removedNames.contains(participant.getInstanceName()));
+
+   for (int idx = 0; idx < _participants.size(); idx++) {
+     if (_participants.get(idx).isConnected()) {
+       // Healthy instance: clear any InstanceOperation and make sure it is enabled.
+       _gSetupTool.getClusterManagementTool()
+           .setInstanceOperation(CLUSTER_NAME, _participantNames.get(idx), null);
+       _gSetupTool.getClusterManagementTool()
+           .enableInstance(CLUSTER_NAME, _participantNames.get(idx), true);
+     } else {
+       // Instance is not connected to ZK: drop it from the cluster and replace it.
+       _gSetupTool.getClusterManagementTool().dropInstance(CLUSTER_NAME,
+           _gSetupTool.getClusterManagementTool()
+               .getInstanceConfig(CLUSTER_NAME, _participantNames.get(idx)));
+       _participants.set(idx,
+           createParticipant(_participantNames.get(idx), Integer.toString(idx), "zone_" + idx,
+               null, true, -1));
+       _participants.get(idx).syncStart();
+     }
+   }
+
+   Assert.assertTrue(_clusterVerifier.verifyByPolling());
}
@Test
public void testEvacuate() throws Exception {
System.out.println("START TestInstanceOperation.testEvacuate() at " + new
Date(System.currentTimeMillis()));
// EV should contain all participants, check resources one by one
- Map<String, ExternalView> assignment = getEV();
+ Map<String, ExternalView> assignment = getEVs();
for (String resource : _allDBs) {
Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
}
@@ -118,7 +200,7 @@ public class TestInstanceOperation extends ZkTestBase {
Assert.assertTrue(_clusterVerifier.verifyByPolling());
// New ev should contain all instances but the evacuated one
- assignment = getEV();
+ assignment = getEVs();
List<String> currentActiveInstances =
_participantNames.stream().filter(n ->
!n.equals(instanceToEvacuate)).collect(Collectors.toList());
for (String resource : _allDBs) {
@@ -143,7 +225,7 @@ public class TestInstanceOperation extends ZkTestBase {
Assert.assertTrue(_clusterVerifier.verifyByPolling());
// EV should contain all participants, check resources one by one
- Map<String, ExternalView> assignment = getEV();
+ Map<String, ExternalView> assignment = getEVs();
for (String resource : _allDBs) {
Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
validateAssignmentInEv(assignment.get(resource));
@@ -159,7 +241,7 @@ public class TestInstanceOperation extends ZkTestBase {
.enableInstance(CLUSTER_NAME, mockNewInstance, false);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
//ev should contain all instances but the disabled one
- Map<String, ExternalView> assignment = getEV();
+ Map<String, ExternalView> assignment = getEVs();
List<String> currentActiveInstances =
_participantNames.stream().filter(n ->
!n.equals(mockNewInstance)).collect(Collectors.toList());
for (String resource : _allDBs) {
@@ -175,7 +257,7 @@ public class TestInstanceOperation extends ZkTestBase {
_gSetupTool.getClusterManagementTool()
.enableInstance(CLUSTER_NAME, mockNewInstance, true);
//ev should be the same
- assignment = getEV();
+ assignment = getEVs();
currentActiveInstances =
_participantNames.stream().filter(n ->
!n.equals(mockNewInstance)).collect(Collectors.toList());
for (String resource : _allDBs) {
@@ -193,7 +275,7 @@ public class TestInstanceOperation extends ZkTestBase {
Assert.assertTrue(_clusterVerifier.verifyByPolling());
// EV should contain all participants, check resources one by one
- assignment = getEV();
+ assignment = getEVs();
for (String resource : _allDBs) {
Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
validateAssignmentInEv(assignment.get(resource));
@@ -234,7 +316,7 @@ public class TestInstanceOperation extends ZkTestBase {
// sleep a bit so ST messages can start executing
Thread.sleep(Math.abs(_stateModelDelay / 100));
// before we cancel, check current EV
- Map<String, ExternalView> assignment = getEV();
+ Map<String, ExternalView> assignment = getEVs();
for (String resource : _allDBs) {
// check every replica has >= 3 partitions and a top state partition
validateAssignmentInEv(assignment.get(resource));
@@ -244,7 +326,7 @@ public class TestInstanceOperation extends ZkTestBase {
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);
- assignment = getEV();
+ assignment = getEVs();
for (String resource : _allDBs) {
// check every replica has >= 3 active replicas, even before cluster
converge
validateAssignmentInEv(assignment.get(resource));
@@ -254,7 +336,7 @@ public class TestInstanceOperation extends ZkTestBase {
Assert.assertTrue(_clusterVerifier.verifyByPolling());
// EV should contain all participants, check resources one by one
- assignment = getEV();
+ assignment = getEVs();
for (String resource : _allDBs) {
Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
// check every replica has >= 3 active replicas again
@@ -283,7 +365,7 @@ public class TestInstanceOperation extends ZkTestBase {
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);
// check every replica has >= 3 active replicas, even before cluster
converge
- Map<String, ExternalView> assignment = getEV();
+ Map<String, ExternalView> assignment = getEVs();
for (String resource : _allDBs) {
validateAssignmentInEv(assignment.get(resource));
}
@@ -291,7 +373,7 @@ public class TestInstanceOperation extends ZkTestBase {
Assert.assertTrue(_clusterVerifier.verifyByPolling());
// EV should contain all participants, check resources one by one
- assignment = getEV();
+ assignment = getEVs();
for (String resource : _allDBs) {
Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
// check every replica has >= 3 active replicas
@@ -309,7 +391,7 @@ public class TestInstanceOperation extends ZkTestBase {
addParticipant(PARTICIPANT_PREFIX + "_" + (START_PORT + NUM_NODE));
Assert.assertTrue(_clusterVerifier.verifyByPolling());
- Map<String, ExternalView> assignment = getEV();
+ Map<String, ExternalView> assignment = getEVs();
for (String resource : _allDBs) {
Assert.assertFalse(getParticipantsInEv(assignment.get(resource)).contains(_participantNames.get(NUM_NODE)));
}
@@ -332,7 +414,7 @@ public class TestInstanceOperation extends ZkTestBase {
Assert.assertTrue(_clusterVerifier.verifyByPolling());
- assignment = getEV();
+ assignment = getEVs();
List<String> currentActiveInstances =
_participantNames.stream().filter(n ->
!n.equals(instanceToEvacuate)).collect(Collectors.toList());
for (String resource : _allDBs) {
@@ -342,6 +424,8 @@ public class TestInstanceOperation extends ZkTestBase {
Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances));
}
Assert.assertTrue(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME,
instanceToEvacuate));
+
+ _stateModelDelay = 3L;
}
@Test(dependsOnMethods = "testMarkEvacuationAfterEMM")
@@ -356,7 +440,7 @@ public class TestInstanceOperation extends ZkTestBase {
Map<String, ExternalView> assignment;
// EV should contain all participants, check resources one by one
- assignment = getEV();
+ assignment = getEVs();
for (String resource : _allDBs) {
TestHelper.verify(() -> {
ExternalView ev = assignment.get(resource);
@@ -379,13 +463,686 @@ public class TestInstanceOperation extends ZkTestBase {
}, 30000);
}
- _participants.get(1).syncStart();
- _participants.get(2).syncStart();
+ resetInstances();
+ dropTestDBs(ImmutableSet.of("TEST_DB3_DELAYED_CRUSHED",
"TEST_DB4_DELAYED_WAGED"));
}
+ @Test(expectedExceptions = HelixException.class, dependsOnMethods =
"testEvacuationWithOfflineInstancesInCluster")
+ public void testAddingNodeWithSwapOutInstanceOperation() throws Exception { // Expects a HelixException when a second node sharing the logicalId is added with SWAP_OUT.
+ System.out.println(
+ "START
TestInstanceOperation.testAddingNodeWithSwapOutInstanceOperation() at " + new
Date(
+ System.currentTimeMillis()));
+
+ enabledTopologyAwareRebalance();
+ resetInstances();
+
+ // Mark the first participant as SWAP_OUT; its config is read before the operation is set.
+ String instanceToSwapOutName = _participants.get(0).getInstanceName();
+ InstanceConfig instanceToSwapOutInstanceConfig =
_gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+ _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME,
instanceToSwapOutName,
+ InstanceConstants.InstanceOperation.SWAP_OUT);
+
+ // Add a new instance with the same logicalId and zone whose InstanceOperation is SWAP_OUT (not SWAP_IN); presumably this is the call that throws.
+ String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT +
_participants.size());
+ addParticipant(instanceToSwapInName,
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+ instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+ InstanceConstants.InstanceOperation.SWAP_OUT, true, -1);
+ }
- private void addParticipant(String participantName) {
- _gSetupTool.addInstanceToCluster(CLUSTER_NAME, participantName);
+ @Test(expectedExceptions = HelixException.class, dependsOnMethods =
"testAddingNodeWithSwapOutInstanceOperation")
+ public void testAddingNodeWithSwapOutNodeInstanceOperationUnset() throws
Exception { // Expects a HelixException when a SWAP_IN node is added while the existing node has no InstanceOperation.
+ System.out.println(
+ "START
TestInstanceOperation.testAddingNodeWithSwapOutNodeInstanceOperationUnset() at "
+ + new Date(System.currentTimeMillis()));
+
+ resetInstances();
+
+ // Explicitly clear the InstanceOperation of the first participant (set it to null).
+ String instanceToSwapOutName = _participants.get(0).getInstanceName();
+ InstanceConfig instanceToSwapOutInstanceConfig =
_gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+ _gSetupTool.getClusterManagementTool()
+ .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, null);
+
+ // Add a new instance with the same logicalId and zone, marked SWAP_IN; presumably this is the call that throws.
+ String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT +
_participants.size());
+ addParticipant(instanceToSwapInName,
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+ instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+ InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+ }
+
+ @Test(expectedExceptions = HelixException.class, dependsOnMethods =
"testAddingNodeWithSwapOutNodeInstanceOperationUnset")
+ public void testNodeSwapWithNoSwapOutNode() throws Exception { // Expects a HelixException when a SWAP_IN node is added without a SWAP_OUT counterpart.
+ System.out.println("START
TestInstanceOperation.testNodeSwapWithNoSwapOutNode() at " + new Date(
+ System.currentTimeMillis()));
+
+ resetInstances();
+
+ // Add a new SWAP_IN instance with logicalId "1000" in "zone_1000" (presumably values no existing participant uses - TODO confirm).
+ String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT +
_participants.size());
+ addParticipant(instanceToSwapInName, "1000", "zone_1000",
+ InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+ }
+
+ @Test(dependsOnMethods = "testNodeSwapWithNoSwapOutNode")
+ public void testNodeSwapSwapInNodeNoInstanceOperationEnabled() throws
Exception { // No expected exception and no assertions: the test passes as long as adding the node does not throw.
+ System.out.println(
+ "START
TestInstanceOperation.testNodeSwapSwapInNodeNoInstanceOperationEnabled() at "
+ + new Date(System.currentTimeMillis()));
+
+ resetInstances();
+
+ // Mark the first participant as SWAP_OUT; its config is read before the operation is set.
+ String instanceToSwapOutName = _participants.get(0).getInstanceName();
+ InstanceConfig instanceToSwapOutInstanceConfig =
_gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+ _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME,
instanceToSwapOutName,
+ InstanceConstants.InstanceOperation.SWAP_OUT);
+
+ // Add instance with same logicalId with InstanceOperation unset
+ // This should work because adding instance with InstanceOperation unset
will automatically
+ // set the InstanceOperation to SWAP_IN.
+ String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT +
_participants.size());
+ addParticipant(instanceToSwapInName,
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+ instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null,
true, -1);
+ }
+
+ @Test(expectedExceptions = HelixException.class, dependsOnMethods =
"testNodeSwapSwapInNodeNoInstanceOperationEnabled")
+ public void testNodeSwapSwapInNodeWithAlreadySwappingPair() throws Exception
{ // Expects a HelixException when a second SWAP_IN node is added for a logicalId that already has a swap pair.
+ System.out.println(
+ "START
TestInstanceOperation.testNodeSwapSwapInNodeWithAlreadySwappingPair() at "
+ + new Date(System.currentTimeMillis()));
+
+ resetInstances();
+
+ // Mark the first participant as SWAP_OUT; its config is read before the operation is set.
+ String instanceToSwapOutName = _participants.get(0).getInstanceName();
+ InstanceConfig instanceToSwapOutInstanceConfig =
_gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+ _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME,
instanceToSwapOutName,
+ InstanceConstants.InstanceOperation.SWAP_OUT);
+
+ // Add the first SWAP_IN instance with the same logicalId and zone as the SWAP_OUT instance.
+ String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT +
_participants.size());
+ addParticipant(instanceToSwapInName,
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+ instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+ InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+
+ // Add another instance with InstanceOperation set to SWAP_IN with same
logicalId as previously
+ // added SWAP_IN instance.
+ String secondInstanceToSwapInName =
+ PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size());
+ addParticipant(secondInstanceToSwapInName,
+ instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+ instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+ InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+ }
+
+ @Test(expectedExceptions = HelixException.class, dependsOnMethods =
"testNodeSwapSwapInNodeWithAlreadySwappingPair")
+ public void testNodeSwapNoTopologySetup() throws Exception { // Expects a HelixException when a SWAP_IN node is added while topology-aware rebalancing is disabled.
+ System.out.println("START
TestInstanceOperation.testNodeSwapNoTopologySetup() at " + new Date(
+ System.currentTimeMillis()));
+ disableTopologyAwareRebalance();
+ resetInstances();
+
+ // Mark the first participant as SWAP_OUT.
+ String instanceToSwapOutName = _participants.get(0).getInstanceName();
+ _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME,
instanceToSwapOutName,
+ InstanceConstants.InstanceOperation.SWAP_OUT);
+
+ // Add instance with InstanceOperation set to SWAP_IN
+ // There should be an error that the logicalId does not have SWAP_OUT
instance because,
+ // helix can't determine what topology key to use to get the logicalId if
TOPOLOGY is not set.
+ String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT +
_participants.size());
+ InstanceConfig instanceToSwapOutInstanceConfig =
_gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+ addParticipant(instanceToSwapInName,
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+ instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+ InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+ }
+
+ @Test(expectedExceptions = HelixException.class, dependsOnMethods =
"testNodeSwapNoTopologySetup")
+ public void testNodeSwapWrongFaultZone() throws Exception { // Expects a HelixException when the SWAP_IN node is placed in a different zone than the SWAP_OUT node.
+ System.out.println("START
TestInstanceOperation.testNodeSwapWrongFaultZone() at " + new Date(
+ System.currentTimeMillis()));
+ // Re-enable topology aware rebalancing and set TOPOLOGY.
+ enabledTopologyAwareRebalance();
+ resetInstances();
+
+ // Mark the first participant as SWAP_OUT.
+ String instanceToSwapOutName = _participants.get(0).getInstanceName();
+ _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME,
instanceToSwapOutName,
+ InstanceConstants.InstanceOperation.SWAP_OUT);
+
+ // Add instance with InstanceOperation set to SWAP_IN
+ // There should be an error because SWAP_IN instance must be in the same
FAULT_ZONE as the SWAP_OUT instance.
+ String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT +
_participants.size());
+ InstanceConfig instanceToSwapOutInstanceConfig =
_gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+ addParticipant(instanceToSwapInName,
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+ instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE) + "1", // appending "1" yields a zone name different from the SWAP_OUT node's
+ InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+ }
+
+ @Test(expectedExceptions = HelixException.class, dependsOnMethods =
"testNodeSwapWrongFaultZone")
+ public void testNodeSwapWrongCapacity() throws Exception {
+ System.out.println("START
TestInstanceOperation.testNodeSwapWrongCapacity() at " + new Date(
+ System.currentTimeMillis()));
+ resetInstances();
+
+ // Set instance's InstanceOperation to SWAP_OUT
+ String instanceToSwapOutName = _participants.get(0).getInstanceName();
+ _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME,
instanceToSwapOutName,
+ InstanceConstants.InstanceOperation.SWAP_OUT);
+
+ // Add instance with InstanceOperation set to SWAP_IN
+ // There should be an error because SWAP_IN instance must have same
capacity as the SWAP_OUT node.
+ String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT +
_participants.size());
+ InstanceConfig instanceToSwapOutInstanceConfig =
_gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+ addParticipant(instanceToSwapInName,
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+ instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+ InstanceConstants.InstanceOperation.SWAP_IN, true, TEST_CAPACITY_VALUE
- 10);
+ }
+
+ @Test(dependsOnMethods = "testNodeSwapWrongCapacity")
+ public void testNodeSwap() throws Exception {
+ System.out.println(
+ "START TestInstanceOperation.testNodeSwap() at " + new
Date(System.currentTimeMillis()));
+ resetInstances();
+
+ // Store original EV
+ Map<String, ExternalView> originalEVs = getEVs();
+
+ Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>();
+
+ // Set instance's InstanceOperation to SWAP_OUT
+ String instanceToSwapOutName = _participants.get(0).getInstanceName();
+ InstanceConfig instanceToSwapOutInstanceConfig =
_gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+ _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME,
instanceToSwapOutName,
+ InstanceConstants.InstanceOperation.SWAP_OUT);
+
+ // Validate that the assignment has not changed since setting the
InstanceOperation to SWAP_OUT
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ validateEVsCorrect(getEVs(), originalEVs,
swapOutInstancesToSwapInInstances,
+ Collections.emptySet(), Collections.emptySet());
+
+ // Add instance with InstanceOperation set to SWAP_IN
+ String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT +
_participants.size());
+ swapOutInstancesToSwapInInstances.put(instanceToSwapOutName,
instanceToSwapInName);
+ addParticipant(instanceToSwapInName,
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+ instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+ InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+
+ // Validate that partitions on SWAP_OUT instance do not change after
setting the InstanceOperation to SWAP_OUT
+ // and adding the SWAP_IN instance to the cluster.
+ // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT
instance
+ // but none of them are in a top state.
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ validateEVsCorrect(getEVs(), originalEVs,
swapOutInstancesToSwapInInstances,
+ Set.of(instanceToSwapInName), Collections.emptySet());
+
+ // Assert canCompleteSwap is true
+ Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+ .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
+ // Assert completeSwapIfPossible is true
+ Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+ .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName));
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // Assert that SWAP_OUT instance is disabled and has no partitions
assigned to it.
+ Assert.assertFalse(_gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME,
instanceToSwapOutName).getInstanceEnabled());
+ Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(),
instanceToSwapOutName).size(),
+ 0);
+
+ // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT
instance had before
+ // swap was completed.
+ validateEVsCorrect(getEVs(), originalEVs,
swapOutInstancesToSwapInInstances,
+ Collections.emptySet(), Set.of(instanceToSwapInName));
+ }
+
+ @Test(dependsOnMethods = "testNodeSwap")
+ public void testNodeSwapSwapInNodeNoInstanceOperationDisabled() throws
Exception {
+ System.out.println(
+ "START
TestInstanceOperation.testNodeSwapSwapInNodeNoInstanceOperationDisabled() at "
+ + new Date(System.currentTimeMillis()));
+
+ resetInstances();
+
+ // Store original EVs
+ Map<String, ExternalView> originalEVs = getEVs();
+
+ Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>();
+
+ // Set instance's InstanceOperation to SWAP_OUT
+ String instanceToSwapOutName = _participants.get(0).getInstanceName();
+ InstanceConfig instanceToSwapOutInstanceConfig =
_gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+ _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME,
instanceToSwapOutName,
+ InstanceConstants.InstanceOperation.SWAP_OUT);
+
+ // Validate that the assignment has not changed since setting the
InstanceOperation to SWAP_OUT
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ validateEVsCorrect(getEVs(), originalEVs,
swapOutInstancesToSwapInInstances,
+ Collections.emptySet(), Collections.emptySet());
+
+ // Add instance with InstanceOperation unset, should automatically be set
to SWAP_IN
+ String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT +
_participants.size());
+ swapOutInstancesToSwapInInstances.put(instanceToSwapOutName,
instanceToSwapInName);
+ addParticipant(instanceToSwapInName,
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+ instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null,
false, -1);
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ validateEVsCorrect(getEVs(), originalEVs,
swapOutInstancesToSwapInInstances,
+ Collections.emptySet(), Collections.emptySet());
+
+ // Enable the SWAP_IN instance, so it can start being assigned replicas
+ _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME,
instanceToSwapInName, true);
+
+ // Validate that partitions on SWAP_OUT instance do not change after
setting the InstanceOperation to SWAP_OUT
+ // and adding the SWAP_IN instance to the cluster.
+ // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT
instance
+ // but none of them are in a top state.
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ validateEVsCorrect(getEVs(), originalEVs,
swapOutInstancesToSwapInInstances,
+ Set.of(instanceToSwapInName), Collections.emptySet());
+
+ // Assert canCompleteSwap is true
+ Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+ .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
+ // Assert completeSwapIfPossible is true
+ Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+ .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName));
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // Assert that SWAP_OUT instance is disabled and has no partitions
assigned to it.
+ Assert.assertFalse(_gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME,
instanceToSwapOutName).getInstanceEnabled());
+ Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(),
instanceToSwapOutName).size(),
+ 0);
+
+ // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT
instance had before
+ // swap was completed.
+ validateEVsCorrect(getEVs(), originalEVs,
swapOutInstancesToSwapInInstances,
+ Collections.emptySet(), Set.of(instanceToSwapInName));
+ }
+
+ @Test(dependsOnMethods = "testNodeSwapSwapInNodeNoInstanceOperationDisabled")
+ public void testNodeSwapCancelSwapWhenReadyToComplete() throws Exception {
+ System.out.println(
+ "START
TestInstanceOperation.testNodeSwapCancelSwapWhenReadyToComplete() at " + new
Date(
+ System.currentTimeMillis()));
+
+ resetInstances();
+
+ // Store original EVs
+ Map<String, ExternalView> originalEVs = getEVs();
+
+ Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>();
+
+ // Set instance's InstanceOperation to SWAP_OUT
+ String instanceToSwapOutName = _participants.get(0).getInstanceName();
+ InstanceConfig instanceToSwapOutInstanceConfig =
_gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+ _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME,
instanceToSwapOutName,
+ InstanceConstants.InstanceOperation.SWAP_OUT);
+
+ // Validate that the assignment has not changed since setting the
InstanceOperation to SWAP_OUT
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ validateEVsCorrect(getEVs(), originalEVs,
swapOutInstancesToSwapInInstances,
+ Collections.emptySet(), Collections.emptySet());
+
+ // Add instance with InstanceOperation set to SWAP_IN
+ String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT +
_participants.size());
+ swapOutInstancesToSwapInInstances.put(instanceToSwapOutName,
instanceToSwapInName);
+ addParticipant(instanceToSwapInName,
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+ instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+ InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+
+ // Validate that partitions on SWAP_OUT instance do not change after
setting the InstanceOperation to SWAP_OUT
+ // and adding the SWAP_IN instance to the cluster.
+ // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT
instance
+ // but none of them are in a top state.
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ validateEVsCorrect(getEVs(), originalEVs,
swapOutInstancesToSwapInInstances,
+ Set.of(instanceToSwapInName), Collections.emptySet());
+
+ // Assert canCompleteSwap is true
+ Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+ .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
+
+ // Cancel SWAP by disabling the SWAP_IN instance and remove SWAP_OUT
InstanceOperation from SWAP_OUT instance.
+ _gSetupTool.getClusterManagementTool()
+ .enableInstance(CLUSTER_NAME, instanceToSwapInName, false);
+
+ // Wait for cluster to converge.
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // Validate there are no partitions on the SWAP_IN instance.
+ Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(),
instanceToSwapInName).size(), 0);
+
+ // Validate that the SWAP_OUT instance has the same partitions as it had
before.
+ validateEVsCorrect(getEVs(), originalEVs,
swapOutInstancesToSwapInInstances,
+ Collections.emptySet(), Collections.emptySet());
+
+ _gSetupTool.getClusterManagementTool()
+ .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, null);
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // Validate there are no partitions on the SWAP_IN instance.
+ Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(),
instanceToSwapInName).size(), 0);
+
+ // Validate that the SWAP_OUT instance has the same partitions as it had
before.
+ validateEVsCorrect(getEVs(), originalEVs,
swapOutInstancesToSwapInInstances,
+ Collections.emptySet(), Collections.emptySet());
+ }
+
+ @Test(dependsOnMethods = "testNodeSwapCancelSwapWhenReadyToComplete")
+ public void testNodeSwapAfterEMM() throws Exception {
+ System.out.println("START TestInstanceOperation.testNodeSwapAfterEMM() at
" + new Date(
+ System.currentTimeMillis()));
+
+ resetInstances();
+
+ // Store original EVs
+ Map<String, ExternalView> originalEVs = getEVs();
+
+ Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>();
+
+ // Put the cluster in maintenance mode.
+ _gSetupTool.getClusterManagementTool()
+ .manuallyEnableMaintenanceMode(CLUSTER_NAME, true, null, null);
+
+ // Set instance's InstanceOperation to SWAP_OUT
+ String instanceToSwapOutName = _participants.get(0).getInstanceName();
+ InstanceConfig instanceToSwapOutInstanceConfig =
_gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+ _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME,
instanceToSwapOutName,
+ InstanceConstants.InstanceOperation.SWAP_OUT);
+
+ // Validate that the assignment has not changed since setting the
InstanceOperation to SWAP_OUT
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ validateEVsCorrect(getEVs(), originalEVs,
swapOutInstancesToSwapInInstances,
+ Collections.emptySet(), Collections.emptySet());
+
+ // Add instance with InstanceOperation set to SWAP_IN
+ String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT +
_participants.size());
+ swapOutInstancesToSwapInInstances.put(instanceToSwapOutName,
instanceToSwapInName);
+ addParticipant(instanceToSwapInName,
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+ instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+ InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+
+ // Validate that the assignment has not changed since adding the SWAP_IN
node.
+ // During MM, the cluster should not compute new assignment.
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ validateEVsCorrect(getEVs(), originalEVs,
swapOutInstancesToSwapInInstances,
+ Collections.emptySet(), Collections.emptySet());
+
+ // Remove the cluster from maintenance mode.
+ // Now swapping will begin
+ _gSetupTool.getClusterManagementTool()
+ .manuallyEnableMaintenanceMode(CLUSTER_NAME, false, null, null);
+
+ // Validate that partitions on SWAP_OUT instance do not change after
exiting MM
+ // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT
instance
+ // but none of them are in a top state.
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ validateEVsCorrect(getEVs(), originalEVs,
swapOutInstancesToSwapInInstances,
+ Set.of(instanceToSwapInName), Collections.emptySet());
+
+ // Assert canCompleteSwap is true
+ Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+ .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
+ // Assert completeSwapIfPossible is true
+ Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+ .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName));
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // Assert that SWAP_OUT instance is disabled and has no partitions
assigned to it.
+ Assert.assertFalse(_gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME,
instanceToSwapOutName).getInstanceEnabled());
+ Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(),
instanceToSwapOutName).size(),
+ 0);
+
+ // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT
instance had before
+ // swap was completed.
+ validateEVsCorrect(getEVs(), originalEVs,
swapOutInstancesToSwapInInstances,
+ Collections.emptySet(), Set.of(instanceToSwapInName));
+ }
+
+ @Test(dependsOnMethods = "testNodeSwapAfterEMM")
+ public void testNodeSwapWithSwapOutInstanceDisabled() throws Exception {
+ System.out.println(
+ "START TestInstanceOperation.testNodeSwapWithSwapOutInstanceDisabled()
at " + new Date(
+ System.currentTimeMillis()));
+
+ resetInstances();
+
+ // Store original EVs
+ Map<String, ExternalView> originalEVs = getEVs();
+
+ // Set instance's InstanceOperation to SWAP_OUT
+ String instanceToSwapOutName = _participants.get(0).getInstanceName();
+ InstanceConfig instanceToSwapOutInstanceConfig =
_gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+ _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME,
instanceToSwapOutName,
+ InstanceConstants.InstanceOperation.SWAP_OUT);
+
+ Set<String> swapOutInstanceOriginalPartitions =
+ getPartitionsAndStatesOnInstance(originalEVs,
instanceToSwapOutName).keySet();
+
+ // Disable the SWAP_OUT instance.
+ _gSetupTool.getClusterManagementTool()
+ .enableInstance(CLUSTER_NAME, instanceToSwapOutName, false);
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // Validate that the SWAP_OUT instance has all partitions in OFFLINE state
+ Set<String> swapOutInstanceOfflineStates =
+ new HashSet<>(getPartitionsAndStatesOnInstance(getEVs(),
instanceToSwapOutName).values());
+ Assert.assertEquals(swapOutInstanceOfflineStates.size(), 1);
+ Assert.assertTrue(swapOutInstanceOfflineStates.contains("OFFLINE"));
+
+ // Add instance with InstanceOperation set to SWAP_IN
+ String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT +
_participants.size());
+ addParticipant(instanceToSwapInName,
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+ instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+ InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // Validate that the SWAP_IN instance has the same partitions as the
SWAP_OUT instance in second top state.
+ Map<String, String> swapInInstancePartitionsAndStates =
+ getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapInName);
+ Assert.assertTrue(
+
swapInInstancePartitionsAndStates.keySet().containsAll(swapOutInstanceOriginalPartitions));
+ Set<String> swapInInstanceStates = new
HashSet<>(swapInInstancePartitionsAndStates.values());
+ swapInInstanceStates.removeAll(SECONDARY_STATE_SET);
+ Assert.assertEquals(swapInInstanceStates.size(), 0);
+
+ // Assert canCompleteSwap is false because SWAP_OUT instance is
disabled.
+ Assert.assertFalse(_gSetupTool.getClusterManagementTool()
+ .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
+
+ // Enable the SWAP_OUT instance.
+ _gSetupTool.getClusterManagementTool()
+ .enableInstance(CLUSTER_NAME, instanceToSwapOutName, true);
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // Assert completeSwapIfPossible is true
+ Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+ .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName));
+
+ // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT
instance originally
+ // had. Validate they are in second top state because initially disabling
SWAP_OUT instance
+ // caused all topStates to be handed off to next replica in the preference
list.
+ swapInInstancePartitionsAndStates =
+ getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapInName);
+ Assert.assertTrue(
+
swapInInstancePartitionsAndStates.keySet().containsAll(swapOutInstanceOriginalPartitions));
+ swapInInstanceStates = new
HashSet<>(swapInInstancePartitionsAndStates.values());
+ swapInInstanceStates.removeAll(SECONDARY_STATE_SET);
+ Assert.assertEquals(swapInInstanceStates.size(), 0);
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // Assert that SWAP_OUT instance is disabled and has no partitions
assigned to it.
+ Assert.assertFalse(_gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME,
instanceToSwapOutName).getInstanceEnabled());
+ Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(),
instanceToSwapOutName).size(),
+ 0);
+ }
+
+ @Test(expectedExceptions = HelixException.class, dependsOnMethods =
"testNodeSwapWithSwapOutInstanceDisabled")
+ public void testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() {
+ System.out.println(
+ "START
TestInstanceOperation.testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() at "
+ + new Date(System.currentTimeMillis()));
+ resetInstances();
+
+ // Get the SWAP_OUT instance.
+ String instanceToSwapOutName = _participants.get(0).getInstanceName();
+ InstanceConfig instanceToSwapOutInstanceConfig =
_gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+
+ // Add instance with InstanceOperation set to SWAP_IN enabled before
setting SWAP_OUT instance.
+ String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT +
_participants.size());
+ addParticipant(instanceToSwapInName,
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+ instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null,
true, -1);
+ }
+
+ @Test(expectedExceptions = HelixException.class, dependsOnMethods =
"testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet")
+ public void testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() {
+ System.out.println(
+ "START
TestInstanceOperation.testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() at "
+ + new Date(System.currentTimeMillis()));
+ resetInstances();
+
+ // Get the SWAP_OUT instance.
+ String instanceToSwapOutName = _participants.get(0).getInstanceName();
+ InstanceConfig instanceToSwapOutInstanceConfig =
_gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+
+ // Add instance with InstanceOperation set to SWAP_IN
+ String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT +
_participants.size());
+ addParticipant(instanceToSwapInName,
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+ instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null,
false, -1);
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // Enable the SWAP_IN instance before we have set the SWAP_OUT instance.
+ _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME,
instanceToSwapInName, true);
+ }
+
+ @Test(expectedExceptions = HelixException.class, dependsOnMethods =
"testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet")
+ public void testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut() {
+ System.out.println(
+ "START
TestInstanceOperation.testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut()
at "
+ + new Date(System.currentTimeMillis()));
+ resetInstances();
+
+ // Get the SWAP_OUT instance.
+ String instanceToSwapOutName = _participants.get(0).getInstanceName();
+ InstanceConfig instanceToSwapOutInstanceConfig =
_gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+
+ // Add instance with InstanceOperation set to SWAP_IN
+ String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT +
_participants.size());
+ addParticipant(instanceToSwapInName,
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+ instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null,
false, -1);
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // Try to remove the InstanceOperation from the SWAP_IN instance before
the SWAP_OUT instance is set.
+ // This should throw exception because we cannot ever have two instances
with the same logicalId and both have InstanceOperation
+ // unset.
+ _gSetupTool.getClusterManagementTool()
+ .setInstanceOperation(CLUSTER_NAME, instanceToSwapInName, null);
+ }
+
+ @Test(dependsOnMethods =
"testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut")
+ public void testNodeSwapAddSwapInFirst() {
+ System.out.println("START
TestInstanceOperation.testNodeSwapAddSwapInFirst() at " + new Date(
+ System.currentTimeMillis()));
+ resetInstances();
+
+ // Store original EV
+ Map<String, ExternalView> originalEVs = getEVs();
+
+ Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>();
+
+ // Get the SWAP_OUT instance.
+ String instanceToSwapOutName = _participants.get(0).getInstanceName();
+ InstanceConfig instanceToSwapOutInstanceConfig =
_gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+
+ // Add instance with InstanceOperation set to SWAP_IN
+ String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT +
_participants.size());
+ swapOutInstancesToSwapInInstances.put(instanceToSwapOutName,
instanceToSwapInName);
+ addParticipant(instanceToSwapInName,
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+ instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null,
false, -1);
+
+ // Validate that the assignment has not changed after adding the disabled
instance; SWAP_OUT has not been set yet at this point
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ validateEVsCorrect(getEVs(), originalEVs,
swapOutInstancesToSwapInInstances,
+ Collections.emptySet(), Collections.emptySet());
+
+ // After the SWAP_IN instance is added, we set the InstanceOperation to
SWAP_OUT
+ _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME,
instanceToSwapOutName,
+ InstanceConstants.InstanceOperation.SWAP_OUT);
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // Enable the SWAP_IN instance to begin the swap operation.
+ _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME,
instanceToSwapInName, true);
+
+ // Validate that partitions on SWAP_OUT instance do not change after
setting the InstanceOperation to SWAP_OUT
+ // and adding the SWAP_IN instance to the cluster.
+ // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT
instance
+ // but none of them are in a top state.
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ validateEVsCorrect(getEVs(), originalEVs,
swapOutInstancesToSwapInInstances,
+ Set.of(instanceToSwapInName), Collections.emptySet());
+
+ // Assert canCompleteSwap is true
+ Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+ .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
+ // Assert completeSwapIfPossible is true
+ Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+ .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName));
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // Assert that SWAP_OUT instance is disabled and has no partitions
assigned to it.
+ Assert.assertFalse(_gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME,
instanceToSwapOutName).getInstanceEnabled());
+ Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(),
instanceToSwapOutName).size(),
+ 0);
+
+ // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT
instance had before
+ // swap was completed.
+ validateEVsCorrect(getEVs(), originalEVs,
swapOutInstancesToSwapInInstances,
+ Collections.emptySet(), Set.of(instanceToSwapInName));
+ }
+
+ private MockParticipantManager createParticipant(String participantName,
String logicalId, String zone,
+ InstanceConstants.InstanceOperation instanceOperation, boolean enabled,
int capacity) {
+ InstanceConfig config = new InstanceConfig.Builder().setDomain(
+ String.format("%s=%s, %s=%s, %s=%s", ZONE, zone, HOST,
participantName, LOGICAL_ID,
+
logicalId)).setInstanceEnabled(enabled).setInstanceOperation(instanceOperation)
+ .build(participantName);
+ if (capacity >= 0) {
+ config.setInstanceCapacityMap(Map.of(TEST_CAPACITY_KEY, capacity));
+ }
+ _gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, config);
// start dummy participants
MockParticipantManager participant = new MockParticipantManager(ZK_ADDR,
CLUSTER_NAME, participantName);
@@ -393,12 +1150,24 @@ public class TestInstanceOperation extends ZkTestBase {
// Using a delayed state model
StDelayMSStateModelFactory delayFactory = new StDelayMSStateModelFactory();
stateMachine.registerStateModelFactory("MasterSlave", delayFactory);
+ return participant;
+ }
+
+ private void addParticipant(String participantName, String logicalId, String
zone,
+ InstanceConstants.InstanceOperation instanceOperation, boolean enabled,
int capacity) {
+ MockParticipantManager participant = createParticipant(participantName,
logicalId, zone,
+ instanceOperation, enabled, capacity);
participant.syncStart();
_participants.add(participant);
_participantNames.add(participantName);
}
+ private void addParticipant(String participantName) {
+ addParticipant(participantName, Integer.toString(_participants.size()),
+ "zone_" + _participants.size(), null, true, -1);
+ }
+
private void createTestDBs(long delayTime) throws InterruptedException {
createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB0_CRUSHED",
BuiltInStateModelDefinitions.LeaderStandby.name(), PARTITIONS,
REPLICA, REPLICA - 1, -1,
@@ -415,7 +1184,15 @@ public class TestInstanceOperation extends ZkTestBase {
Assert.assertTrue(_clusterVerifier.verifyByPolling());
}
- private Map<String, ExternalView> getEV() {
+ private void dropTestDBs(Set<String> dbs) {
+ for (String db : dbs) {
+ _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, db);
+ _allDBs.remove(db);
+ }
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ }
+
+ private Map<String, ExternalView> getEVs() {
Map<String, ExternalView> externalViews = new HashMap<String,
ExternalView>();
for (String db : _allDBs) {
ExternalView ev =
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME,
db);
@@ -450,7 +1227,65 @@ public class TestInstanceOperation extends ZkTestBase {
return assignedParticipants;
}
- // verify that each partition has >=REPLICA (3 in this case) replicas
+ private Map<String, String> getPartitionsAndStatesOnInstance(Map<String,
ExternalView> evs,
+ String instanceName) {
+ Map<String, String> instancePartitions = new HashMap<>();
+ for (String resourceEV : evs.keySet()) {
+ for (String partition : evs.get(resourceEV).getPartitionSet()) {
+ if
(evs.get(resourceEV).getStateMap(partition).containsKey(instanceName)) {
+ instancePartitions.put(partition,
+ evs.get(resourceEV).getStateMap(partition).get(instanceName));
+ }
+ }
+ }
+
+ return instancePartitions;
+ }
+
+ private void validateEVCorrect(ExternalView actual, ExternalView original,
+ Map<String, String> swapOutInstancesToSwapInInstances, Set<String>
inFlightSwapInInstances,
+ Set<String> completedSwapInInstanceNames) {
+ Assert.assertEquals(actual.getPartitionSet(), original.getPartitionSet());
+ IdealState is = _gSetupTool.getClusterManagementTool()
+ .getResourceIdealState(CLUSTER_NAME, original.getResourceName());
+ StateModelDefinition stateModelDef = _gSetupTool.getClusterManagementTool()
+ .getStateModelDef(CLUSTER_NAME, is.getStateModelDefRef());
+ for (String partition : actual.getPartitionSet()) {
+ Map<String, String> expectedStateMap = new
HashMap<>(original.getStateMap(partition));
+ for (String swapOutInstance :
swapOutInstancesToSwapInInstances.keySet()) {
+ if (expectedStateMap.containsKey(swapOutInstance) &&
inFlightSwapInInstances.contains(
+ swapOutInstancesToSwapInInstances.get(swapOutInstance))) {
+ // If the corresponding swapInInstance is in-flight, add it to the
expectedStateMap
+ // with the same state as the swapOutInstance or secondState if the
swapOutInstance
+ // has a topState.
+
expectedStateMap.put(swapOutInstancesToSwapInInstances.get(swapOutInstance),
+
expectedStateMap.get(swapOutInstance).equals(stateModelDef.getTopState())
+ ? (String) stateModelDef.getSecondTopStates().toArray()[0]
+ : expectedStateMap.get(swapOutInstance));
+ } else if (expectedStateMap.containsKey(swapOutInstance)
+ && completedSwapInInstanceNames.contains(
+ swapOutInstancesToSwapInInstances.get(swapOutInstance))) {
+ // If the corresponding swapInInstance is completed, add it to the
expectedStateMap
+ // with the same state as the swapOutInstance.
+
expectedStateMap.put(swapOutInstancesToSwapInInstances.get(swapOutInstance),
+ expectedStateMap.get(swapOutInstance));
+ expectedStateMap.remove(swapOutInstance);
+ }
+ }
+ Assert.assertEquals(actual.getStateMap(partition), expectedStateMap,
"Error for partition " + partition
+ + " in resource " + actual.getResourceName());
+ }
+ }
+
+ private void validateEVsCorrect(Map<String, ExternalView> actuals,
+ Map<String, ExternalView> originals, Map<String, String>
swapOutInstancesToSwapInInstances,
+ Set<String> inFlightSwapInInstances, Set<String>
completedSwapInInstanceNames) {
+ Assert.assertEquals(actuals.keySet(), originals.keySet());
+ for (String resource : actuals.keySet()) {
+ validateEVCorrect(actuals.get(resource), originals.get(resource),
+ swapOutInstancesToSwapInInstances, inFlightSwapInInstances,
completedSwapInInstanceNames);
+ }
+ }
private void validateAssignmentInEv(ExternalView ev) {
validateAssignmentInEv(ev, REPLICA);
@@ -460,10 +1295,7 @@ public class TestInstanceOperation extends ZkTestBase {
Set<String> partitionSet = ev.getPartitionSet();
for (String partition : partitionSet) {
AtomicInteger activeReplicaCount = new AtomicInteger();
- ev.getStateMap(partition)
- .values()
- .stream()
- .filter(v -> v.equals("MASTER") || v.equals("LEADER") ||
v.equals("SLAVE") || v.equals("FOLLOWER") || v.equals("STANDBY"))
+
ev.getStateMap(partition).values().stream().filter(ACCEPTABLE_STATE_SET::contains)
.forEach(v -> activeReplicaCount.getAndIncrement());
Assert.assertTrue(activeReplicaCount.get() >=expectedNumber);
}
@@ -486,10 +1318,10 @@ public class TestInstanceOperation extends ZkTestBase {
// Set test instance capacity and partition weights
ClusterConfig clusterConfig =
_dataAccessor.getProperty(_dataAccessor.keyBuilder().clusterConfig());
- String testCapacityKey = "TestCapacityKey";
-
clusterConfig.setInstanceCapacityKeys(Collections.singletonList(testCapacityKey));
-
clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap(testCapacityKey,
100));
-
clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(testCapacityKey,
1));
+
clusterConfig.setInstanceCapacityKeys(Collections.singletonList(TEST_CAPACITY_KEY));
+ clusterConfig.setDefaultInstanceCapacityMap(
+ Collections.singletonMap(TEST_CAPACITY_KEY, TEST_CAPACITY_VALUE));
+
clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(TEST_CAPACITY_KEY,
1));
_dataAccessor.setProperty(_dataAccessor.keyBuilder().clusterConfig(),
clusterConfig);
}
diff --git
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index e1ffbb646..59decd98e 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -898,24 +898,28 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
InstanceConfig instanceConfig = new InstanceConfig(instanceName);
instanceConfig.setHostName(hostname);
instanceConfig.setPort(port);
- if (i == 40) {
- instanceConfig.setDomain(String
- .format("invaliddomain=%s,zone=%s,rack=%s,host=%s", "mygroup" + i
% 2, "myzone" + i % 4,
- "myrack" + i % 4, hostname));
- } else if (i == 41) {
- instanceConfig.setDomain("invaliddomain");
- } else {
- String domain = String
- .format("group=%s,zone=%s,rack=%s,host=%s", "mygroup" + i % 2,
"myzone" + i % 4,
- "myrack" + i % 4, hostname);
- instanceConfig.setDomain(domain);
- }
+
+ String domain =
+ String.format("group=%s,zone=%s,rack=%s,host=%s", "mygroup" + i % 2,
"myzone" + i % 4,
+ "myrack" + i % 4, hostname);
+ instanceConfig.setDomain(domain);
+
LiveInstance liveInstance = new LiveInstance(instanceName);
liveInstance.setSessionId(UUID.randomUUID().toString());
liveInstance.setHelixVersion(UUID.randomUUID().toString());
accessor.setProperty(keyBuilder.liveInstance(instanceName),
liveInstance);
admin.addInstance(clusterName, instanceConfig);
admin.enableInstance(clusterName, instanceName, true);
+
+ if (i == 40) {
+ instanceConfig.setDomain(
+ String.format("invaliddomain=%s,zone=%s,rack=%s,host=%s",
"mygroup" + i % 2,
+ "myzone" + i % 4, "myrack" + i % 4, hostname));
+ admin.setInstanceConfig(clusterName, instanceName, instanceConfig);
+ } else if (i == 41) {
+ instanceConfig.setDomain("invaliddomain");
+ admin.setInstanceConfig(clusterName, instanceName, instanceConfig);
+ }
}
ClusterTopology clusterTopology = admin.getClusterTopology(clusterName);
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index 512a7b4db..aa93bc9c8 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -556,6 +556,16 @@ public class MockHelixAdmin implements HelixAdmin {
return false;
}
+ @Override
+ public boolean canCompleteSwap(String clusterName, String instancesNames) {
+ return false;
+ }
+
+ @Override
+ public boolean completeSwapIfPossible(String clusterName, String
instanceName) {
+ return false;
+ }
+
@Override
public boolean isReadyForPreparingJoiningCluster(String clusterName, String
instancesNames) {
return false;
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
index 48b467eaa..64fbaff41 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
@@ -86,6 +86,8 @@ public class AbstractResource {
getInstance,
getAllInstances,
setInstanceOperation, // TODO: Name is just a place holder, may change in future
+ canCompleteSwap,
+ completeSwapIfPossible,
onDemandRebalance
}
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
index efc3ce652..b920f66ce 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
@@ -427,52 +427,63 @@ public class PerInstanceAccessor extends
AbstractHelixResource {
}
admin.resetPartition(clusterId, instanceName,
node.get(PerInstanceProperties.resource.name()).textValue(),
- (List<String>) OBJECT_MAPPER
-
.readValue(node.get(PerInstanceProperties.partitions.name()).toString(),
- OBJECT_MAPPER.getTypeFactory()
- .constructCollectionType(List.class, String.class)));
- break;
- case setInstanceOperation:
- admin.setInstanceOperation(clusterId, instanceName, state);
- break;
- case addInstanceTag:
- if (!validInstance(node, instanceName)) {
- return badRequest("Instance names are not match!");
- }
- for (String tag : (List<String>) OBJECT_MAPPER
-
.readValue(node.get(PerInstanceProperties.instanceTags.name()).toString(),
-
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class,
String.class))) {
- admin.addInstanceTag(clusterId, instanceName, tag);
- }
- break;
- case removeInstanceTag:
- if (!validInstance(node, instanceName)) {
- return badRequest("Instance names are not match!");
- }
- for (String tag : (List<String>) OBJECT_MAPPER
-
.readValue(node.get(PerInstanceProperties.instanceTags.name()).toString(),
-
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class,
String.class))) {
- admin.removeInstanceTag(clusterId, instanceName, tag);
- }
- break;
- case enablePartitions:
- admin.enablePartition(true, clusterId, instanceName,
- node.get(PerInstanceProperties.resource.name()).textValue(),
- (List<String>) OBJECT_MAPPER
-
.readValue(node.get(PerInstanceProperties.partitions.name()).toString(),
- OBJECT_MAPPER.getTypeFactory()
- .constructCollectionType(List.class, String.class)));
- break;
- case disablePartitions:
- admin.enablePartition(false, clusterId, instanceName,
- node.get(PerInstanceProperties.resource.name()).textValue(),
- (List<String>) OBJECT_MAPPER
-
.readValue(node.get(PerInstanceProperties.partitions.name()).toString(),
-
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class,
String.class)));
- break;
- default:
- LOG.error("Unsupported command :" + command);
- return badRequest("Unsupported command :" + command);
+ (List<String>) OBJECT_MAPPER.readValue(
+ node.get(PerInstanceProperties.partitions.name()).toString(),
+ OBJECT_MAPPER.getTypeFactory()
+ .constructCollectionType(List.class, String.class)));
+ break;
+ case setInstanceOperation:
+ admin.setInstanceOperation(clusterId, instanceName, state);
+ break;
+ case canCompleteSwap:
+ if (!admin.canCompleteSwap(clusterId, instanceName)) {
+ return badRequest("Swap is not ready to be completed!");
+ }
+ break;
+ case completeSwapIfPossible:
+ if (!admin.completeSwapIfPossible(clusterId, instanceName)) {
+ return badRequest("Swap is not ready to be completed!");
+ }
+ break;
+ case addInstanceTag:
+ if (!validInstance(node, instanceName)) {
+ return badRequest("Instance names are not match!");
+ }
+ for (String tag : (List<String>) OBJECT_MAPPER.readValue(
+ node.get(PerInstanceProperties.instanceTags.name()).toString(),
+
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class,
String.class))) {
+ admin.addInstanceTag(clusterId, instanceName, tag);
+ }
+ break;
+ case removeInstanceTag:
+ if (!validInstance(node, instanceName)) {
+ return badRequest("Instance names are not match!");
+ }
+ for (String tag : (List<String>) OBJECT_MAPPER.readValue(
+ node.get(PerInstanceProperties.instanceTags.name()).toString(),
+
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class,
String.class))) {
+ admin.removeInstanceTag(clusterId, instanceName, tag);
+ }
+ break;
+ case enablePartitions:
+ admin.enablePartition(true, clusterId, instanceName,
+ node.get(PerInstanceProperties.resource.name()).textValue(),
+ (List<String>) OBJECT_MAPPER.readValue(
+ node.get(PerInstanceProperties.partitions.name()).toString(),
+ OBJECT_MAPPER.getTypeFactory()
+ .constructCollectionType(List.class, String.class)));
+ break;
+ case disablePartitions:
+ admin.enablePartition(false, clusterId, instanceName,
+ node.get(PerInstanceProperties.resource.name()).textValue(),
+ (List<String>) OBJECT_MAPPER.readValue(
+ node.get(PerInstanceProperties.partitions.name()).toString(),
+ OBJECT_MAPPER.getTypeFactory()
+ .constructCollectionType(List.class, String.class)));
+ break;
+ default:
+ LOG.error("Unsupported command :" + command);
+ return badRequest("Unsupported command :" + command);
}
} catch (Exception e) {
LOG.error("Failed in updating instance : " + instanceName, e);