This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch ApplicationClusterManager
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/ApplicationClusterManager by
this push:
new 56dd5d25a Implement 'StoppableCheck' Flag to Ensure Maintenance Mode
is Avoided After Stoppable Instances are Shutdown (#2736)
56dd5d25a is described below
commit 56dd5d25a4fc49f30531521910415982eb403060
Author: Xiaxuan Gao <[email protected]>
AuthorDate: Wed Jan 31 10:34:08 2024 -0800
Implement 'StoppableCheck' Flag to Ensure Maintenance Mode is Avoided After
Stoppable Instances are Shutdown (#2736)
Implement 'StoppableCheck' Flag to Ensure Maintenance Mode is Avoided After
Stoppable Instances are offline
---
.../StoppableInstancesSelector.java | 72 ++++-
.../server/resources/helix/InstancesAccessor.java | 57 +++-
.../helix/rest/server/AbstractTestClass.java | 65 +++++
.../helix/rest/server/TestInstancesAccessor.java | 325 +++++++++++++++++++++
4 files changed, 501 insertions(+), 18 deletions(-)
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
index 877aaa9c8..e366fa12f 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
@@ -47,22 +47,29 @@ public class StoppableInstancesSelector {
// to HealthCheck enum, it could introduce more unnecessary check step since
the InstanceServiceImpl
// loops all the types to do corresponding checks.
private final static String INSTANCE_NOT_EXIST = "HELIX:INSTANCE_NOT_EXIST";
+ private final static String EXCEED_MAX_OFFLINE_INSTANCES =
+ "HELIX:EXCEED_MAX_OFFLINE_INSTANCES";
private final String _clusterId;
private List<String> _orderOfZone;
private final String _customizedInput;
private final MaintenanceManagementService _maintenanceService;
private final ClusterTopology _clusterTopology;
private final ZKHelixDataAccessor _dataAccessor;
+ private final int _maxAdditionalOfflineInstances;
+ private final boolean _continueOnFailure;
private StoppableInstancesSelector(String clusterId, List<String>
orderOfZone,
String customizedInput, MaintenanceManagementService maintenanceService,
- ClusterTopology clusterTopology, ZKHelixDataAccessor dataAccessor) {
+ ClusterTopology clusterTopology, ZKHelixDataAccessor dataAccessor,
+ int maxAdditionalOfflineInstances, boolean continueOnFailure) {
_clusterId = clusterId;
_orderOfZone = orderOfZone;
_customizedInput = customizedInput;
_maintenanceService = maintenanceService;
_clusterTopology = clusterTopology;
_dataAccessor = dataAccessor;
+ _maxAdditionalOfflineInstances = maxAdditionalOfflineInstances;
+ _continueOnFailure = continueOnFailure;
}
/**
@@ -92,7 +99,7 @@ public class StoppableInstancesSelector {
List<String> zoneBasedInstance =
getZoneBasedInstances(instances, _clusterTopology.toZoneMapping());
populateStoppableInstances(zoneBasedInstance, toBeStoppedInstancesSet,
stoppableInstances,
- failedStoppableInstances);
+ failedStoppableInstances, _maxAdditionalOfflineInstances -
toBeStoppedInstancesSet.size());
processNonexistentInstances(instances, failedStoppableInstances);
return result;
@@ -129,15 +136,17 @@ public class StoppableInstancesSelector {
if (instanceSet.isEmpty()) {
continue;
}
- populateStoppableInstances(new ArrayList<>(instanceSet),
toBeStoppedInstancesSet, stoppableInstances,
- failedStoppableInstances);
+ populateStoppableInstances(new ArrayList<>(instanceSet),
toBeStoppedInstancesSet,
+ stoppableInstances, failedStoppableInstances,
+ _maxAdditionalOfflineInstances - toBeStoppedInstancesSet.size());
}
processNonexistentInstances(instances, failedStoppableInstances);
return result;
}
private void populateStoppableInstances(List<String> instances, Set<String>
toBeStoppedInstances,
- ArrayNode stoppableInstances, ObjectNode failedStoppableInstances)
throws IOException {
+ ArrayNode stoppableInstances, ObjectNode failedStoppableInstances,
+ int allowedOfflineCount) throws IOException {
Map<String, StoppableCheck> instancesStoppableChecks =
_maintenanceService.batchGetInstancesStoppableChecks(_clusterId,
instances,
_customizedInput, toBeStoppedInstances);
@@ -145,16 +154,42 @@ public class StoppableInstancesSelector {
for (Map.Entry<String, StoppableCheck> instanceStoppableCheck :
instancesStoppableChecks.entrySet()) {
String instance = instanceStoppableCheck.getKey();
StoppableCheck stoppableCheck = instanceStoppableCheck.getValue();
- if (!stoppableCheck.isStoppable()) {
- ArrayNode failedReasonsNode =
failedStoppableInstances.putArray(instance);
- for (String failedReason : stoppableCheck.getFailedChecks()) {
-
failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason));
- }
- } else {
+ if (stoppableCheck.isStoppable() && allowedOfflineCount > 0) {
stoppableInstances.add(instance);
// Update the toBeStoppedInstances set with the currently identified
stoppable instance.
// This ensures that subsequent checks in other zones are aware of
this instance's stoppable status.
toBeStoppedInstances.add(instance);
+ allowedOfflineCount--;
+ continue;
+ }
+ // TODO: If the maxOffline limit is reached, we should give previous
non-stoppable instances a failed reason
+ // of "EXCEED_MAX_OFFLINE_INSTANCES"
+ ArrayNode failedReasonsNode =
failedStoppableInstances.putArray(instance);
+ consolidateResult(stoppableCheck, failedReasonsNode,
allowedOfflineCount);
+ }
+ }
+
+ private void consolidateResult(StoppableCheck stoppableCheck,
+ ArrayNode failedReasonsNode, int allowedOfflineCount) {
+ boolean failedHelixOwnChecks = false;
+ if (allowedOfflineCount <= 0) {
+
failedReasonsNode.add(JsonNodeFactory.instance.textNode(EXCEED_MAX_OFFLINE_INSTANCES));
+ failedHelixOwnChecks = true;
+ }
+
+ if (!stoppableCheck.isStoppable()) {
+ for (String failedReason : stoppableCheck.getFailedChecks()) {
+ // HELIX_OWN_CHECK can always be added to the failedReasonsNode.
+ if
(failedReason.startsWith(StoppableCheck.Category.HELIX_OWN_CHECK.getPrefix())) {
+
failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason));
+ failedHelixOwnChecks = true;
+ continue;
+ }
+ // CUSTOM_INSTANCE_CHECK and CUSTOM_PARTITION_CHECK can only be added
to the failedReasonsNode
+ // if continueOnFailure is true and there is no failed
Helix_OWN_CHECKS.
+ if (_continueOnFailure && !failedHelixOwnChecks) {
+
failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason));
+ }
}
}
}
@@ -282,6 +317,8 @@ public class StoppableInstancesSelector {
private MaintenanceManagementService _maintenanceService;
private ClusterTopology _clusterTopology;
private ZKHelixDataAccessor _dataAccessor;
+ private int _maxAdditionalOfflineInstances = Integer.MAX_VALUE;
+ private boolean _continueOnFailure;
public StoppableInstancesSelectorBuilder setClusterId(String clusterId) {
_clusterId = clusterId;
@@ -314,9 +351,20 @@ public class StoppableInstancesSelector {
return this;
}
+ public StoppableInstancesSelectorBuilder
setMaxAdditionalOfflineInstances(int maxAdditionalOfflineInstances) {
+ _maxAdditionalOfflineInstances = maxAdditionalOfflineInstances;
+ return this;
+ }
+
+ public StoppableInstancesSelectorBuilder setContinueOnFailure(boolean
continueOnFailure) {
+ _continueOnFailure = continueOnFailure;
+ return this;
+ }
+
public StoppableInstancesSelector build() {
return new StoppableInstancesSelector(_clusterId, _orderOfZone,
_customizedInput,
- _maintenanceService, _clusterTopology, _dataAccessor);
+ _maintenanceService, _clusterTopology, _dataAccessor,
_maxAdditionalOfflineInstances,
+ _continueOnFailure);
}
}
}
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
index fcad387dc..a87784ce7 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
@@ -20,6 +20,7 @@ package org.apache.helix.rest.server.resources.helix;
*/
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -40,6 +41,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
@@ -158,7 +160,9 @@ public class InstancesAccessor extends
AbstractHelixResource {
@QueryParam("continueOnFailures") boolean continueOnFailures,
@QueryParam("skipZKRead") boolean skipZKRead,
@QueryParam("skipHealthCheckCategories") String
skipHealthCheckCategories,
- @DefaultValue("false") @QueryParam("random") boolean random, String
content) {
+ @DefaultValue("false") @QueryParam("random") boolean random,
+ @DefaultValue("false") @QueryParam("notExceedMaxOfflineInstances")
boolean notExceedMaxOfflineInstances,
+ String content) {
Command cmd;
try {
cmd = Command.valueOf(command);
@@ -203,7 +207,7 @@ public class InstancesAccessor extends
AbstractHelixResource {
break;
case stoppable:
return batchGetStoppableInstances(clusterId, node, skipZKRead,
continueOnFailures,
- skipHealthCheckCategorySet, random);
+ skipHealthCheckCategorySet, random,
notExceedMaxOfflineInstances);
default:
_logger.error("Unsupported command :" + command);
return badRequest("Unsupported command :" + command);
@@ -221,7 +225,7 @@ public class InstancesAccessor extends
AbstractHelixResource {
private Response batchGetStoppableInstances(String clusterId, JsonNode node,
boolean skipZKRead,
boolean continueOnFailures, Set<StoppableCheck.Category>
skipHealthCheckCategories,
- boolean random) throws IOException {
+ boolean random, boolean notExceedingMaxOfflineInstances) throws
IOException {
try {
// TODO: Process input data from the content
InstancesAccessor.InstanceHealthSelectionBase selectionBase =
@@ -233,7 +237,7 @@ public class InstancesAccessor extends
AbstractHelixResource {
List<String> orderOfZone = null;
String customizedInput = null;
- List<String> toBeStoppedInstances = Collections.emptyList();
+ List<String> toBeStoppedInstances = new ArrayList<>();
// By default, if skip_stoppable_check_list is unset, all checks are
performed to maintain
// backward compatibility with existing clients.
List<HealthCheck> skipStoppableCheckList = Collections.emptyList();
@@ -302,7 +306,7 @@ public class InstancesAccessor extends
AbstractHelixResource {
ClusterService clusterService =
new ClusterServiceImpl(getDataAccssor(clusterId),
getConfigAccessor());
ClusterTopology clusterTopology =
clusterService.getClusterTopology(clusterId);
- StoppableInstancesSelector stoppableInstancesSelector =
+ StoppableInstancesSelector.StoppableInstancesSelectorBuilder builder =
new StoppableInstancesSelector.StoppableInstancesSelectorBuilder()
.setClusterId(clusterId)
.setOrderOfZone(orderOfZone)
@@ -310,8 +314,49 @@ public class InstancesAccessor extends
AbstractHelixResource {
.setMaintenanceService(maintenanceService)
.setClusterTopology(clusterTopology)
.setDataAccessor((ZKHelixDataAccessor) getDataAccssor(clusterId))
- .build();
+ .setContinueOnFailure(continueOnFailures);
+
+ Set<String> currentDisabledOrOfflineInstances = Collections.emptySet();
+ if (notExceedingMaxOfflineInstances) {
+ // If the clusterConfig or maxOfflineInstancesAllowed is not set, this
is an invalid request.
+ ClusterConfig clusterConfig =
getConfigAccessor().getClusterConfig(clusterId);
+ if (clusterConfig == null) {
+ String message =
+ "Invalid cluster name: " + clusterId + ". Cluster config does
not exist.";
+ _logger.error(message);
+ return badRequest(message);
+ }
+ int maxOfflineAllowed = clusterConfig.getMaxOfflineInstancesAllowed();
+ if (maxOfflineAllowed == -1) {
+ String message =
+ "Invalid cluster config: " + clusterId + ".
maxOfflineInstancesAllowed is not set.";
+ _logger.error(message);
+ return badRequest(message);
+ }
+
+ HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
+ ConfigAccessor configAccessor = getConfigAccessor();
+ List<String> liveInstances =
+
dataAccessor.getChildNames(dataAccessor.keyBuilder().liveInstances());
+ currentDisabledOrOfflineInstances =
+ clusterTopology.getAllInstances().stream().filter(instance -> {
+ // return instances that are disabled or not live.
+ return !configAccessor.getInstanceConfig(clusterId,
instance).getInstanceEnabled()
+ || !liveInstances.contains(instance);
+ }).collect(Collectors.toSet());
+ maxOfflineAllowed =
+ Math.max(0, maxOfflineAllowed -
currentDisabledOrOfflineInstances.size());
+ builder.setMaxAdditionalOfflineInstances(maxOfflineAllowed);
+ }
+
+ StoppableInstancesSelector stoppableInstancesSelector = builder.build();
stoppableInstancesSelector.calculateOrderOfZone(instances, random);
+ Set<String> finalCurrentDisabledOfflineInstances =
currentDisabledOrOfflineInstances;
+ // Since maxOfflineAllowed is set, we need to filter out the instances
that are already offline.
+ toBeStoppedInstances = toBeStoppedInstances.stream().filter(
+ instance -> clusterTopology.getAllInstances().contains(instance)
+ && !finalCurrentDisabledOfflineInstances.contains(instance))
+ .collect(Collectors.toList());
ObjectNode result;
switch (selectionBase) {
case zone_based:
diff --git
a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index d0f0c5715..404f23d26 100644
---
a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++
b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -131,6 +131,7 @@ public class AbstractTestClass extends
JerseyTestNg.ContainerPerClassTest {
protected static BaseDataAccessor<ZNRecord> _baseAccessorTestNS;
protected static final String STOPPABLE_CLUSTER = "StoppableTestCluster";
protected static final String STOPPABLE_CLUSTER2 = "StoppableTestCluster2";
+ protected static final String STOPPABLE_CLUSTER3 = "StoppableTestCluster3";
protected static final String TASK_TEST_CLUSTER = "TaskTestCluster";
protected static final List<String> STOPPABLE_INSTANCES =
Arrays.asList("instance0", "instance1", "instance2", "instance3",
"instance4", "instance5");
@@ -138,6 +139,7 @@ public class AbstractTestClass extends
JerseyTestNg.ContainerPerClassTest {
Arrays.asList("instance0", "instance1", "instance2", "instance3",
"instance4", "instance5",
"instance6", "instance7", "instance8", "instance9", "instance10",
"instance11",
"instance12", "instance13", "instance14");
+ protected static final int STOPPABLE_CLUSTER_MAX_OFFLINE_INSTANCES = 3;
protected static Set<String> _clusters;
protected static String _superCluster = "superCluster";
@@ -343,6 +345,8 @@ public class AbstractTestClass extends
JerseyTestNg.ContainerPerClassTest {
}
preSetupForParallelInstancesStoppableTest(STOPPABLE_CLUSTER,
STOPPABLE_INSTANCES);
preSetupForCrosszoneParallelInstancesStoppableTest(STOPPABLE_CLUSTER2,
STOPPABLE_INSTANCES2);
+
preSetupForCrosszoneParallelInstancesStoppableTestWithOfflineInstances(STOPPABLE_CLUSTER3,
+ STOPPABLE_INSTANCES2);
}
protected Set<String> createInstances(String cluster, int numInstances) {
@@ -620,6 +624,7 @@ public class AbstractTestClass extends
JerseyTestNg.ContainerPerClassTest {
ClusterConfig clusterConfig =
_configAccessor.getClusterConfig(clusterName);
clusterConfig.setFaultZoneType("helixZoneId");
clusterConfig.setPersistIntermediateAssignment(true);
+
clusterConfig.setMaxOfflineInstancesAllowed(STOPPABLE_CLUSTER_MAX_OFFLINE_INSTANCES);
_configAccessor.setClusterConfig(clusterName, clusterConfig);
// Create instance configs
List<InstanceConfig> instanceConfigs = new ArrayList<>();
@@ -671,6 +676,66 @@ public class AbstractTestClass extends
JerseyTestNg.ContainerPerClassTest {
_clusters.add(clusterName);
_workflowMap.put(clusterName, createWorkflows(clusterName, 3));
}
+
+ private void
preSetupForCrosszoneParallelInstancesStoppableTestWithOfflineInstances(
+ String clusterName, List<String> instances) throws Exception {
+ _gSetupTool.addCluster(clusterName, true);
+ ClusterConfig clusterConfig =
_configAccessor.getClusterConfig(clusterName);
+ clusterConfig.setFaultZoneType("helixZoneId");
+ clusterConfig.setPersistIntermediateAssignment(true);
+
clusterConfig.setMaxOfflineInstancesAllowed(STOPPABLE_CLUSTER_MAX_OFFLINE_INSTANCES);
+ _configAccessor.setClusterConfig(clusterName, clusterConfig);
+ // Create instance configs
+ List<InstanceConfig> instanceConfigs = new ArrayList<>();
+ int perZoneInstancesCount = 3;
+ int curZoneCount = 0, zoneId = 1;
+ for (int i = 0; i < instances.size(); i++) {
+ InstanceConfig instanceConfig = new InstanceConfig(instances.get(i));
+ instanceConfig.setDomain("helixZoneId=zone" + zoneId + ",host=instance"
+ i);
+ if (++curZoneCount >= perZoneInstancesCount) {
+ curZoneCount = 0;
+ zoneId++;
+ }
+ instanceConfigs.add(instanceConfig);
+ }
+
+ for (InstanceConfig instanceConfig : instanceConfigs) {
+ _gSetupTool.getClusterManagementTool().addInstance(clusterName,
instanceConfig);
+ }
+
+ // Start participant and make two of them offline
+ startInstances(clusterName, new TreeSet<>(instances), instances.size() -
2);
+ createResources(clusterName, 1, 2, 3);
+ _clusterControllerManagers.add(startController(clusterName));
+
+ // Make sure that cluster config exists
+ boolean isClusterConfigExist = TestHelper.verify(() -> {
+ ClusterConfig stoppableClusterConfig;
+ try {
+ stoppableClusterConfig = _configAccessor.getClusterConfig(clusterName);
+ } catch (Exception e) {
+ return false;
+ }
+ return (stoppableClusterConfig != null);
+ }, TestHelper.WAIT_DURATION);
+ Assert.assertTrue(isClusterConfigExist);
+ // Make sure that instance config exists for the instance0 to instance5
+ for (String instance: instances) {
+ boolean isinstanceConfigExist = TestHelper.verify(() -> {
+ InstanceConfig instanceConfig;
+ try {
+ instanceConfig = _configAccessor.getInstanceConfig(clusterName,
instance);
+ } catch (Exception e) {
+ return false;
+ }
+ return (instanceConfig != null);
+ }, TestHelper.WAIT_DURATION);
+ Assert.assertTrue(isinstanceConfigExist);
+ }
+ _clusters.add(clusterName);
+ _workflowMap.put(clusterName, createWorkflows(clusterName, 3));
+ }
+
/**
* Starts a HelixRestServer for the test suite.
* @return
diff --git
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
index 5cfab76a0..3e91b699e 100644
---
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
+++
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
@@ -33,6 +33,7 @@ import javax.ws.rs.core.Response;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import org.apache.helix.ConfigAccessor;
import org.apache.helix.TestHelper;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.model.ClusterConfig;
@@ -81,6 +82,330 @@ public class TestInstancesAccessor extends
AbstractTestClass {
}
@Test
+ public void testInstanceStoppableWithDisabledAndOfflineInstances() throws
Exception {
+ System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+ String instanceName = STOPPABLE_INSTANCES2.get(STOPPABLE_INSTANCES2.size()
- 1);
+ InstanceConfig instanceConfig =
_configAccessor.getInstanceConfig(STOPPABLE_CLUSTER3, instanceName);
+ instanceConfig.setInstanceEnabled(false);
+ _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER3, instanceName,
instanceConfig);
+ TestHelper.verify(
+ () -> !_configAccessor.getInstanceConfig(STOPPABLE_CLUSTER3,
instanceName).getInstanceEnabled(),
+ 1000);
+
+ String content = String.format(
+ "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\", \"%s\"],
\"%s\":[\"%s\",\"%s\"], \"%s\":[\"%s\", \"%s\", \"%s\"]}",
+ InstancesAccessor.InstancesProperties.selection_base.name(),
+ InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(),
+ InstancesAccessor.InstancesProperties.instances.name(), "instance1",
+ "instance2", "instance3", "instance4", "instance5", "invalidInstance",
+ InstancesAccessor.InstancesProperties.zone_order.name(), "zone2",
"zone1",
+ InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(),
"instance14", "instance9", "invalidInstance1");
+
+ Response response = new JerseyUriRequestBuilder(
+
"clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK"
+ + "&notExceedMaxOfflineInstances=true").format(
+ STOPPABLE_CLUSTER3).post(this, Entity.entity(content,
MediaType.APPLICATION_JSON_TYPE));
+ JsonNode jsonNode =
OBJECT_MAPPER.readTree(response.readEntity(String.class));
+
+ // Exactly one instance is expected to be stoppable in this scenario.
+ Set<String> stoppableSet = getStringSet(jsonNode,
+
InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+ Assert.assertEquals(stoppableSet.size(), 1);
+
+ JsonNode nonStoppableInstances = jsonNode.get(
+
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+ // "StoppableTestCluster2_db_0_3" : { "instance0" : "MASTER",
"instance13" : "SLAVE", "instance5" : "SLAVE"}.
+ // Since instance0 is to_be_stopped and MIN_ACTIVE_REPLICA is 2,
instance5 is not stoppable.
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "instance5"),
+ ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES"));
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "instance1"),
+ ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES",
"HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "instance2"),
+ ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES"));
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "instance4"),
+ ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"),
+ ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST"));
+
+ // restore the config
+ instanceConfig.setInstanceEnabled(true);
+ _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER3, instanceName,
instanceConfig);
+ TestHelper.verify(
+ () -> _configAccessor.getInstanceConfig(STOPPABLE_CLUSTER3,
instanceName).getInstanceEnabled(),
+ 1000);
+ System.out.println("End test :" + TestHelper.getTestMethodName());
+ }
+
+ @Test(dependsOnMethods =
"testInstanceStoppableWithDisabledAndOfflineInstances")
+ public void testInstancesStoppableWithOfflineInstancesInTopology() throws
IOException {
+ System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+ String content = String.format(
+ "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\", \"%s\"],
\"%s\":[\"%s\",\"%s\"], \"%s\":[\"%s\", \"%s\"]}",
+ InstancesAccessor.InstancesProperties.selection_base.name(),
+ InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(),
+ InstancesAccessor.InstancesProperties.instances.name(), "instance1",
+ "instance2", "instance3", "instance4", "instance5", "invalidInstance",
+ InstancesAccessor.InstancesProperties.zone_order.name(), "zone2",
"zone1",
+ InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(),
"instance9", "invalidInstance1");
+
+ Response response = new JerseyUriRequestBuilder(
+
"clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK"
+ + "&notExceedMaxOfflineInstances=true").format(
+ STOPPABLE_CLUSTER3).post(this, Entity.entity(content,
MediaType.APPLICATION_JSON_TYPE));
+ JsonNode jsonNode =
OBJECT_MAPPER.readTree(response.readEntity(String.class));
+ Set<String> stoppableSet = getStringSet(jsonNode,
+
InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+ Assert.assertEquals(stoppableSet.size(),
STOPPABLE_CLUSTER_MAX_OFFLINE_INSTANCES - 1);
+ // If not setting notExceedMaxOfflineInstances=true
+ Assert.assertTrue(stoppableSet.contains("instance3") &&
stoppableSet.contains("instance5"));
+
+ JsonNode nonStoppableInstances = jsonNode.get(
+
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+ // Before: "StoppableTestCluster3_db_0_8" : {"instance12" :
"MASTER","instance4" : "SLAVE", "instance9" : "SLAVE"},
+ // After: "StoppableTestCluster3_db_0_8" : {"instance12" :
"MASTER","instance4" : "SLAVE"},
+ // Since instance9 is not live, instance4 is no longer stoppable.
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "instance4"),
+ ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "instance1"),
+ ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES"));
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "instance2"),
+ ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES"));
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"),
+ ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST"));
+
+
+ content = String.format(
+ "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\", \"%s\"],
\"%s\":[\"%s\",\"%s\"], \"%s\":[\"%s\", \"%s\", \"%s\"]}",
+ InstancesAccessor.InstancesProperties.selection_base.name(),
+ InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(),
+ InstancesAccessor.InstancesProperties.instances.name(), "instance1",
+ "instance2", "instance3", "instance4", "instance5", "invalidInstance",
+ InstancesAccessor.InstancesProperties.zone_order.name(), "zone2",
"zone1",
+ InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(),
"instance9", "instance0", "invalidInstance1");
+
+ response = new JerseyUriRequestBuilder(
+
"clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK"
+ + "&notExceedMaxOfflineInstances=true").format(
+ STOPPABLE_CLUSTER3).post(this, Entity.entity(content,
MediaType.APPLICATION_JSON_TYPE));
+ jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class));
+ stoppableSet = getStringSet(jsonNode,
+
InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+ Assert.assertEquals(stoppableSet.size(),
STOPPABLE_CLUSTER_MAX_OFFLINE_INSTANCES - 2);
+ // If not setting notExceedMaxOfflineInstances=true
+ Assert.assertTrue(stoppableSet.contains("instance3"));
+
+ nonStoppableInstances = jsonNode.get(
+
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+ // Before: "StoppableTestCluster3_db_0_8" : {"instance12" :
"MASTER","instance4" : "SLAVE", "instance9" : "SLAVE"},
+ // After: "StoppableTestCluster3_db_0_8" : {"instance12" :
"MASTER","instance4" : "SLAVE"},
+ // Since instance9 is not live, instance4 is no longer stoppable.
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "instance4"),
+ ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "instance1"),
+ ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES"));
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "instance2"),
+ ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES"));
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"),
+ ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST"));
+ System.out.println("End test :" + TestHelper.getTestMethodName());
+ }
+
+ @Test(dependsOnMethods =
"testInstancesStoppableWithOfflineInstancesInTopology")
+ public void testInstanceStoppableCrossZoneWithMaxOfflineCheckViolated()
throws Exception {
+ System.out.println("Start test :" + TestHelper.getTestMethodName());
+ ClusterConfig clusterConfig =
_configAccessor.getClusterConfig(STOPPABLE_CLUSTER2);
+ clusterConfig.setMaxOfflineInstancesAllowed(0);
+ _configAccessor.setClusterConfig(STOPPABLE_CLUSTER2, clusterConfig);
+ TestHelper.verify(() -> {
+ return
_configAccessor.getClusterConfig(STOPPABLE_CLUSTER2).getMaxOfflineInstancesAllowed()
+ == 0;
+ }, 1000);
+
+ String content = String.format(
+ "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\", \"%s\"],
\"%s\":[\"%s\",\"%s\"], \"%s\":[\"%s\", \"%s\", \"%s\"]}",
+ InstancesAccessor.InstancesProperties.selection_base.name(),
+ InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(),
+ InstancesAccessor.InstancesProperties.instances.name(), "instance1",
+ "instance2", "instance3", "instance4", "instance5", "invalidInstance",
+ InstancesAccessor.InstancesProperties.zone_order.name(), "zone2",
"zone1",
+ InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(),
"instance0", "instance6", "invalidInstance1");
+
+ Response response = new JerseyUriRequestBuilder(
+
"clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK"
+ + "&notExceedMaxOfflineInstances=true").format(
+ STOPPABLE_CLUSTER2).post(this, Entity.entity(content,
MediaType.APPLICATION_JSON_TYPE));
+ JsonNode jsonNode =
OBJECT_MAPPER.readTree(response.readEntity(String.class));
+
+ // Since the maxOfflineAllowed is 0, no node is stoppable.
+ Set<String> stoppableSet = getStringSet(jsonNode,
+
InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+ Assert.assertEquals(stoppableSet.size(), 0);
+
+ JsonNode nonStoppableInstances = jsonNode.get(
+
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+ // "StoppableTestCluster2_db_0_3" : { "instance0" : "MASTER",
"instance13" : "SLAVE", "instance5" : "SLAVE"}.
+ // Since instance0 is to_be_stopped and MIN_ACTIVE_REPLICA is 2,
instance5 is not stoppable.
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "instance5"),
+ ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES",
"HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "instance3"),
+ ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES"));
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "instance1"),
+ ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES",
"HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "instance2"),
+ ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES"));
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "instance4"),
+ ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES"));
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"),
+ ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST"));
+
+ // restore the config
+
clusterConfig.setMaxOfflineInstancesAllowed(STOPPABLE_CLUSTER_MAX_OFFLINE_INSTANCES);
+ _configAccessor.setClusterConfig(STOPPABLE_CLUSTER2, clusterConfig);
+ TestHelper.verify(() -> {
+ return
_configAccessor.getClusterConfig(STOPPABLE_CLUSTER2).getMaxOfflineInstancesAllowed()
+ == STOPPABLE_CLUSTER_MAX_OFFLINE_INSTANCES;
+ }, 1000);
+ System.out.println("End test :" + TestHelper.getTestMethodName());
+ }
+
+  /**
+   * Verifies the zone_based stoppable check on STOPPABLE_CLUSTER2 when the request carries the
+   * notExceedMaxOfflineInstances=true query parameter.
+   *
+   * <p>Scenario 1: to_be_stopped_instances holds only a non-existent instance; the stoppable set
+   * is expected to contain exactly STOPPABLE_CLUSTER_MAX_OFFLINE_INSTANCES entries, including
+   * instance3, instance4 and instance5.
+   *
+   * <p>Scenario 2: instance0 and instance6 are additionally listed as to-be-stopped; only
+   * instance4 is expected to be stoppable, while instance5 and instance3 are expected to report
+   * HELIX:EXCEED_MAX_OFFLINE_INSTANCES.
+   *
+   * @throws IOException if the response body cannot be parsed as JSON
+   */
+  @Test(dependsOnMethods = "testInstanceStoppableCrossZoneWithMaxOfflineCheckViolated")
+  public void testInstanceStoppableZoneBasedWithExceedingMaxOfflineInstances() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+    // Scenario 1: the only to-be-stopped entry is an instance that does not exist.
+    String content = String.format(
+        "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\", \"%s\"], \"%s\":[\"%s\",\"%s\"], \"%s\":[\"%s\"]}",
+        InstancesAccessor.InstancesProperties.selection_base.name(),
+        InstancesAccessor.InstanceHealthSelectionBase.zone_based.name(),
+        InstancesAccessor.InstancesProperties.instances.name(), "instance1",
+        "instance2", "instance3", "instance4", "instance5", "invalidInstance",
+        InstancesAccessor.InstancesProperties.zone_order.name(), "zone2", "zone1",
+        InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(),
+        "invalidInstance1");
+
+    // The '&' separator is required so that notExceedMaxOfflineInstances is sent as its own
+    // query parameter instead of being glued onto skipHealthCheckCategories.
+    Response response = new JerseyUriRequestBuilder(
+        "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK"
+            + "&notExceedMaxOfflineInstances=true").format(
+        STOPPABLE_CLUSTER2).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE));
+    JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class));
+
+    Set<String> stoppableSet = getStringSet(jsonNode,
+        InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+    Assert.assertEquals(stoppableSet.size(), STOPPABLE_CLUSTER_MAX_OFFLINE_INSTANCES);
+    // If not setting notExceedMaxOfflineInstances=true, instance5 will be included in the result.
+    // NOTE(review): instance5 is asserted to be present here as well - confirm that the comment
+    // above is meant for this scenario.
+    Assert.assertTrue(stoppableSet.contains("instance4")
+        && stoppableSet.contains("instance3")
+        && stoppableSet.contains("instance5"));
+
+    JsonNode nonStoppableInstances = jsonNode.get(
+        InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"),
+        ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST"));
+
+    // Scenario 2: instance0 and instance6 are already marked as to-be-stopped.
+    content = String.format(
+        "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\", \"%s\"], \"%s\":[\"%s\",\"%s\"], \"%s\":[\"%s\", \"%s\", \"%s\"]}",
+        InstancesAccessor.InstancesProperties.selection_base.name(),
+        InstancesAccessor.InstanceHealthSelectionBase.zone_based.name(),
+        InstancesAccessor.InstancesProperties.instances.name(), "instance1",
+        "instance2", "instance3", "instance4", "instance5", "invalidInstance",
+        InstancesAccessor.InstancesProperties.zone_order.name(), "zone2", "zone1",
+        InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(),
+        "instance0", "instance6", "invalidInstance1");
+
+    response = new JerseyUriRequestBuilder(
+        "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK"
+            + "&notExceedMaxOfflineInstances=true").format(
+        STOPPABLE_CLUSTER2).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE));
+    jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class));
+
+    stoppableSet = getStringSet(jsonNode,
+        InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+    Assert.assertEquals(stoppableSet.size(), 1);
+    // If not setting notExceedMaxOfflineInstances=true, instance5 will be included in the result.
+    Assert.assertTrue(stoppableSet.contains("instance4"));
+
+    nonStoppableInstances = jsonNode.get(
+        InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+    // "StoppableTestCluster2_db_0_3" : { "instance0" : "MASTER", "instance13" : "SLAVE", "instance5" : "SLAVE"}.
+    // Since instance0 is to_be_stopped and MIN_ACTIVE_REPLICA is 2, instance5 is not stoppable.
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "instance5"),
+        ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES", "HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "instance3"),
+        ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES"));
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"),
+        ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST"));
+
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  /**
+   * Verifies the cross_zone_based stoppable check on STOPPABLE_CLUSTER2 when the request carries
+   * the notExceedMaxOfflineInstances=true query parameter and instance0 / instance6 are listed as
+   * to-be-stopped.
+   *
+   * <p>The same payload is posted twice: once with notExceedMaxOfflineInstances=true only, and
+   * once with continueOnFailures=true added. Both responses are expected to mark only instance4
+   * as stoppable and to report identical failure reasons for instance1, instance2, instance3,
+   * instance5 and invalidInstance.
+   *
+   * @throws IOException if a response body cannot be parsed as JSON
+   */
+  @Test(dependsOnMethods = "testInstanceStoppableZoneBasedWithExceedingMaxOfflineInstances")
+  public void testCrossZoneStoppableWithExceedingMaxOfflineInstances() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    String content = String.format(
+        "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\", \"%s\"], \"%s\":[\"%s\",\"%s\"], \"%s\":[\"%s\", \"%s\", \"%s\"]}",
+        InstancesAccessor.InstancesProperties.selection_base.name(),
+        InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(),
+        InstancesAccessor.InstancesProperties.instances.name(), "instance1",
+        "instance2", "instance3", "instance4", "instance5", "invalidInstance",
+        InstancesAccessor.InstancesProperties.zone_order.name(), "zone2", "zone1",
+        InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(),
+        "instance0", "instance6", "invalidInstance1");
+
+    // First request: notExceedMaxOfflineInstances=true only. The '&' separator is required so
+    // the flag is sent as its own query parameter.
+    Response response = new JerseyUriRequestBuilder(
+        "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK"
+            + "&notExceedMaxOfflineInstances=true").format(
+        STOPPABLE_CLUSTER2).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE));
+    JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class));
+
+    Set<String> stoppableSet = getStringSet(jsonNode,
+        InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+    Assert.assertEquals(stoppableSet.size(), 1);
+    // If not setting notExceedMaxOfflineInstances=true, instance5 will be included in the result.
+    Assert.assertTrue(stoppableSet.contains("instance4"));
+
+    JsonNode nonStoppableInstances = jsonNode.get(
+        InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+    // "StoppableTestCluster2_db_0_3" : { "instance0" : "MASTER", "instance13" : "SLAVE", "instance5" : "SLAVE"}.
+    // Since instance0 is to_be_stopped and MIN_ACTIVE_REPLICA is 2, instance5 is not stoppable.
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "instance5"),
+        ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES", "HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "instance3"),
+        ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES"));
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "instance1"),
+        ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES", "HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "instance2"),
+        ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES"));
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"),
+        ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST"));
+
+    // Second request: same payload, with continueOnFailures=true added to the query string.
+    response = new JerseyUriRequestBuilder(
+        "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK"
+            + "&notExceedMaxOfflineInstances=true&continueOnFailures=true").format(
+        STOPPABLE_CLUSTER2).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE));
+    jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class));
+
+    stoppableSet = getStringSet(jsonNode,
+        InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+    Assert.assertEquals(stoppableSet.size(), 1);
+    // If not setting notExceedMaxOfflineInstances=true, instance5 will be included in the result.
+    Assert.assertTrue(stoppableSet.contains("instance4"));
+
+    nonStoppableInstances = jsonNode.get(
+        InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+    // "StoppableTestCluster2_db_0_3" : { "instance0" : "MASTER", "instance13" : "SLAVE", "instance5" : "SLAVE"}.
+    // Since instance0 is to_be_stopped and MIN_ACTIVE_REPLICA is 2, instance5 is not stoppable.
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "instance5"),
+        ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES", "HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "instance3"),
+        ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES"));
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "instance1"),
+        ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES", "HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "instance2"),
+        ImmutableSet.of("HELIX:EXCEED_MAX_OFFLINE_INSTANCES"));
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"),
+        ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST"));
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+ @Test(dependsOnMethods =
"testCrossZoneStoppableWithExceedingMaxOfflineInstances")
public void testInstanceStoppableZoneBasedWithToBeStoppedInstances() throws
IOException {
System.out.println("Start test :" + TestHelper.getTestMethodName());