This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new f81222c56 CUSTOM_INSTANCE_CHECK and(or) CUSTOM_PARTITION_CHECK
optional for stoppable APIs (#2641)
f81222c56 is described below
commit f81222c564ecdd61e595f5b619106efe0f66528e
Author: Zachary Pinto <[email protected]>
AuthorDate: Fri Oct 6 13:46:13 2023 -0700
CUSTOM_INSTANCE_CHECK and(or) CUSTOM_PARTITION_CHECK optional for stoppable
APIs (#2641)
Allow callers of the helix-rest stoppable APIs to skip running
CUSTOM_INSTANCE_CHECK and/or CUSTOM_PARTITION_CHECK, as they may not
need to implement additional custom checks.
---
.../MaintenanceManagementService.java | 73 +++++++----
.../rest/server/json/instance/StoppableCheck.java | 8 ++
.../server/resources/helix/InstancesAccessor.java | 134 ++++++++++++---------
.../resources/helix/PerInstanceAccessor.java | 33 +++--
.../TestMaintenanceManagementService.java | 123 ++++++++++++++++---
.../helix/rest/server/TestInstancesAccessor.java | 6 +-
.../helix/rest/server/TestPerInstanceAccessor.java | 9 +-
7 files changed, 275 insertions(+), 111 deletions(-)
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
index 657619c9e..529fc469d 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
@@ -42,6 +42,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -81,43 +82,65 @@ public class MaintenanceManagementService {
public static final String HELIX_CUSTOM_STOPPABLE_CHECK =
"CustomInstanceStoppableCheck";
public static final String OPERATION_CONFIG_SHARED_INPUT =
"OperationConfigSharedInput";
+ public static final Set<StoppableCheck.Category>
SKIPPABLE_HEALTH_CHECK_CATEGORIES =
+ ImmutableSet.of(StoppableCheck.Category.CUSTOM_INSTANCE_CHECK,
+ StoppableCheck.Category.CUSTOM_PARTITION_CHECK);
+
private final ConfigAccessor _configAccessor;
private final CustomRestClient _customRestClient;
private final String _namespace;
private final boolean _skipZKRead;
private final HelixDataAccessorWrapper _dataAccessor;
private final Set<String> _nonBlockingHealthChecks;
+ private final Set<StoppableCheck.Category> _skipHealthCheckCategories;
public MaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
ConfigAccessor configAccessor, boolean skipZKRead, String namespace) {
- this(dataAccessor, configAccessor, CustomRestClientFactory.get(),
skipZKRead,
+ this(new HelixDataAccessorWrapper(dataAccessor,
CustomRestClientFactory.get(), namespace),
+ configAccessor, CustomRestClientFactory.get(), skipZKRead,
Collections.emptySet(),
Collections.emptySet(), namespace);
}
public MaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
ConfigAccessor configAccessor, boolean skipZKRead, Set<String>
nonBlockingHealthChecks,
String namespace) {
- this(dataAccessor, configAccessor, CustomRestClientFactory.get(),
skipZKRead,
- nonBlockingHealthChecks, namespace);
+ this(new HelixDataAccessorWrapper(dataAccessor,
CustomRestClientFactory.get(), namespace),
+ configAccessor, CustomRestClientFactory.get(), skipZKRead,
nonBlockingHealthChecks,
+ Collections.emptySet(), namespace);
}
public MaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
ConfigAccessor configAccessor, boolean skipZKRead, boolean
continueOnFailure,
String namespace) {
- this(dataAccessor, configAccessor, CustomRestClientFactory.get(),
skipZKRead,
+ this(new HelixDataAccessorWrapper(dataAccessor,
CustomRestClientFactory.get(), namespace),
+ configAccessor, CustomRestClientFactory.get(), skipZKRead,
+ continueOnFailure ? Collections.singleton(ALL_HEALTH_CHECK_NONBLOCK)
+ : Collections.emptySet(), Collections.emptySet(), namespace);
+ }
+
+ public MaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
+ ConfigAccessor configAccessor, boolean skipZKRead, boolean
continueOnFailure,
+ Set<StoppableCheck.Category> skipHealthCheckCategories, String
namespace) {
+ this(new HelixDataAccessorWrapper(dataAccessor,
CustomRestClientFactory.get(), namespace),
+ configAccessor, CustomRestClientFactory.get(), skipZKRead,
continueOnFailure ? Collections.singleton(ALL_HEALTH_CHECK_NONBLOCK)
- : Collections.emptySet(), namespace);
+ : Collections.emptySet(),
+ skipHealthCheckCategories != null ? skipHealthCheckCategories :
Collections.emptySet(),
+ namespace);
}
@VisibleForTesting
- MaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
ConfigAccessor configAccessor,
- CustomRestClient customRestClient, boolean skipZKRead, Set<String>
nonBlockingHealthChecks,
+ MaintenanceManagementService(HelixDataAccessorWrapper dataAccessorWrapper,
+ ConfigAccessor configAccessor, CustomRestClient customRestClient,
boolean skipZKRead,
+ Set<String> nonBlockingHealthChecks, Set<StoppableCheck.Category>
skipHealthCheckCategories,
String namespace) {
- _dataAccessor = new HelixDataAccessorWrapper(dataAccessor,
customRestClient, namespace);
+ _dataAccessor = dataAccessorWrapper;
_configAccessor = configAccessor;
_customRestClient = customRestClient;
_skipZKRead = skipZKRead;
_nonBlockingHealthChecks = nonBlockingHealthChecks;
+ _skipHealthCheckCategories =
+ skipHealthCheckCategories != null ? skipHealthCheckCategories :
Collections.emptySet();
_namespace = namespace;
}
@@ -440,20 +463,27 @@ public class MaintenanceManagementService {
LOG.error(errorMessage);
throw new HelixException(errorMessage);
}
- Map<String, Future<StoppableCheck>> customInstanceLevelChecks =
instances.stream().collect(
- Collectors.toMap(Function.identity(), instance -> POOL.submit(
- () -> performCustomInstanceCheck(clusterId, instance,
restConfig.getBaseUrl(instance),
- customPayLoads))));
- List<String> instancesForCustomPartitionLevelChecks =
- filterInstancesForNextCheck(customInstanceLevelChecks,
finalStoppableChecks);
- if (!instancesForCustomPartitionLevelChecks.isEmpty()) {
+
+ List<String> instancesForCustomPartitionLevelChecks;
+ if
(!_skipHealthCheckCategories.contains(StoppableCheck.Category.CUSTOM_INSTANCE_CHECK))
{
+ Map<String, Future<StoppableCheck>> customInstanceLevelChecks =
instances.stream().collect(
+ Collectors.toMap(Function.identity(), instance -> POOL.submit(
+ () -> performCustomInstanceCheck(clusterId, instance,
restConfig.getBaseUrl(instance),
+ customPayLoads))));
+ instancesForCustomPartitionLevelChecks =
+ filterInstancesForNextCheck(customInstanceLevelChecks,
finalStoppableChecks);
+ } else {
+ instancesForCustomPartitionLevelChecks = instances;
+ }
+
+ if (!instancesForCustomPartitionLevelChecks.isEmpty() &&
!_skipHealthCheckCategories.contains(
+ StoppableCheck.Category.CUSTOM_PARTITION_CHECK)) {
// add to finalStoppableChecks regardless of stoppable or not.
Map<String, StoppableCheck> instancePartitionLevelChecks =
performPartitionsCheck(instancesForCustomPartitionLevelChecks,
restConfig,
customPayLoads);
List<String> instancesForFollowingChecks = new ArrayList<>();
- for (Map.Entry<String, StoppableCheck>
instancePartitionStoppableCheckEntry : instancePartitionLevelChecks
- .entrySet()) {
+ for (Map.Entry<String, StoppableCheck>
instancePartitionStoppableCheckEntry : instancePartitionLevelChecks.entrySet())
{
String instance = instancePartitionStoppableCheckEntry.getKey();
StoppableCheck stoppableCheck =
instancePartitionStoppableCheckEntry.getValue();
addStoppableCheck(finalStoppableChecks, instance, stoppableCheck);
@@ -465,6 +495,8 @@ public class MaintenanceManagementService {
}
return instancesForFollowingChecks;
}
+
+ // Reached when the custom partition check is skipped or no instances remain to check.
return instancesForCustomPartitionLevelChecks;
}
@@ -474,7 +506,7 @@ public class MaintenanceManagementService {
Map<String, MaintenanceManagementInstanceInfo> instanceInfos = new
HashMap<>();
Map<String, StoppableCheck> finalStoppableChecks = new HashMap<>();
// TODO: Right now user can only choose from HelixInstanceStoppableCheck
and
- // CostumeInstanceStoppableCheck. We should add finer grain check groups
to choose from
+ // CustomInstanceStoppableCheck. We should add finer grain check groups to
choose from
// i.e. HELIX:INSTANCE_NOT_ENABLED,
CUSTOM_PARTITION_HEALTH_FAILURE:PARTITION_INITIAL_STATE_FAIL etc.
for (String healthCheck : healthChecks) {
if (healthCheck.equals(HELIX_INSTANCE_STOPPABLE_CHECK)) {
@@ -528,10 +560,7 @@ public class MaintenanceManagementService {
String instance = entry.getKey();
try {
StoppableCheck stoppableCheck = entry.getValue().get();
- if (!stoppableCheck.isStoppable()) {
- // put the check result of the failed-to-stop instances
- addStoppableCheck(finalStoppableCheckByInstance, instance,
stoppableCheck);
- }
+ addStoppableCheck(finalStoppableCheckByInstance, instance,
stoppableCheck);
if (stoppableCheck.isStoppable() ||
isNonBlockingCheck(stoppableCheck)) {
// instance passed this around of check or mandatory all checks
// will be checked in the next round
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java
index 150e87be4..2985bd86f 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java
@@ -19,8 +19,10 @@ package org.apache.helix.rest.server.json.instance;
* under the License.
*/
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -42,6 +44,12 @@ public class StoppableCheck {
public String getPrefix() {
return prefix;
}
+
+ public static Set<Category> categorySetFromCommaSeperatedString(String
categories)
+ throws IllegalArgumentException {
+ return
Arrays.stream(categories.split(",")).map(StoppableCheck.Category::valueOf)
+ .collect(Collectors.toSet());
+ }
}
@JsonProperty("stoppable")
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
index d397ada0f..87be72b96 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
@@ -44,7 +44,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
-import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
@@ -150,12 +149,11 @@ public class InstancesAccessor extends
AbstractHelixResource {
@ResponseMetered(name = HttpConstants.WRITE_REQUEST)
@Timed(name = HttpConstants.WRITE_REQUEST)
@POST
- public Response instancesOperations(
- @PathParam("clusterId") String clusterId,
+ public Response instancesOperations(@PathParam("clusterId") String clusterId,
@QueryParam("command") String command,
@QueryParam("continueOnFailures") boolean continueOnFailures,
@QueryParam("skipZKRead") boolean skipZKRead,
- String content) {
+ @QueryParam("skipHealthCheckCategories") String
skipHealthCheckCategories, String content) {
Command cmd;
try {
cmd = Command.valueOf(command);
@@ -163,6 +161,22 @@ public class InstancesAccessor extends
AbstractHelixResource {
return badRequest("Invalid command : " + command);
}
+ Set<StoppableCheck.Category> skipHealthCheckCategorySet;
+ try {
+ skipHealthCheckCategorySet = skipHealthCheckCategories != null
+ ?
StoppableCheck.Category.categorySetFromCommaSeperatedString(skipHealthCheckCategories)
+ : Collections.emptySet();
+ if
(!MaintenanceManagementService.SKIPPABLE_HEALTH_CHECK_CATEGORIES.containsAll(
+ skipHealthCheckCategorySet)) {
+ throw new IllegalArgumentException(
+ "Some of the provided skipHealthCheckCategories are not skippable.
The supported skippable categories are: "
+ +
MaintenanceManagementService.SKIPPABLE_HEALTH_CHECK_CATEGORIES);
+ }
+ } catch (Exception e) {
+ return badRequest("Invalid skipHealthCheckCategories: " +
skipHealthCheckCategories + "\n"
+ + e.getMessage());
+ }
+
HelixAdmin admin = getHelixAdmin();
try {
JsonNode node = null;
@@ -176,17 +190,18 @@ public class InstancesAccessor extends
AbstractHelixResource {
.readValue(node.get(InstancesAccessor.InstancesProperties.instances.name()).toString(),
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class,
String.class));
switch (cmd) {
- case enable:
- admin.enableInstance(clusterId, enableInstances, true);
- break;
- case disable:
- admin.enableInstance(clusterId, enableInstances, false);
- break;
- case stoppable:
- return batchGetStoppableInstances(clusterId, node, skipZKRead,
continueOnFailures);
- default:
- _logger.error("Unsupported command :" + command);
- return badRequest("Unsupported command :" + command);
+ case enable:
+ admin.enableInstance(clusterId, enableInstances, true);
+ break;
+ case disable:
+ admin.enableInstance(clusterId, enableInstances, false);
+ break;
+ case stoppable:
+ return batchGetStoppableInstances(clusterId, node, skipZKRead,
continueOnFailures,
+ skipHealthCheckCategorySet);
+ default:
+ _logger.error("Unsupported command :" + command);
+ return badRequest("Unsupported command :" + command);
}
} catch (HelixHealthException e) {
_logger
@@ -200,26 +215,28 @@ public class InstancesAccessor extends
AbstractHelixResource {
}
private Response batchGetStoppableInstances(String clusterId, JsonNode node,
boolean skipZKRead,
- boolean continueOnFailures) throws IOException {
+ boolean continueOnFailures, Set<StoppableCheck.Category>
skipHealthCheckCategories)
+ throws IOException {
try {
// TODO: Process input data from the content
InstancesAccessor.InstanceHealthSelectionBase selectionBase =
InstancesAccessor.InstanceHealthSelectionBase.valueOf(
node.get(InstancesAccessor.InstancesProperties.selection_base.name()).textValue());
- List<String> instances = OBJECT_MAPPER
-
.readValue(node.get(InstancesAccessor.InstancesProperties.instances.name()).toString(),
-
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class,
String.class));
+ List<String> instances = OBJECT_MAPPER.readValue(
+
node.get(InstancesAccessor.InstancesProperties.instances.name()).toString(),
+ OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class,
String.class));
List<String> orderOfZone = null;
String customizedInput = null;
if
(node.get(InstancesAccessor.InstancesProperties.customized_values.name()) !=
null) {
- customizedInput =
node.get(InstancesAccessor.InstancesProperties.customized_values.name()).toString();
+ customizedInput =
+
node.get(InstancesAccessor.InstancesProperties.customized_values.name()).toString();
}
if (node.get(InstancesAccessor.InstancesProperties.zone_order.name()) !=
null) {
- orderOfZone = OBJECT_MAPPER
-
.readValue(node.get(InstancesAccessor.InstancesProperties.zone_order.name()).toString(),
-
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class,
String.class));
+ orderOfZone = OBJECT_MAPPER.readValue(
+
node.get(InstancesAccessor.InstancesProperties.zone_order.name()).toString(),
+ OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class,
String.class));
}
// Prepare output result
@@ -231,46 +248,49 @@ public class InstancesAccessor extends
AbstractHelixResource {
MaintenanceManagementService maintenanceService =
new MaintenanceManagementService((ZKHelixDataAccessor)
getDataAccssor(clusterId),
- getConfigAccessor(), skipZKRead, continueOnFailures,
getNamespace());
- ClusterService clusterService = new
ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
+ getConfigAccessor(), skipZKRead, continueOnFailures,
skipHealthCheckCategories,
+ getNamespace());
+ ClusterService clusterService =
+ new ClusterServiceImpl(getDataAccssor(clusterId),
getConfigAccessor());
ClusterTopology clusterTopology =
clusterService.getClusterTopology(clusterId);
switch (selectionBase) {
- case zone_based:
- List<String> zoneBasedInstance =
- getZoneBasedInstances(instances, orderOfZone,
clusterTopology.toZoneMapping());
- Map<String, StoppableCheck> instancesStoppableChecks =
maintenanceService.batchGetInstancesStoppableChecks(
- clusterId, zoneBasedInstance, customizedInput);
- for (Map.Entry<String, StoppableCheck> instanceStoppableCheck :
instancesStoppableChecks.entrySet()) {
- String instance = instanceStoppableCheck.getKey();
- StoppableCheck stoppableCheck = instanceStoppableCheck.getValue();
- if (!stoppableCheck.isStoppable()) {
- ArrayNode failedReasonsNode =
failedStoppableInstances.putArray(instance);
- for (String failedReason : stoppableCheck.getFailedChecks()) {
-
failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason));
+ case zone_based:
+ List<String> zoneBasedInstance =
+ getZoneBasedInstances(instances, orderOfZone,
clusterTopology.toZoneMapping());
+ Map<String, StoppableCheck> instancesStoppableChecks =
+ maintenanceService.batchGetInstancesStoppableChecks(clusterId,
zoneBasedInstance,
+ customizedInput);
+ for (Map.Entry<String, StoppableCheck> instanceStoppableCheck :
instancesStoppableChecks.entrySet()) {
+ String instance = instanceStoppableCheck.getKey();
+ StoppableCheck stoppableCheck = instanceStoppableCheck.getValue();
+ if (!stoppableCheck.isStoppable()) {
+ ArrayNode failedReasonsNode =
failedStoppableInstances.putArray(instance);
+ for (String failedReason : stoppableCheck.getFailedChecks()) {
+
failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason));
+ }
+ } else {
+ stoppableInstances.add(instance);
}
- } else {
- stoppableInstances.add(instance);
}
- }
- // Adding following logic to check whether instances exist or not. An
instance exist could be
- // checking following scenario:
- // 1. Instance got dropped. (InstanceConfig is gone.)
- // 2. Instance name has typo.
+ // Adding following logic to check whether instances exist or not.
An instance exist could be
+ // checking following scenario:
+ // 1. Instance got dropped. (InstanceConfig is gone.)
+ // 2. Instance name has typo.
- // If we dont add this check, the instance, which does not exist, will
be disappeared from
- // result since Helix skips instances for instances not in the
selected zone. User may get
- // confused with the output.
- Set<String> nonSelectedInstances = new HashSet<>(instances);
- nonSelectedInstances.removeAll(clusterTopology.getAllInstances());
- for (String nonSelectedInstance : nonSelectedInstances) {
- ArrayNode failedReasonsNode =
failedStoppableInstances.putArray(nonSelectedInstance);
-
failedReasonsNode.add(JsonNodeFactory.instance.textNode(INSTANCE_NOT_EXIST));
- }
+ // If we dont add this check, the instance, which does not exist,
will be disappeared from
+ // result since Helix skips instances for instances not in the
selected zone. User may get
+ // confused with the output.
+ Set<String> nonSelectedInstances = new HashSet<>(instances);
+ nonSelectedInstances.removeAll(clusterTopology.getAllInstances());
+ for (String nonSelectedInstance : nonSelectedInstances) {
+ ArrayNode failedReasonsNode =
failedStoppableInstances.putArray(nonSelectedInstance);
+
failedReasonsNode.add(JsonNodeFactory.instance.textNode(INSTANCE_NOT_EXIST));
+ }
- break;
- case instance_based:
- default:
- throw new UnsupportedOperationException("instance_based selection is
not supported yet!");
+ break;
+ case instance_based:
+ default:
+ throw new UnsupportedOperationException("instance_based selection is
not supported yet!");
}
return JSONRepresentation(result);
} catch (HelixException e) {
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
index 8fcd2ab35..efc3ce652 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
@@ -164,6 +164,7 @@ public class PerInstanceAccessor extends
AbstractHelixResource {
* @param continueOnFailures whether or not continue to perform the
subsequent checks if previous
* check fails. If false, when helix own check
fails, the subsequent
* custom checks will not be performed.
+ * @param skipHealthCheckCategories StoppableCheck Categories to skip.
* @return json response representing if queried instance is stoppable
* @throws IOException if there is any IO/network error
*/
@@ -172,16 +173,32 @@ public class PerInstanceAccessor extends
AbstractHelixResource {
@POST
@Path("stoppable")
@Consumes(MediaType.APPLICATION_JSON)
- public Response isInstanceStoppable(
- String jsonContent,
- @PathParam("clusterId") String clusterId,
- @PathParam("instanceName") String instanceName,
- @QueryParam("skipZKRead") boolean skipZKRead,
- @QueryParam("continueOnFailures") boolean continueOnFailures) throws
IOException {
+ public Response isInstanceStoppable(String jsonContent,
@PathParam("clusterId") String clusterId,
+ @PathParam("instanceName") String instanceName,
@QueryParam("skipZKRead") boolean skipZKRead,
+ @QueryParam("continueOnFailures") boolean continueOnFailures,
+ @QueryParam("skipHealthCheckCategories") String
skipHealthCheckCategories)
+ throws IOException {
+
+ Set<StoppableCheck.Category> skipHealthCheckCategorySet;
+ try {
+ skipHealthCheckCategorySet = skipHealthCheckCategories != null
+ ?
StoppableCheck.Category.categorySetFromCommaSeperatedString(skipHealthCheckCategories)
+ : Collections.emptySet();
+ if
(!MaintenanceManagementService.SKIPPABLE_HEALTH_CHECK_CATEGORIES.containsAll(
+ skipHealthCheckCategorySet)) {
+ throw new IllegalArgumentException(
+ "Some of the provided skipHealthCheckCategories are not skippable.
The supported skippable categories are: "
+ +
MaintenanceManagementService.SKIPPABLE_HEALTH_CHECK_CATEGORIES);
+ }
+ } catch (Exception e) {
+ return badRequest("Invalid skipHealthCheckCategories: " +
skipHealthCheckCategories + "\n"
+ + e.getMessage());
+ }
+
HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
MaintenanceManagementService maintenanceService =
- new MaintenanceManagementService((ZKHelixDataAccessor) dataAccessor,
getConfigAccessor(), skipZKRead,
- continueOnFailures, getNamespace());
+ new MaintenanceManagementService((ZKHelixDataAccessor) dataAccessor,
getConfigAccessor(),
+ skipZKRead, continueOnFailures, skipHealthCheckCategorySet,
getNamespace());
StoppableCheck stoppableCheck;
try {
JsonNode node = null;
diff --git
a/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
b/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
index 87883d7fa..f8408b070 100644
---
a/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
+++
b/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -33,10 +34,12 @@ import com.google.common.collect.ImmutableSet;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigAccessor;
+import org.apache.helix.PropertyKey;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LeaderStandbySMD;
import org.apache.helix.model.MasterSlaveSMD;
import org.apache.helix.model.RESTConfig;
import org.apache.helix.rest.client.CustomRestClient;
@@ -51,6 +54,7 @@ import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyList;
@@ -62,17 +66,17 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
-
public class TestMaintenanceManagementService {
private static final String TEST_CLUSTER = "TestCluster";
private static final String TEST_INSTANCE = "instance0.linkedin.com_1235";
@Mock
- private HelixDataAccessorWrapper _dataAccessor;
+ private HelixDataAccessorWrapper _dataAccessorWrapper;
@Mock
private ConfigAccessor _configAccessor;
@Mock
private CustomRestClient _customRestClient;
+
@BeforeMethod
public void beforeMethod() {
MockitoAnnotations.initMocks(this);
@@ -86,16 +90,26 @@ public class TestMaintenanceManagementService {
public MockMaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
ConfigAccessor configAccessor, CustomRestClient customRestClient,
boolean skipZKRead,
boolean continueOnFailure, String namespace) {
- super(dataAccessor, configAccessor, customRestClient, skipZKRead,
+ super(new HelixDataAccessorWrapper(dataAccessor, customRestClient,
namespace), configAccessor,
+ customRestClient, skipZKRead,
continueOnFailure ? Collections.singleton(ALL_HEALTH_CHECK_NONBLOCK)
- : Collections.emptySet(), namespace);
+ : Collections.emptySet(), null, namespace);
+ }
+
+ public MockMaintenanceManagementService(HelixDataAccessorWrapper
dataAccessorWrapper,
+ ConfigAccessor configAccessor, CustomRestClient customRestClient,
boolean skipZKRead,
+ boolean continueOnFailure, Set<StoppableCheck.Category>
skipHealthCheckCategories,
+ String namespace) {
+ super(dataAccessorWrapper, configAccessor, customRestClient, skipZKRead,
+ continueOnFailure ? Collections.singleton(ALL_HEALTH_CHECK_NONBLOCK)
+ : Collections.emptySet(), skipHealthCheckCategories, namespace);
}
public MockMaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
ConfigAccessor configAccessor, CustomRestClient customRestClient,
boolean skipZKRead,
Set<String> nonBlockingHealthChecks, String namespace) {
- super(dataAccessor, configAccessor, customRestClient, skipZKRead,
nonBlockingHealthChecks,
- namespace);
+ super(new HelixDataAccessorWrapper(dataAccessor, customRestClient,
namespace), configAccessor,
+ customRestClient, skipZKRead, nonBlockingHealthChecks, null,
namespace);
}
@Override
@@ -105,13 +119,12 @@ public class TestMaintenanceManagementService {
}
}
-
@Test
public void testGetInstanceStoppableCheckWhenHelixOwnCheckFail() throws
IOException {
Map<String, Boolean> failedCheck = ImmutableMap.of("FailCheck", false);
MockMaintenanceManagementService service =
- new MockMaintenanceManagementService(_dataAccessor, _configAccessor,
_customRestClient,
- false, false, HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
+ new MockMaintenanceManagementService(_dataAccessorWrapper,
_configAccessor,
+ _customRestClient, false, false,
HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String
clusterId,
String instanceName, List<HealthCheck> healthChecks) {
@@ -130,8 +143,8 @@ public class TestMaintenanceManagementService {
@Test
public void testGetInstanceStoppableCheckWhenCustomInstanceCheckFail()
throws IOException {
MockMaintenanceManagementService service =
- new MockMaintenanceManagementService(_dataAccessor, _configAccessor,
_customRestClient, false, false,
- HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
+ new MockMaintenanceManagementService(_dataAccessorWrapper,
_configAccessor,
+ _customRestClient, false, false,
HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String
clusterId,
String instanceName, List<HealthCheck> healthChecks) {
@@ -139,8 +152,8 @@ public class TestMaintenanceManagementService {
}
};
Map<String, Boolean> failedCheck = ImmutableMap.of("FailCheck", false);
- when(_customRestClient.getInstanceStoppableCheck(anyString(), anyMap()))
- .thenReturn(failedCheck);
+ when(_customRestClient.getInstanceStoppableCheck(anyString(),
anyMap())).thenReturn(
+ failedCheck);
String jsonContent = "{\n" + " \"param1\": \"value1\",\n" + "\"param2\":
\"value2\"\n" + "}";
StoppableCheck actual =
service.getInstanceStoppableCheck(TEST_CLUSTER, TEST_INSTANCE,
jsonContent);
@@ -150,11 +163,87 @@ public class TestMaintenanceManagementService {
verify(_customRestClient, times(0)).getPartitionStoppableCheck(any(),
any(), any());
}
+ @Test
+ public void
testGetInstanceStoppableCheckWhenCustomInstanceCheckAndCustomPartitionCheckDisabled()
+ throws IOException {
+ // Test when custom instance and partition check are both disabled.
+ MockMaintenanceManagementService service =
+ new MockMaintenanceManagementService(_dataAccessorWrapper,
_configAccessor,
+ _customRestClient, false, false, new HashSet<>(
+ Arrays.asList(StoppableCheck.Category.CUSTOM_INSTANCE_CHECK,
+ StoppableCheck.Category.CUSTOM_PARTITION_CHECK)),
+ HelixRestNamespace.DEFAULT_NAMESPACE_NAME);
+
+ StoppableCheck actual = service.getInstanceStoppableCheck(TEST_CLUSTER,
TEST_INSTANCE, "");
+ Assert.assertTrue(actual.isStoppable());
+ verify(_dataAccessorWrapper,
times(0)).getAllPartitionsHealthOnLiveInstance(any(), any(),
+ anyBoolean());
+ verify(_customRestClient, times(0)).getInstanceStoppableCheck(any(),
any());
+ }
+
+ @Test
+ public void testGetInstanceStoppableCheckWhenCustomPartitionCheckDisabled()
throws IOException {
+ // Test when custom only partition check is disabled and instance check
fails.
+ when(_dataAccessorWrapper.getAllPartitionsHealthOnLiveInstance(any(),
anyMap(),
+ anyBoolean())).thenReturn(Collections.emptyMap());
+ when(_dataAccessorWrapper.getChildValues(any(), anyBoolean())).thenReturn(
+ Collections.emptyList());
+ when(_customRestClient.getInstanceStoppableCheck(anyString(),
anyMap())).thenReturn(
+ ImmutableMap.of("FailCheck", false));
+ MockMaintenanceManagementService service =
+ new MockMaintenanceManagementService(_dataAccessorWrapper,
_configAccessor,
+ _customRestClient, false, false,
+ new
HashSet<>(Arrays.asList(StoppableCheck.Category.CUSTOM_PARTITION_CHECK)),
+ HelixRestNamespace.DEFAULT_NAMESPACE_NAME);
+
+ StoppableCheck actual = service.getInstanceStoppableCheck(TEST_CLUSTER,
TEST_INSTANCE, "");
+ Assert.assertEquals(actual.getFailedChecks(),
+
Arrays.asList(StoppableCheck.Category.CUSTOM_INSTANCE_CHECK.getPrefix() +
"FailCheck"));
+ Assert.assertFalse(actual.isStoppable());
+ verify(_dataAccessorWrapper,
times(0)).getAllPartitionsHealthOnLiveInstance(any(), any(),
+ anyBoolean());
+ verify(_customRestClient, times(1)).getInstanceStoppableCheck(any(),
any());
+ }
+
+ @Test
+ public void testGetInstanceStoppableCheckWhenCustomInstanceCheckDisabled()
throws IOException {
+ // Test when custom only instance check is disabled and partition check
fails.
+ String testResource = "testResource";
+ ZNRecord externalViewZnode = new ZNRecord(testResource);
+ externalViewZnode.setSimpleField(
+ ExternalView.ExternalViewProperty.STATE_MODEL_DEF_REF.toString(),
LeaderStandbySMD.name);
+ ExternalView externalView = new ExternalView(externalViewZnode);
+ externalView.setStateMap("testPartition",
+ ImmutableMap.of(TEST_INSTANCE, "LEADER", "sibling_instance",
"OFFLINE"));
+
+ when(_dataAccessorWrapper.getAllPartitionsHealthOnLiveInstance(any(),
anyMap(),
+ anyBoolean())).thenReturn(Collections.emptyMap());
+ when(_dataAccessorWrapper.getProperty((PropertyKey) any())).thenReturn(new
LeaderStandbySMD());
+ when(_dataAccessorWrapper.keyBuilder()).thenReturn(new
PropertyKey.Builder(TEST_CLUSTER));
+ when(_dataAccessorWrapper.getChildValues(any(), anyBoolean())).thenReturn(
+ Arrays.asList(externalView));
+ MockMaintenanceManagementService service =
+ new MockMaintenanceManagementService(_dataAccessorWrapper,
_configAccessor,
+ _customRestClient, false, false,
+ new
HashSet<>(Arrays.asList(StoppableCheck.Category.CUSTOM_INSTANCE_CHECK)),
+ HelixRestNamespace.DEFAULT_NAMESPACE_NAME);
+
+ StoppableCheck actual = service.getInstanceStoppableCheck(TEST_CLUSTER,
TEST_INSTANCE, "");
+ List<String> expectedFailedChecks = Arrays.asList(
+ StoppableCheck.Category.CUSTOM_PARTITION_CHECK.getPrefix()
+ + "PARTITION_INITIAL_STATE_FAIL:testPartition");
+ Assert.assertEquals(actual.getFailedChecks(), expectedFailedChecks);
+ Assert.assertFalse(actual.isStoppable());
+ verify(_dataAccessorWrapper,
times(1)).getAllPartitionsHealthOnLiveInstance(any(), any(),
+ anyBoolean());
+ verify(_customRestClient, times(0)).getInstanceStoppableCheck(any(),
any());
+ }
+
@Test
public void testGetInstanceStoppableCheckConnectionRefused() throws
IOException {
MockMaintenanceManagementService service =
- new MockMaintenanceManagementService(_dataAccessor, _configAccessor,
_customRestClient, false, false,
- HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
+ new MockMaintenanceManagementService(_dataAccessorWrapper,
_configAccessor,
+ _customRestClient, false, false,
HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String
clusterId,
String instanceName, List<HealthCheck> healthChecks) {
@@ -300,8 +389,8 @@ public class TestMaintenanceManagementService {
@Test(enabled = false)
public void testGetInstanceStoppableCheckWhenPartitionsCheckFail() throws
IOException {
MockMaintenanceManagementService service =
- new MockMaintenanceManagementService(_dataAccessor, _configAccessor,
_customRestClient, false, false,
- HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
+ new MockMaintenanceManagementService(_dataAccessorWrapper,
_configAccessor,
+ _customRestClient, false, false,
HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String
clusterId,
String instanceName, List<HealthCheck> healthChecks) {
diff --git
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
index fa059ad30..01701a486 100644
---
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
+++
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
@@ -56,9 +56,9 @@ public class TestInstancesAccessor extends AbstractTestClass {
InstancesAccessor.InstancesProperties.instances.name(), "instance0",
"instance1",
"instance2", "instance3", "instance4", "instance5", "invalidInstance",
InstancesAccessor.InstancesProperties.zone_order.name(), "zone2",
"zone1");
- Response response =
- new
JerseyUriRequestBuilder("clusters/{}/instances?command=stoppable").format(
- STOPPABLE_CLUSTER).post(this, Entity.entity(content,
MediaType.APPLICATION_JSON_TYPE));
+ Response response = new JerseyUriRequestBuilder(
+
"clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format(
+ STOPPABLE_CLUSTER).post(this, Entity.entity(content,
MediaType.APPLICATION_JSON_TYPE));
JsonNode jsonNode =
OBJECT_MAPPER.readTree(response.readEntity(String.class));
Assert.assertFalse(
jsonNode.withArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name())
diff --git
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
index 35f0712b4..273019bd3 100644
---
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
+++
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
@@ -62,12 +62,13 @@ public class TestPerInstanceAccessor extends
AbstractTestClass {
Map<String, String> params = ImmutableMap.of("client", "espresso");
Entity entity =
Entity.entity(OBJECT_MAPPER.writeValueAsString(params),
MediaType.APPLICATION_JSON_TYPE);
- Response response = new
JerseyUriRequestBuilder("clusters/{}/instances/{}/stoppable")
- .format(STOPPABLE_CLUSTER, "instance1").post(this, entity);
+ Response response = new JerseyUriRequestBuilder(
+
"clusters/{}/instances/{}/stoppable?skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format(
+ STOPPABLE_CLUSTER, "instance1").post(this, entity);
String stoppableCheckResult = response.readEntity(String.class);
Map<String, Object> actualMap =
OBJECT_MAPPER.readValue(stoppableCheckResult, Map.class);
- List<String> failedChecks = Arrays
- .asList("HELIX:EMPTY_RESOURCE_ASSIGNMENT",
"HELIX:INSTANCE_NOT_ENABLED",
+ List<String> failedChecks =
+ Arrays.asList("HELIX:EMPTY_RESOURCE_ASSIGNMENT",
"HELIX:INSTANCE_NOT_ENABLED",
"HELIX:INSTANCE_NOT_STABLE");
Map<String, Object> expectedMap =
ImmutableMap.of("stoppable", false, "failedChecks", failedChecks);