This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch ApplicationClusterManager
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/ApplicationClusterManager by
this push:
new 4552f87e5 Implement the cross-zone-based stoppable check (#2680)
4552f87e5 is described below
commit 4552f87e57dc22de34807887eeefa4e7fde00f5d
Author: Xiaxuan Gao <[email protected]>
AuthorDate: Tue Oct 31 15:41:05 2023 -0700
Implement the cross-zone-based stoppable check (#2680)
Implement the cross-zone-based stoppable check and add the
to_be_stopped_instances field to the stoppable check API request
---
.../apache/helix/util/InstanceValidationUtil.java | 58 +++++++-
.../helix/util/TestInstanceValidationUtil.java | 77 ++++++++++
.../MaintenanceManagementService.java | 40 ++++--
.../StoppableInstancesSelector.java | 138 ++++++++++++------
.../server/resources/helix/InstancesAccessor.java | 35 +++--
.../TestMaintenanceManagementService.java | 14 +-
.../helix/rest/server/AbstractTestClass.java | 77 +++++++++-
.../helix/rest/server/TestInstancesAccessor.java | 155 +++++++++++++++++++++
.../util/TestInstanceValidationUtilInRest.java | 64 +++++++++
9 files changed, 576 insertions(+), 82 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
index fdbf7dd1a..5f179e784 100644
--- a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
@@ -20,6 +20,7 @@ package org.apache.helix.util;
*/
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -254,6 +255,28 @@ public class InstanceValidationUtil {
public static Map<String, List<String>>
perPartitionHealthCheck(List<ExternalView> externalViews,
Map<String, Map<String, Boolean>> globalPartitionHealthStatus, String
instanceToBeStop,
HelixDataAccessor dataAccessor) {
+ return perPartitionHealthCheck(externalViews, globalPartitionHealthStatus,
instanceToBeStop,
+ dataAccessor, Collections.emptySet());
+ }
+
+ /**
+ * Get the problematic partitions on the to-be-stop instance
+ * Requirement:
+ * If the instance and the toBeStoppedInstances are stopped and the
partitions on them are OFFLINE,
+ * the cluster still have enough "healthy" replicas on other sibling
instances
+ *
+ * - sibling instances mean those who share the same partition (replicas)
of the to-be-stop instance
+ *
+ * @param globalPartitionHealthStatus (instance => (partition name, health
status))
+ * @param instanceToBeStop The instance to be stopped
+ * @param dataAccessor The data accessor
+ * @param toBeStoppedInstances A set of instances presumed to be are already
stopped. And it
+ * shouldn't contain the `instanceToBeStop`
+ * @return A list of problematic partitions if the instance is stopped
+ */
+ public static Map<String, List<String>>
perPartitionHealthCheck(List<ExternalView> externalViews,
+ Map<String, Map<String, Boolean>> globalPartitionHealthStatus, String
instanceToBeStop,
+ HelixDataAccessor dataAccessor, Set<String> toBeStoppedInstances) {
Map<String, List<String>> unhealthyPartitions = new HashMap<>();
for (ExternalView externalView : externalViews) {
@@ -273,7 +296,8 @@ public class InstanceValidationUtil {
&&
stateMap.get(instanceToBeStop).equals(stateModelDefinition.getTopState())) {
for (String siblingInstance : stateMap.keySet()) {
// Skip this self check
- if (siblingInstance.equals(instanceToBeStop)) {
+ if (siblingInstance.equals(instanceToBeStop) ||
(toBeStoppedInstances != null
+ && toBeStoppedInstances.contains(siblingInstance))) {
continue;
}
@@ -366,11 +390,32 @@ public class InstanceValidationUtil {
*
* TODO: Use in memory cache and query instance's currentStates
*
- * @param dataAccessor
- * @param instanceName
+ * @param dataAccessor A helper class to access the Helix data.
+ * @param instanceName An instance to be evaluated against this check.
+ * @return
+ */
+ public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor
dataAccessor,
+ String instanceName) {
+ return siblingNodesActiveReplicaCheck(dataAccessor, instanceName,
Collections.emptySet());
+ }
+
+ /**
+ * Check if sibling nodes of the instance meet min active replicas constraint
+ * Two instances are sibling of each other if they host the same partition.
And sibling nodes
+ * that are in toBeStoppableInstances will be presumed to be stopped.
+ * WARNING: The check uses ExternalView to reduce network traffic but suffer
from accuracy
+ * due to external view propagation latency
+ *
+ * TODO: Use in memory cache and query instance's currentStates
+ *
+ * @param dataAccessor A helper class to access the Helix data.
+ * @param instanceName An instance to be evaluated against this check.
+ * @param toBeStoppedInstances A set of instances presumed to be are already
stopped. And it
+ * shouldn't contain the `instanceName`
* @return
*/
- public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor
dataAccessor, String instanceName) {
+ public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor
dataAccessor,
+ String instanceName, Set<String> toBeStoppedInstances) {
PropertyKey.Builder propertyKeyBuilder = dataAccessor.keyBuilder();
List<String> resources =
dataAccessor.getChildNames(propertyKeyBuilder.idealStates());
@@ -406,8 +451,9 @@ public class InstanceValidationUtil {
if (stateByInstanceMap.containsKey(instanceName)) {
int numHealthySiblings = 0;
for (Map.Entry<String, String> entry :
stateByInstanceMap.entrySet()) {
- if (!entry.getKey().equals(instanceName)
- && !unhealthyStates.contains(entry.getValue())) {
+ if (!entry.getKey().equals(instanceName) && (toBeStoppedInstances
== null
+ || !toBeStoppedInstances.contains(entry.getKey())) &&
!unhealthyStates.contains(
+ entry.getValue())) {
numHealthySiblings++;
}
}
diff --git
a/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java
b/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java
index 2c51fc92b..aa1ba3229 100644
---
a/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java
+++
b/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java
@@ -21,7 +21,9 @@ package org.apache.helix.util;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -401,6 +403,81 @@ public class TestInstanceValidationUtil {
Assert.assertTrue(result);
}
+ @Test
+ public void
TestSiblingNodesActiveReplicaCheckSuccessWithToBeStoppedInstances() {
+ String resource = "resource";
+ Mock mock = new Mock();
+ doReturn(ImmutableList.of(resource)).when(mock.dataAccessor)
+ .getChildNames(argThat(new
PropertyKeyArgument(PropertyType.IDEALSTATES)));
+ // set ideal state
+ IdealState idealState = mock(IdealState.class);
+ when(idealState.isEnabled()).thenReturn(true);
+ when(idealState.isValid()).thenReturn(true);
+ when(idealState.getStateModelDefRef()).thenReturn("MasterSlave");
+ doReturn(idealState).when(mock.dataAccessor).getProperty(argThat(new
PropertyKeyArgument(PropertyType.IDEALSTATES)));
+
+ // set external view
+ ExternalView externalView = mock(ExternalView.class);
+ when(externalView.getMinActiveReplicas()).thenReturn(2);
+ when(externalView.getStateModelDefRef()).thenReturn("MasterSlave");
+ when(externalView.getPartitionSet()).thenReturn(ImmutableSet.of("db0"));
+
when(externalView.getStateMap("db0")).thenReturn(ImmutableMap.of(TEST_INSTANCE,
"Master",
+ "instance1", "Slave", "instance2", "Slave", "instance3", "Slave"));
+ doReturn(externalView).when(mock.dataAccessor)
+ .getProperty(argThat(new
PropertyKeyArgument(PropertyType.EXTERNALVIEW)));
+ StateModelDefinition stateModelDefinition =
mock(StateModelDefinition.class);
+ when(stateModelDefinition.getInitialState()).thenReturn("OFFLINE");
+ doReturn(stateModelDefinition).when(mock.dataAccessor)
+ .getProperty(argThat(new
PropertyKeyArgument(PropertyType.STATEMODELDEFS)));
+
+ Set<String> toBeStoppedInstances = new HashSet<>();
+ toBeStoppedInstances.add("instance3");
+ toBeStoppedInstances.add("invalidInstances"); // include an invalid
instance.
+ boolean result =
+
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor,
TEST_INSTANCE, toBeStoppedInstances);
+ Assert.assertTrue(result);
+
+ result =
+
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor,
TEST_INSTANCE, null);
+ Assert.assertTrue(result);
+ }
+
+ @Test
+ public void
TestSiblingNodesActiveReplicaCheckFailsWithToBeStoppedInstances() {
+ String resource = "resource";
+ Mock mock = new Mock();
+ doReturn(ImmutableList.of(resource)).when(mock.dataAccessor)
+ .getChildNames(argThat(new
PropertyKeyArgument(PropertyType.IDEALSTATES)));
+ // set ideal state
+ IdealState idealState = mock(IdealState.class);
+ when(idealState.isEnabled()).thenReturn(true);
+ when(idealState.isValid()).thenReturn(true);
+ when(idealState.getStateModelDefRef()).thenReturn("MasterSlave");
+ doReturn(idealState).when(mock.dataAccessor).getProperty(argThat(new
PropertyKeyArgument(PropertyType.IDEALSTATES)));
+
+ // set external view
+ ExternalView externalView = mock(ExternalView.class);
+ when(externalView.getMinActiveReplicas()).thenReturn(2);
+ when(externalView.getStateModelDefRef()).thenReturn("MasterSlave");
+ when(externalView.getPartitionSet()).thenReturn(ImmutableSet.of("db0"));
+
when(externalView.getStateMap("db0")).thenReturn(ImmutableMap.of(TEST_INSTANCE,
"Master",
+ "instance1", "Slave", "instance2", "Slave", "instance3", "Slave"));
+ doReturn(externalView).when(mock.dataAccessor)
+ .getProperty(argThat(new
PropertyKeyArgument(PropertyType.EXTERNALVIEW)));
+ StateModelDefinition stateModelDefinition =
mock(StateModelDefinition.class);
+ when(stateModelDefinition.getInitialState()).thenReturn("OFFLINE");
+ doReturn(stateModelDefinition).when(mock.dataAccessor)
+ .getProperty(argThat(new
PropertyKeyArgument(PropertyType.STATEMODELDEFS)));
+
+ Set<String> toBeStoppedInstances = new HashSet<>();
+ toBeStoppedInstances.add("instance1");
+ toBeStoppedInstances.add("instance2");
+ boolean result =
+
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor,
TEST_INSTANCE, toBeStoppedInstances);
+
+ Assert.assertFalse(result);
+ }
+
@Test
public void TestSiblingNodesActiveReplicaCheck_fail() {
String resource = "resource";
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
index 529fc469d..c3fa04966 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
@@ -339,17 +339,23 @@ public class MaintenanceManagementService {
*/
public StoppableCheck getInstanceStoppableCheck(String clusterId, String
instanceName,
String jsonContent) throws IOException {
- return batchGetInstancesStoppableChecks(clusterId,
ImmutableList.of(instanceName), jsonContent)
- .get(instanceName);
+ return batchGetInstancesStoppableChecks(clusterId,
ImmutableList.of(instanceName),
+ jsonContent).get(instanceName);
}
-
public Map<String, StoppableCheck> batchGetInstancesStoppableChecks(String
clusterId,
List<String> instances, String jsonContent) throws IOException {
+ return batchGetInstancesStoppableChecks(clusterId, instances, jsonContent,
+ Collections.emptySet());
+ }
+
+ public Map<String, StoppableCheck> batchGetInstancesStoppableChecks(String
clusterId,
+ List<String> instances, String jsonContent, Set<String>
toBeStoppedInstances) throws IOException {
Map<String, StoppableCheck> finalStoppableChecks = new HashMap<>();
// helix instance check.
List<String> instancesForCustomInstanceLevelChecks =
- batchHelixInstanceStoppableCheck(clusterId, instances,
finalStoppableChecks);
+ batchHelixInstanceStoppableCheck(clusterId, instances,
finalStoppableChecks,
+ toBeStoppedInstances);
// custom check, includes partition check.
batchCustomInstanceStoppableCheck(clusterId,
instancesForCustomInstanceLevelChecks,
finalStoppableChecks, getMapFromJsonPayload(jsonContent));
@@ -441,10 +447,11 @@ public class MaintenanceManagementService {
}
private List<String> batchHelixInstanceStoppableCheck(String clusterId,
- Collection<String> instances, Map<String, StoppableCheck>
finalStoppableChecks) {
- Map<String, Future<StoppableCheck>> helixInstanceChecks =
instances.stream().collect(Collectors
- .toMap(Function.identity(),
- instance -> POOL.submit(() ->
performHelixOwnInstanceCheck(clusterId, instance))));
+ Collection<String> instances, Map<String, StoppableCheck>
finalStoppableChecks,
+ Set<String> toBeStoppedInstances) {
+ Map<String, Future<StoppableCheck>> helixInstanceChecks =
instances.stream().collect(
+ Collectors.toMap(Function.identity(), instance -> POOL.submit(
+ () -> performHelixOwnInstanceCheck(clusterId, instance,
toBeStoppedInstances))));
// finalStoppableChecks contains instances that does not pass this health
check
return filterInstancesForNextCheck(helixInstanceChecks,
finalStoppableChecks);
}
@@ -512,7 +519,8 @@ public class MaintenanceManagementService {
if (healthCheck.equals(HELIX_INSTANCE_STOPPABLE_CHECK)) {
// this is helix own check
instancesForNext =
- batchHelixInstanceStoppableCheck(clusterId, instancesForNext,
finalStoppableChecks);
+ batchHelixInstanceStoppableCheck(clusterId, instancesForNext,
finalStoppableChecks,
+ Collections.emptySet());
} else if (healthCheck.equals(HELIX_CUSTOM_STOPPABLE_CHECK)) {
// custom check, includes custom Instance check and partition check.
instancesForNext =
@@ -601,10 +609,12 @@ public class MaintenanceManagementService {
return true;
}
- private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String
instanceName) {
+ private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String
instanceName,
+ Set<String> toBeStoppedInstances) {
LOG.info("Perform helix own custom health checks for {}/{}", clusterId,
instanceName);
Map<String, Boolean> helixStoppableCheck =
- getInstanceHealthStatus(clusterId, instanceName,
HealthCheck.STOPPABLE_CHECK_LIST);
+ getInstanceHealthStatus(clusterId, instanceName,
HealthCheck.STOPPABLE_CHECK_LIST,
+ toBeStoppedInstances);
return new StoppableCheck(helixStoppableCheck,
StoppableCheck.Category.HELIX_OWN_CHECK);
}
@@ -698,6 +708,12 @@ public class MaintenanceManagementService {
@VisibleForTesting
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
String instanceName,
List<HealthCheck> healthChecks) {
+ return getInstanceHealthStatus(clusterId, instanceName, healthChecks,
Collections.emptySet());
+ }
+
+ @VisibleForTesting
+ protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
String instanceName,
+ List<HealthCheck> healthChecks, Set<String> toBeStoppedInstances) {
Map<String, Boolean> healthStatus = new HashMap<>();
for (HealthCheck healthCheck : healthChecks) {
switch (healthCheck) {
@@ -745,7 +761,7 @@ public class MaintenanceManagementService {
break;
case MIN_ACTIVE_REPLICA_CHECK_FAILED:
healthStatus.put(HealthCheck.MIN_ACTIVE_REPLICA_CHECK_FAILED.name(),
-
InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor,
instanceName));
+
InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor,
instanceName, toBeStoppedInstances));
break;
default:
LOG.error("Unsupported health check: {}", healthCheck);
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
index dafe1ab2d..8cf8bc83c 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
@@ -22,6 +22,7 @@ package org.apache.helix.rest.clusterMaintenanceService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -33,31 +34,27 @@ import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.commons.lang3.NotImplementedException;
import org.apache.helix.rest.server.json.cluster.ClusterTopology;
import org.apache.helix.rest.server.json.instance.StoppableCheck;
+import org.apache.helix.rest.server.resources.helix.InstancesAccessor;
public class StoppableInstancesSelector {
// This type does not belong to real HealthCheck failed reason. Also, if we
add this type
// to HealthCheck enum, it could introduce more unnecessary check step since
the InstanceServiceImpl
// loops all the types to do corresponding checks.
private final static String INSTANCE_NOT_EXIST = "HELIX:INSTANCE_NOT_EXIST";
- private String _clusterId;
+ private final String _clusterId;
private List<String> _orderOfZone;
- private String _customizedInput;
- private ArrayNode _stoppableInstances;
- private ObjectNode _failedStoppableInstances;
- private MaintenanceManagementService _maintenanceService;
- private ClusterTopology _clusterTopology;
+ private final String _customizedInput;
+ private final MaintenanceManagementService _maintenanceService;
+ private final ClusterTopology _clusterTopology;
public StoppableInstancesSelector(String clusterId, List<String> orderOfZone,
- String customizedInput, ArrayNode stoppableInstances, ObjectNode
failedStoppableInstances,
- MaintenanceManagementService maintenanceService, ClusterTopology
clusterTopology) {
+ String customizedInput, MaintenanceManagementService maintenanceService,
+ ClusterTopology clusterTopology) {
_clusterId = clusterId;
_orderOfZone = orderOfZone;
_customizedInput = customizedInput;
- _stoppableInstances = stoppableInstances;
- _failedStoppableInstances = failedStoppableInstances;
_maintenanceService = maintenanceService;
_clusterTopology = clusterTopology;
}
@@ -69,26 +66,92 @@ public class StoppableInstancesSelector {
* reasons for non-stoppability.
*
* @param instances A list of instance to be evaluated.
+ * @param toBeStoppedInstances A list of instances presumed to be are
already stopped
+ * @return An ObjectNode containing:
+ * - 'stoppableNode': List of instances that can be stopped.
+ * - 'instance_not_stoppable_with_reasons': A map with the instance
name as the key and
+ * a list of reasons for non-stoppability as the value.
* @throws IOException
*/
- public void getStoppableInstancesInSingleZone(List<String> instances) throws
IOException {
+ public ObjectNode getStoppableInstancesInSingleZone(List<String> instances,
+ List<String> toBeStoppedInstances) throws IOException {
+ ObjectNode result = JsonNodeFactory.instance.objectNode();
+ ArrayNode stoppableInstances =
+
result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+ ObjectNode failedStoppableInstances = result.putObject(
+
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+ Set<String> toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances);
+
List<String> zoneBasedInstance =
getZoneBasedInstances(instances, _clusterTopology.toZoneMapping());
+ populateStoppableInstances(zoneBasedInstance, toBeStoppedInstancesSet,
stoppableInstances,
+ failedStoppableInstances);
+ processNonexistentInstances(instances, failedStoppableInstances);
+
+ return result;
+ }
+
+  /**
+   * Evaluates and collects stoppable instances across a set of zones, following the order of zones.
+   * Zones are processed one at a time; instances found stoppable in earlier zones are presumed
+   * stopped when later zones are evaluated. Zones unknown to the cluster topology are skipped.
+   *
+   * @param instances A list of instances to be evaluated.
+   * @param toBeStoppedInstances A list of instances presumed to be already stopped
+   * @return An ObjectNode containing:
+   *         - 'instance_stoppable_parallel': List of instances that can be stopped.
+   *         - 'instance_not_stoppable_with_reasons': A map with the instance name as the key and
+   *         a list of reasons for non-stoppability as the value.
+   * @throws IOException
+   */
+  public ObjectNode getStoppableInstancesCrossZones(List<String> instances,
+      List<String> toBeStoppedInstances) throws IOException {
+    ObjectNode result = JsonNodeFactory.instance.objectNode();
+    ArrayNode stoppableInstances =
+        result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+    ObjectNode failedStoppableInstances = result.putObject(
+        InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+    Set<String> toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances);
+
+    Map<String, Set<String>> zoneMapping = _clusterTopology.toZoneMapping();
+    for (String zone : _orderOfZone) {
+      Set<String> instanceSet = new HashSet<>(instances);
+      // A user-supplied zone_order may name a zone absent from the topology: skip it, don't NPE.
+      instanceSet.retainAll(zoneMapping.getOrDefault(zone, Collections.emptySet()));
+      if (instanceSet.isEmpty()) {
+        continue;
+      }
+      populateStoppableInstances(new ArrayList<>(instanceSet), toBeStoppedInstancesSet,
+          stoppableInstances, failedStoppableInstances);
+    }
+    processNonexistentInstances(instances, failedStoppableInstances);
+    return result;
+  }
+
+ private void populateStoppableInstances(List<String> instances, Set<String>
toBeStoppedInstances,
+ ArrayNode stoppableInstances, ObjectNode failedStoppableInstances)
throws IOException {
Map<String, StoppableCheck> instancesStoppableChecks =
- _maintenanceService.batchGetInstancesStoppableChecks(_clusterId,
zoneBasedInstance,
- _customizedInput);
+ _maintenanceService.batchGetInstancesStoppableChecks(_clusterId,
instances,
+ _customizedInput, toBeStoppedInstances);
+
for (Map.Entry<String, StoppableCheck> instanceStoppableCheck :
instancesStoppableChecks.entrySet()) {
String instance = instanceStoppableCheck.getKey();
StoppableCheck stoppableCheck = instanceStoppableCheck.getValue();
if (!stoppableCheck.isStoppable()) {
- ArrayNode failedReasonsNode =
_failedStoppableInstances.putArray(instance);
+ ArrayNode failedReasonsNode =
failedStoppableInstances.putArray(instance);
for (String failedReason : stoppableCheck.getFailedChecks()) {
failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason));
}
} else {
- _stoppableInstances.add(instance);
+ stoppableInstances.add(instance);
+ // Update the toBeStoppedInstances set with the currently identified
stoppable instance.
+ // This ensures that subsequent checks in other zones are aware of
this instance's stoppable status.
+ toBeStoppedInstances.add(instance);
}
}
+ }
+
+ private void processNonexistentInstances(List<String> instances, ObjectNode
failedStoppableInstances) {
// Adding following logic to check whether instances exist or not. An
instance exist could be
// checking following scenario:
// 1. Instance got dropped. (InstanceConfig is gone.)
@@ -100,28 +163,36 @@ public class StoppableInstancesSelector {
Set<String> nonSelectedInstances = new HashSet<>(instances);
nonSelectedInstances.removeAll(_clusterTopology.getAllInstances());
for (String nonSelectedInstance : nonSelectedInstances) {
- ArrayNode failedReasonsNode =
_failedStoppableInstances.putArray(nonSelectedInstance);
+ ArrayNode failedReasonsNode =
failedStoppableInstances.putArray(nonSelectedInstance);
failedReasonsNode.add(JsonNodeFactory.instance.textNode(INSTANCE_NOT_EXIST));
}
}
- public void getStoppableInstancesCrossZones() {
- // TODO: Add implementation to enable cross zone stoppable check.
- throw new NotImplementedException("Not Implemented");
- }
-
/**
* Determines the order of zones. If an order is provided by the user, it
will be used directly.
* Otherwise, zones will be ordered by their associated instance count in
descending order.
*
* If `random` is true, the order of zones will be randomized regardless of
any previous order.
*
+ * @param instances A list of instance to be used to calculate the order of
zones.
* @param random Indicates whether to randomize the order of zones.
*/
- public void calculateOrderOfZone(boolean random) {
+ public void calculateOrderOfZone(List<String> instances, boolean random) {
if (_orderOfZone == null) {
- _orderOfZone =
- new
ArrayList<>(getOrderedZoneToInstancesMap(_clusterTopology.toZoneMapping()).keySet());
+ Map<String, Set<String>> zoneMapping = _clusterTopology.toZoneMapping();
+ Map<String, Set<String>> zoneToInstancesMap = new HashMap<>();
+ for (ClusterTopology.Zone zone : _clusterTopology.getZones()) {
+ Set<String> instanceSet = new HashSet<>(instances);
+ // TODO: Use instance config from Helix-rest Cache to get the zone
instead of reading the topology info
+ Set<String> currentZoneInstanceSet = new
HashSet<>(zoneMapping.get(zone.getId()));
+ instanceSet.retainAll(currentZoneInstanceSet);
+ if (instanceSet.isEmpty()) {
+ continue;
+ }
+ zoneToInstancesMap.put(zone.getId(), instanceSet);
+ }
+
+ _orderOfZone = new
ArrayList<>(getOrderedZoneToInstancesMap(zoneToInstancesMap).keySet());
}
if (_orderOfZone.isEmpty()) {
@@ -182,8 +253,6 @@ public class StoppableInstancesSelector {
private String _clusterId;
private List<String> _orderOfZone;
private String _customizedInput;
- private ArrayNode _stoppableInstances;
- private ObjectNode _failedStoppableInstances;
private MaintenanceManagementService _maintenanceService;
private ClusterTopology _clusterTopology;
@@ -202,16 +271,6 @@ public class StoppableInstancesSelector {
return this;
}
- public StoppableInstancesSelectorBuilder setStoppableInstances(ArrayNode
stoppableInstances) {
- _stoppableInstances = stoppableInstances;
- return this;
- }
-
- public StoppableInstancesSelectorBuilder
setFailedStoppableInstances(ObjectNode failedStoppableInstances) {
- _failedStoppableInstances = failedStoppableInstances;
- return this;
- }
-
public StoppableInstancesSelectorBuilder setMaintenanceService(
MaintenanceManagementService maintenanceService) {
_maintenanceService = maintenanceService;
@@ -224,9 +283,8 @@ public class StoppableInstancesSelector {
}
public StoppableInstancesSelector build() {
- return new StoppableInstancesSelector(_clusterId, _orderOfZone,
- _customizedInput, _stoppableInstances, _failedStoppableInstances,
_maintenanceService,
- _clusterTopology);
+ return new StoppableInstancesSelector(_clusterId, _orderOfZone,
_customizedInput,
+ _maintenanceService, _clusterTopology);
}
}
}
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
index 8a2120270..785195ebe 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
@@ -20,7 +20,9 @@ package org.apache.helix.rest.server.resources.helix;
*/
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -67,6 +69,7 @@ public class InstancesAccessor extends AbstractHelixResource {
disabled,
selection_base,
zone_order,
+ to_be_stopped_instances,
customized_values,
instance_stoppable_parallel,
instance_not_stoppable_with_reasons
@@ -224,6 +227,7 @@ public class InstancesAccessor extends
AbstractHelixResource {
List<String> orderOfZone = null;
String customizedInput = null;
+ List<String> toBeStoppedInstances = Collections.emptyList();
if
(node.get(InstancesAccessor.InstancesProperties.customized_values.name()) !=
null) {
customizedInput =
node.get(InstancesAccessor.InstancesProperties.customized_values.name()).toString();
@@ -235,18 +239,26 @@ public class InstancesAccessor extends
AbstractHelixResource {
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class,
String.class));
if (!orderOfZone.isEmpty() && random) {
String message =
- "Both 'orderOfZone' and 'random' parameters are set. Please
specify only one option.";
+ "Both 'zone_order' and 'random' parameters are set. Please
specify only one option.";
_logger.error(message);
return badRequest(message);
}
}
- // Prepare output result
- ObjectNode result = JsonNodeFactory.instance.objectNode();
- ArrayNode stoppableInstances =
-
result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
- ObjectNode failedStoppableInstances = result.putObject(
-
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+ if
(node.get(InstancesAccessor.InstancesProperties.to_be_stopped_instances.name())
!= null) {
+ toBeStoppedInstances = OBJECT_MAPPER.readValue(
+
node.get(InstancesProperties.to_be_stopped_instances.name()).toString(),
+ OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class,
String.class));
+ Set<String> instanceSet = new HashSet<>(instances);
+ instanceSet.retainAll(toBeStoppedInstances);
+ if (!instanceSet.isEmpty()) {
+ String message =
+ "'to_be_stopped_instances' and 'instances' have intersection: "
+ instanceSet
+ + ". Please make them mutually exclusive.";
+ _logger.error(message);
+ return badRequest(message);
+ }
+ }
MaintenanceManagementService maintenanceService =
new MaintenanceManagementService((ZKHelixDataAccessor)
getDataAccssor(clusterId),
@@ -260,18 +272,17 @@ public class InstancesAccessor extends
AbstractHelixResource {
.setClusterId(clusterId)
.setOrderOfZone(orderOfZone)
.setCustomizedInput(customizedInput)
- .setStoppableInstances(stoppableInstances)
- .setFailedStoppableInstances(failedStoppableInstances)
.setMaintenanceService(maintenanceService)
.setClusterTopology(clusterTopology)
.build();
- stoppableInstancesSelector.calculateOrderOfZone(random);
+ stoppableInstancesSelector.calculateOrderOfZone(instances, random);
+ ObjectNode result;
switch (selectionBase) {
case zone_based:
-
stoppableInstancesSelector.getStoppableInstancesInSingleZone(instances);
+ result =
stoppableInstancesSelector.getStoppableInstancesInSingleZone(instances,
toBeStoppedInstances);
break;
case cross_zone_based:
- stoppableInstancesSelector.getStoppableInstancesCrossZones();
+ result =
stoppableInstancesSelector.getStoppableInstancesCrossZones(instances,
toBeStoppedInstances);
break;
case instance_based:
default:
diff --git
a/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
b/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
index f8408b070..a49a95066 100644
---
a/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
+++
b/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
@@ -114,7 +114,7 @@ public class TestMaintenanceManagementService {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
String instanceName,
- List<HealthCheck> healthChecks) {
+ List<HealthCheck> healthChecks, Set<String> toBeStoppedInstances) {
return Collections.emptyMap();
}
}
@@ -127,7 +127,7 @@ public class TestMaintenanceManagementService {
_customRestClient, false, false,
HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String
clusterId,
- String instanceName, List<HealthCheck> healthChecks) {
+ String instanceName, List<HealthCheck> healthChecks, Set<String>
toBeStoppedInstances) {
return failedCheck;
}
};
@@ -147,7 +147,7 @@ public class TestMaintenanceManagementService {
_customRestClient, false, false,
HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String
clusterId,
- String instanceName, List<HealthCheck> healthChecks) {
+ String instanceName, List<HealthCheck> healthChecks, Set<String>
toBeStoppedInstances) {
return Collections.emptyMap();
}
};
@@ -227,7 +227,7 @@ public class TestMaintenanceManagementService {
_customRestClient, false, false,
new
HashSet<>(Arrays.asList(StoppableCheck.Category.CUSTOM_INSTANCE_CHECK)),
HelixRestNamespace.DEFAULT_NAMESPACE_NAME);
-
+
StoppableCheck actual = service.getInstanceStoppableCheck(TEST_CLUSTER,
TEST_INSTANCE, "");
List<String> expectedFailedChecks = Arrays.asList(
StoppableCheck.Category.CUSTOM_PARTITION_CHECK.getPrefix()
@@ -246,7 +246,7 @@ public class TestMaintenanceManagementService {
_customRestClient, false, false,
HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String
clusterId,
- String instanceName, List<HealthCheck> healthChecks) {
+ String instanceName, List<HealthCheck> healthChecks, Set<String>
toBeStoppedInstances) {
return Collections.emptyMap();
}
};
@@ -365,7 +365,7 @@ public class TestMaintenanceManagementService {
HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String
clusterId,
- String instanceName, List<HealthCheck> healthChecks) {
+ String instanceName, List<HealthCheck> healthChecks, Set<String>
toBeStoppedInstances) {
return instanceHealthFailedCheck;
}
};
@@ -393,7 +393,7 @@ public class TestMaintenanceManagementService {
_customRestClient, false, false,
HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String
clusterId,
- String instanceName, List<HealthCheck> healthChecks) {
+ String instanceName, List<HealthCheck> healthChecks, Set<String>
toBeStoppedInstances) {
return Collections.emptyMap();
}
};
diff --git
a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index a7cf91c7b..68561ce83 100644
---
a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++
b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -54,6 +54,7 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.RESTConfig;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.rest.common.ContextPropertyKeys;
import org.apache.helix.rest.common.HelixRestNamespace;
@@ -129,9 +130,14 @@ public class AbstractTestClass extends
JerseyTestNg.ContainerPerClassTest {
protected static HelixZkClient _gZkClientTestNS;
protected static BaseDataAccessor<ZNRecord> _baseAccessorTestNS;
protected static final String STOPPABLE_CLUSTER = "StoppableTestCluster";
+ protected static final String STOPPABLE_CLUSTER2 = "StoppableTestCluster2";
protected static final String TASK_TEST_CLUSTER = "TaskTestCluster";
protected static final List<String> STOPPABLE_INSTANCES =
Arrays.asList("instance0", "instance1", "instance2", "instance3",
"instance4", "instance5");
+ protected static final List<String> STOPPABLE_INSTANCES2 =
+ Arrays.asList("instance0", "instance1", "instance2", "instance3",
"instance4", "instance5",
+ "instance6", "instance7", "instance8", "instance9", "instance10",
"instance11",
+ "instance12", "instance13", "instance14");
protected static Set<String> _clusters;
protected static String _superCluster = "superCluster";
@@ -329,13 +335,14 @@ public class AbstractTestClass extends
JerseyTestNg.ContainerPerClassTest {
_configAccessor.setClusterConfig(cluster, clusterConfig);
createResourceConfigs(cluster, 8);
_workflowMap.put(cluster, createWorkflows(cluster, 3));
- Set<String> resources = createResources(cluster, 8);
+ Set<String> resources = createResources(cluster, 8, MIN_ACTIVE_REPLICA,
NUM_REPLICA);
_instancesMap.put(cluster, instances);
_liveInstancesMap.put(cluster, liveInstances);
_resourcesMap.put(cluster, resources);
_clusterControllerManagers.add(startController(cluster));
}
preSetupForParallelInstancesStoppableTest(STOPPABLE_CLUSTER,
STOPPABLE_INSTANCES);
+ preSetupForCrosszoneParallelInstancesStoppableTest(STOPPABLE_CLUSTER2,
STOPPABLE_INSTANCES2);
}
protected Set<String> createInstances(String cluster, int numInstances) {
@@ -348,16 +355,17 @@ public class AbstractTestClass extends
JerseyTestNg.ContainerPerClassTest {
return instances;
}
- protected Set<String> createResources(String cluster, int numResources) {
+ protected Set<String> createResources(String cluster, int numResources, int
minActiveReplica,
+ int replicationFactor) {
Set<String> resources = new HashSet<>();
for (int i = 0; i < numResources; i++) {
String resource = cluster + "_db_" + i;
_gSetupTool.addResourceToCluster(cluster, resource, NUM_PARTITIONS,
"MasterSlave");
IdealState idealState =
_gSetupTool.getClusterManagementTool().getResourceIdealState(cluster, resource);
- idealState.setMinActiveReplicas(MIN_ACTIVE_REPLICA);
+ idealState.setMinActiveReplicas(minActiveReplica);
_gSetupTool.getClusterManagementTool().setResourceIdealState(cluster,
resource, idealState);
- _gSetupTool.rebalanceStorageCluster(cluster, resource, NUM_REPLICA);
+ _gSetupTool.rebalanceStorageCluster(cluster, resource,
replicationFactor);
resources.add(resource);
}
return resources;
@@ -575,7 +583,7 @@ public class AbstractTestClass extends
JerseyTestNg.ContainerPerClassTest {
// Start participant
startInstances(clusterName, new TreeSet<>(instances), 3);
- createResources(clusterName, 1);
+ createResources(clusterName, 1, MIN_ACTIVE_REPLICA, NUM_REPLICA);
_clusterControllerManagers.add(startController(clusterName));
// Make sure that cluster config exists
@@ -606,6 +614,65 @@ public class AbstractTestClass extends
JerseyTestNg.ContainerPerClassTest {
_workflowMap.put(STOPPABLE_CLUSTER, createWorkflows(STOPPABLE_CLUSTER, 3));
}
+ private void preSetupForCrosszoneParallelInstancesStoppableTest(String
clusterName,
+ List<String> instances) throws Exception {
+ _gSetupTool.addCluster(clusterName, true);
+ ClusterConfig clusterConfig =
_configAccessor.getClusterConfig(clusterName);
+ clusterConfig.setFaultZoneType("helixZoneId");
+ clusterConfig.setPersistIntermediateAssignment(true);
+ _configAccessor.setClusterConfig(clusterName, clusterConfig);
+ RESTConfig emptyRestConfig = new RESTConfig(clusterName);
+ _configAccessor.setRESTConfig(clusterName, emptyRestConfig);
+ // Create instance configs
+ List<InstanceConfig> instanceConfigs = new ArrayList<>();
+ int perZoneInstancesCount = 3;
+ int curZoneCount = 0, zoneId = 1;
+ for (int i = 0; i < instances.size(); i++) {
+ InstanceConfig instanceConfig = new InstanceConfig(instances.get(i));
+ instanceConfig.setDomain("helixZoneId=zone" + zoneId + ",host=instance"
+ i);
+ if (++curZoneCount >= perZoneInstancesCount) {
+ curZoneCount = 0;
+ zoneId++;
+ }
+ instanceConfigs.add(instanceConfig);
+ }
+
+ for (InstanceConfig instanceConfig : instanceConfigs) {
+ _gSetupTool.getClusterManagementTool().addInstance(clusterName,
instanceConfig);
+ }
+
+ // Start participant
+ startInstances(clusterName, new TreeSet<>(instances), instances.size());
+ createResources(clusterName, 1, 2, 3);
+ _clusterControllerManagers.add(startController(clusterName));
+
+ // Make sure that cluster config exists
+ boolean isClusterConfigExist = TestHelper.verify(() -> {
+ ClusterConfig stoppableClusterConfig;
+ try {
+ stoppableClusterConfig = _configAccessor.getClusterConfig(clusterName);
+ } catch (Exception e) {
+ return false;
+ }
+ return (stoppableClusterConfig != null);
+ }, TestHelper.WAIT_DURATION);
+ Assert.assertTrue(isClusterConfigExist);
+ // Make sure that instance config exists for the instance0 to instance5
+ for (String instance: instances) {
+ boolean isinstanceConfigExist = TestHelper.verify(() -> {
+ InstanceConfig instanceConfig;
+ try {
+ instanceConfig = _configAccessor.getInstanceConfig(clusterName,
instance);
+ } catch (Exception e) {
+ return false;
+ }
+ return (instanceConfig != null);
+ }, TestHelper.WAIT_DURATION);
+ Assert.assertTrue(isinstanceConfigExist);
+ }
+ _clusters.add(clusterName);
+ _workflowMap.put(clusterName, createWorkflows(clusterName, 3));
+ }
/**
* Starts a HelixRestServer for the test suite.
* @return
diff --git
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
index 01701a486..2bc539a4d 100644
---
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
+++
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
@@ -40,12 +40,167 @@ import
org.apache.helix.rest.server.resources.helix.InstancesAccessor;
import org.apache.helix.rest.server.util.JerseyUriRequestBuilder;
import
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.testng.Assert;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
public class TestInstancesAccessor extends AbstractTestClass {
private final static String CLUSTER_NAME = "TestCluster_0";
+ @DataProvider
+ public Object[][] generatePayloadCrossZoneStoppableCheckWithZoneOrder() {
+ return new Object[][]{
+ {String.format(
+ "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\",
\"%s\", \"%s\","
+ + " \"%s\", \"%s\", \"%s\", \"%s\", \"%s\", \"%s\", \"%s\",
\"%s\"],"
+ + "\"%s\":[\"%s\", \"%s\", \"%s\", \"%s\", \"%s\"],
\"%s\":[\"%s\"]}",
+ InstancesAccessor.InstancesProperties.selection_base.name(),
+
InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(),
+ InstancesAccessor.InstancesProperties.instances.name(),
"instance1", "instance2",
+ "instance3", "instance4", "instance5", "instance6", "instance7",
"instance8",
+ "instance9", "instance10", "instance11", "instance12",
"instance13", "instance14",
+ "invalidInstance",
+ InstancesAccessor.InstancesProperties.zone_order.name(),"zone5",
"zone4", "zone3", "zone2",
+ "zone1",
+
InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(),
+ "instance0"),
+ },
+ {String.format(
+ "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\",
\"%s\", \"%s\",\"%s\", \"%s\", \"%s\"],"
+ + "\"%s\":[\"%s\", \"%s\", \"%s\", \"%s\", \"%s\"],
\"%s\":[\"%s\", \"%s\", \"%s\"]}",
+ InstancesAccessor.InstancesProperties.selection_base.name(),
+
InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(),
+ InstancesAccessor.InstancesProperties.instances.name(),
"instance1", "instance3",
+ "instance6", "instance9", "instance10", "instance11",
"instance12", "instance13",
+ "instance14", "invalidInstance",
+ InstancesAccessor.InstancesProperties.zone_order.name(), "zone5",
"zone4", "zone1",
+ "zone3", "zone2",
InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(),
+ "instance0", "invalidInstance1", "invalidInstance1"),
+ }
+ };
+ }
+
+ @Test
+ public void testInstanceStoppableZoneBasedWithToBeStoppedInstances() throws
IOException {
+ System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+ String content = String.format(
+ "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\", \"%s\"],
\"%s\":[\"%s\",\"%s\"], \"%s\":[\"%s\", \"%s\", \"%s\"]}",
+ InstancesAccessor.InstancesProperties.selection_base.name(),
+ InstancesAccessor.InstanceHealthSelectionBase.zone_based.name(),
+ InstancesAccessor.InstancesProperties.instances.name(), "instance1",
+ "instance2", "instance3", "instance4", "instance5", "invalidInstance",
+ InstancesAccessor.InstancesProperties.zone_order.name(), "zone2",
"zone1",
+ InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(),
"instance0", "instance6", "invalidInstance1");
+
+ Response response = new JerseyUriRequestBuilder(
+
"clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format(
+ STOPPABLE_CLUSTER2).post(this, Entity.entity(content,
MediaType.APPLICATION_JSON_TYPE));
+ JsonNode jsonNode =
OBJECT_MAPPER.readTree(response.readEntity(String.class));
+
+ Set<String> stoppableSet = getStringSet(jsonNode,
+
InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+ Assert.assertTrue(stoppableSet.contains("instance4") &&
stoppableSet.contains("instance3"));
+
+ JsonNode nonStoppableInstances = jsonNode.get(
+
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+ // "StoppableTestCluster2_db_0_3" : { "instance0" : "MASTER",
"instance13" : "SLAVE", "instance5" : "SLAVE"}.
+ // Since instance0 is to_be_stopped and MIN_ACTIVE_REPLICA is 2,
instance5 is not stoppable.
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "instance5"),
+ ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"),
+ ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST"));
+
+ System.out.println("End test :" + TestHelper.getTestMethodName());
+ }
+
+ @Test
+ public void testInstanceStoppableZoneBasedWithoutZoneOrder() throws
IOException {
+ System.out.println("Start test :" + TestHelper.getTestMethodName());
+ String content = String.format(
+ "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\"],
\"%s\":[\"%s\", \"%s\", \"%s\"]}",
+ InstancesAccessor.InstancesProperties.selection_base.name(),
+ InstancesAccessor.InstanceHealthSelectionBase.zone_based.name(),
+ InstancesAccessor.InstancesProperties.instances.name(), "instance0",
"instance1",
+ "instance2", "instance3", "instance4", "invalidInstance",
+ InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(),
+ "instance7", "instance9", "instance10");
+
+ Response response = new JerseyUriRequestBuilder(
+
"clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format(
+ STOPPABLE_CLUSTER2).post(this, Entity.entity(content,
MediaType.APPLICATION_JSON_TYPE));
+ JsonNode jsonNode =
OBJECT_MAPPER.readTree(response.readEntity(String.class));
+
+ // Without zone order, helix should pick the zone1 because it has higher
instance count than zone2.
+ Set<String> stoppableSet = getStringSet(jsonNode,
+
InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+ Assert.assertTrue(stoppableSet.contains("instance0") &&
stoppableSet.contains("instance1"));
+
+ JsonNode nonStoppableInstances = jsonNode.get(
+
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "instance2"),
+ ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"),
+ ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST"));
+ System.out.println("End test :" + TestHelper.getTestMethodName());
+ }
+
+ @Test(dataProvider = "generatePayloadCrossZoneStoppableCheckWithZoneOrder")
+ public void testCrossZoneStoppableWithZoneOrder(String content) throws
IOException {
+ System.out.println("Start test :" + TestHelper.getTestMethodName());
+ Response response = new JerseyUriRequestBuilder(
+
"clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format(
+ STOPPABLE_CLUSTER2).post(this, Entity.entity(content,
MediaType.APPLICATION_JSON_TYPE));
+ JsonNode jsonNode =
OBJECT_MAPPER.readTree(response.readEntity(String.class));
+
+ Set<String> stoppableSet = getStringSet(jsonNode,
+
InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+ Assert.assertTrue(stoppableSet.contains("instance14") &&
stoppableSet.contains("instance12")
+ && stoppableSet.contains("instance11") &&
stoppableSet.contains("instance10"));
+
+ JsonNode nonStoppableInstances = jsonNode.get(
+
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "instance13"),
+ ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"),
+ ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST"));
+ System.out.println("End test :" + TestHelper.getTestMethodName());
+ }
+
@Test
+ public void testCrossZoneStoppableWithoutZoneOrder() throws IOException {
+ System.out.println("Start test :" + TestHelper.getTestMethodName());
+ String content = String.format(
+ "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\", \"%s\",
\"%s\",\"%s\", \"%s\", \"%s\"],"
+ + "\"%s\":[\"%s\", \"%s\", \"%s\"]}",
+ InstancesAccessor.InstancesProperties.selection_base.name(),
+ InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(),
+ InstancesAccessor.InstancesProperties.instances.name(), "instance1",
"instance3",
+ "instance6", "instance9", "instance10", "instance11", "instance12",
"instance13",
+ "instance14", "invalidInstance",
+ InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(),
"instance0",
+ "invalidInstance1", "invalidInstance1");
+
+ Response response = new JerseyUriRequestBuilder(
+
"clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format(
+ STOPPABLE_CLUSTER2).post(this, Entity.entity(content,
MediaType.APPLICATION_JSON_TYPE));
+ JsonNode jsonNode =
OBJECT_MAPPER.readTree(response.readEntity(String.class));
+
+ Set<String> stoppableSet = getStringSet(jsonNode,
+
InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+ Assert.assertTrue(stoppableSet.contains("instance14") &&
stoppableSet.contains("instance12")
+ && stoppableSet.contains("instance11") &&
stoppableSet.contains("instance10"));
+
+ JsonNode nonStoppableInstances = jsonNode.get(
+
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "instance13"),
+ ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"),
+ ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST"));
+ System.out.println("End test :" + TestHelper.getTestMethodName());
+ }
+
+
+ @Test(dependsOnMethods =
"testInstanceStoppableZoneBasedWithToBeStoppedInstances")
public void testInstanceStoppable_zoneBased_zoneOrder() throws IOException {
System.out.println("Start test :" + TestHelper.getTestMethodName());
// Select instances with zone based
diff --git
a/helix-rest/src/test/java/org/apache/helix/rest/server/util/TestInstanceValidationUtilInRest.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/util/TestInstanceValidationUtilInRest.java
index 35e6399e0..e37da34ff 100644
---
a/helix-rest/src/test/java/org/apache/helix/rest/server/util/TestInstanceValidationUtilInRest.java
+++
b/helix-rest/src/test/java/org/apache/helix/rest/server/util/TestInstanceValidationUtilInRest.java
@@ -22,8 +22,10 @@ package org.apache.helix.rest.server.util;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
@@ -103,6 +105,52 @@ public class TestInstanceValidationUtilInRest{
Assert.assertEquals(failedPartitions.keySet().size(), 2);
}
+ @Test
+ public void testPartitionLevelCheckWithToBeStoppedNode() {
+ List<ExternalView> externalViews = new
ArrayList<>(Arrays.asList(prepareExternalViewOnline()));
+ Mock mock = new Mock();
+ HelixDataAccessor accessor = mock.dataAccessor;
+
+ when(mock.dataAccessor.keyBuilder())
+ .thenReturn(new PropertyKey.Builder(TEST_CLUSTER));
+ when(mock.dataAccessor
+ .getProperty(new
PropertyKey.Builder(TEST_CLUSTER).stateModelDef(MasterSlaveSMD.name)))
+ .thenReturn(mock.stateModel);
+ when(mock.stateModel.getTopState()).thenReturn("MASTER");
+ when(mock.stateModel.getInitialState()).thenReturn("OFFLINE");
+
+ Map<String, Map<String, Boolean>> partitionStateMap = new HashMap<>();
+ partitionStateMap.put("h1", new HashMap<>());
+ partitionStateMap.put("h2", new HashMap<>());
+ partitionStateMap.put("h3", new HashMap<>());
+ partitionStateMap.put("h4", new HashMap<>());
+
+ partitionStateMap.get("h1").put("p1", true);
+ partitionStateMap.get("h2").put("p1", true);
+ partitionStateMap.get("h3").put("p1", true);
+ partitionStateMap.get("h4").put("p1", true);
+
+ partitionStateMap.get("h1").put("p2", true);
+ partitionStateMap.get("h2").put("p2", false);
+ partitionStateMap.get("h3").put("p2", true);
+
+ Set<String> toBeStoppedInstances = new HashSet<>();
+ toBeStoppedInstances.add("h3");
+ Map<String, List<String>> failedPartitions =
InstanceValidationUtil.perPartitionHealthCheck(
+ externalViews, partitionStateMap, "h1", accessor,
toBeStoppedInstances);
+ Assert.assertEquals(failedPartitions.keySet().size(), 1);
+ Assert.assertEquals(failedPartitions.get("p2").size(), 1);
+
Assert.assertTrue(failedPartitions.get("p2").contains("UNHEALTHY_PARTITION"));
+
+ toBeStoppedInstances.remove("h3");
+ toBeStoppedInstances.add("h2");
+ failedPartitions =
+ InstanceValidationUtil.perPartitionHealthCheck(externalViews,
partitionStateMap, "h1",
+ accessor, toBeStoppedInstances);
+ // Since we presume h2 as being already stopped, the health status of p2
on h2 will be skipped.
+ Assert.assertEquals(failedPartitions.keySet().size(), 0);
+ }
+
private ExternalView prepareExternalView() {
ExternalView externalView = new ExternalView(RESOURCE_NAME);
externalView.getRecord()
@@ -163,6 +211,22 @@ public class TestInstanceValidationUtilInRest{
return externalView;
}
+ private ExternalView prepareExternalViewOnline() {
+ ExternalView externalView = new ExternalView(RESOURCE_NAME);
+ externalView.getRecord()
+
.setSimpleField(ExternalView.ExternalViewProperty.STATE_MODEL_DEF_REF.toString(),
+ MasterSlaveSMD.name);
+ externalView.setState("p1", "h1", "MASTER");
+ externalView.setState("p1", "h2", "SLAVE");
+ externalView.setState("p1", "h3", "SLAVE");
+
+ externalView.setState("p2", "h1", "MASTER");
+ externalView.setState("p2", "h2", "SLAVE");
+ externalView.setState("p2", "h3", "SLAVE");
+
+ return externalView;
+ }
+
private final class Mock {
private HelixDataAccessor dataAccessor = mock(HelixDataAccessor.class);
private StateModelDefinition stateModel = mock(StateModelDefinition.class);