This is an automated email from the ASF dual-hosted git repository.
hzlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 35b5ec1 Add option to continue checks on failures for stoppable api
(#1689)
35b5ec1 is described below
commit 35b5ec15c6fa0839a4f1e34c34ca798f151d3f15
Author: Huizhi Lu <[email protected]>
AuthorDate: Wed Apr 14 19:49:54 2021 -0700
Add option to continue checks on failures for stoppable api (#1689)
This commit adds an option to the Helix stoppable check API. By default, once the
Helix-owned checks fail the subsequent custom checks are skipped; when the option is
enabled, Helix performs all checks and returns every failed check. Query param: continueOnFailures.
---
.../rest/server/json/instance/StoppableCheck.java | 5 ++
.../server/resources/helix/InstancesAccessor.java | 15 +++--
.../resources/helix/PerInstanceAccessor.java | 38 ++++++++----
.../rest/server/service/InstanceServiceImpl.java | 43 ++++++++++----
.../helix/rest/server/TestInstancesAccessor.java | 2 -
.../helix/rest/server/TestPerInstanceAccessor.java | 9 ++-
.../rest/server/service/TestInstanceService.java | 68 ++++++++++++++++++----
7 files changed, 138 insertions(+), 42 deletions(-)
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java
index ee554c1..150e87b 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java
@@ -78,4 +78,9 @@ public class StoppableCheck {
public List<String> getFailedChecks() {
return failedChecks;
}
+
+ public void add(StoppableCheck other) {
+ failedChecks.addAll(other.getFailedChecks());
+ isStoppable = failedChecks.isEmpty();
+ }
}
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
index bacae0d..233cc54 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
@@ -148,8 +148,12 @@ public class InstancesAccessor extends
AbstractHelixResource {
@ResponseMetered(name = HttpConstants.WRITE_REQUEST)
@Timed(name = HttpConstants.WRITE_REQUEST)
@POST
- public Response instancesOperations(@PathParam("clusterId") String clusterId,
- @QueryParam("skipZKRead") String skipZKRead, @QueryParam("command")
String command, String content) {
+ public Response instancesOperations(
+ @PathParam("clusterId") String clusterId,
+ @QueryParam("command") String command,
+ @QueryParam("continueOnFailures") boolean continueOnFailures,
+ @QueryParam("skipZKRead") boolean skipZKRead,
+ String content) {
Command cmd;
try {
cmd = Command.valueOf(command);
@@ -177,7 +181,7 @@ public class InstancesAccessor extends
AbstractHelixResource {
admin.enableInstance(clusterId, enableInstances, false);
break;
case stoppable:
- return batchGetStoppableInstances(clusterId, node,
Boolean.valueOf(skipZKRead));
+ return batchGetStoppableInstances(clusterId, node, skipZKRead,
continueOnFailures);
default:
_logger.error("Unsupported command :" + command);
return badRequest("Unsupported command :" + command);
@@ -193,7 +197,8 @@ public class InstancesAccessor extends
AbstractHelixResource {
return OK();
}
- private Response batchGetStoppableInstances(String clusterId, JsonNode node,
boolean skipZKRead) throws IOException {
+ private Response batchGetStoppableInstances(String clusterId, JsonNode node,
boolean skipZKRead,
+ boolean continueOnFailures) throws IOException {
try {
// TODO: Process input data from the content
InstancesAccessor.InstanceHealthSelectionBase selectionBase =
@@ -223,7 +228,7 @@ public class InstancesAccessor extends
AbstractHelixResource {
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
InstanceService instanceService =
new InstanceServiceImpl((ZKHelixDataAccessor)
getDataAccssor(clusterId),
- getConfigAccessor(), skipZKRead, getNamespace());
+ getConfigAccessor(), skipZKRead, continueOnFailures,
getNamespace());
ClusterService clusterService = new
ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
ClusterTopology clusterTopology =
clusterService.getClusterTopology(clusterId);
switch (selectionBase) {
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
index 1560d74..d588f54 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
@@ -39,7 +39,6 @@ import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -101,7 +100,6 @@ public class PerInstanceAccessor extends
AbstractHelixResource {
switch (cmd) {
case getInstance:
- ObjectMapper objectMapper = new ObjectMapper();
HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
// TODO reduce GC by dependency injection
InstanceService instanceService =
@@ -111,7 +109,7 @@ public class PerInstanceAccessor extends
AbstractHelixResource {
InstanceService.HealthCheck.STARTED_AND_HEALTH_CHECK_LIST);
String instanceInfoString;
try {
- instanceInfoString = objectMapper.writeValueAsString(instanceInfo);
+ instanceInfoString = OBJECT_MAPPER.writeValueAsString(instanceInfo);
} catch (JsonProcessingException e) {
return serverError(e);
}
@@ -133,28 +131,44 @@ public class PerInstanceAccessor extends
AbstractHelixResource {
}
}
+ /**
+ * Performs health checks for an instance to answer if it is stoppable.
+ *
+ * @param jsonContent json payload
+ * @param clusterId cluster id
+ * @param instanceName Instance name to be checked
+ * @param skipZKRead skip reading from zk server
+ * @param continueOnFailures whether or not continue to perform the
subsequent checks if previous
+ * check fails. If false, when helix own check
fails, the subsequent
+ * custom checks will not be performed.
+ * @return json response representing if queried instance is stoppable
+ * @throws IOException if there is any IO/network error
+ */
@ResponseMetered(name = HttpConstants.WRITE_REQUEST)
@Timed(name = HttpConstants.WRITE_REQUEST)
@POST
@Path("stoppable")
@Consumes(MediaType.APPLICATION_JSON)
- public Response isInstanceStoppable(String jsonContent,
@PathParam("clusterId") String clusterId,
- @PathParam("instanceName") String instanceName,
@QueryParam("skipZKRead") String skipZKRead) throws IOException {
- ObjectMapper objectMapper = new ObjectMapper();
+ public Response isInstanceStoppable(
+ String jsonContent,
+ @PathParam("clusterId") String clusterId,
+ @PathParam("instanceName") String instanceName,
+ @QueryParam("skipZKRead") boolean skipZKRead,
+ @QueryParam("continueOnFailures") boolean continueOnFailures) throws
IOException {
HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
InstanceService instanceService =
- new InstanceServiceImpl((ZKHelixDataAccessor) dataAccessor,
getConfigAccessor(),
- Boolean.parseBoolean(skipZKRead), getNamespace());
- StoppableCheck stoppableCheck = null;
+ new InstanceServiceImpl((ZKHelixDataAccessor) dataAccessor,
getConfigAccessor(), skipZKRead,
+ continueOnFailures, getNamespace());
+ StoppableCheck stoppableCheck;
try {
stoppableCheck =
instanceService.getInstanceStoppableCheck(clusterId, instanceName,
jsonContent);
} catch (HelixException e) {
- LOG.error(String.format("Current cluster %s has issue with health
checks!", clusterId),
- e);
+ LOG.error("Current cluster: {}, instance: {} has issue with health
checks!", clusterId,
+ instanceName, e);
return serverError(e);
}
- return OK(objectMapper.writeValueAsString(stoppableCheck));
+ return OK(OBJECT_MAPPER.writeValueAsString(stoppableCheck));
}
@ResponseMetered(name = HttpConstants.WRITE_REQUEST)
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
index de280b0..6cc5592 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
@@ -72,8 +72,9 @@ public class InstanceServiceImpl implements InstanceService {
private final HelixDataAccessorWrapper _dataAccessor;
private final ConfigAccessor _configAccessor;
private final CustomRestClient _customRestClient;
- private String _namespace;
- private boolean _skipZKRead;
+ private final String _namespace;
+ private final boolean _skipZKRead;
+ private final boolean _continueOnFailures;
@Deprecated
public InstanceServiceImpl(ZKHelixDataAccessor dataAccessor, ConfigAccessor
configAccessor) {
@@ -88,16 +89,25 @@ public class InstanceServiceImpl implements InstanceService
{
public InstanceServiceImpl(ZKHelixDataAccessor dataAccessor, ConfigAccessor
configAccessor,
boolean skipZKRead, String namespace) {
- this(dataAccessor, configAccessor, CustomRestClientFactory.get(),
skipZKRead, namespace);
+ this(dataAccessor, configAccessor, CustomRestClientFactory.get(),
skipZKRead, false, namespace);
+ }
+
+ // TODO: too many params, convert to builder pattern
+ public InstanceServiceImpl(ZKHelixDataAccessor dataAccessor, ConfigAccessor
configAccessor,
+ boolean skipZKRead, boolean continueOnFailures, String namespace) {
+ this(dataAccessor, configAccessor, CustomRestClientFactory.get(),
skipZKRead,
+ continueOnFailures, namespace);
}
@VisibleForTesting
InstanceServiceImpl(ZKHelixDataAccessor dataAccessor, ConfigAccessor
configAccessor,
- CustomRestClient customRestClient, boolean skipZKRead, String namespace)
{
+ CustomRestClient customRestClient, boolean skipZKRead, boolean
continueOnFailures,
+ String namespace) {
_dataAccessor = new HelixDataAccessorWrapper(dataAccessor,
customRestClient, namespace);
_configAccessor = configAccessor;
_customRestClient = customRestClient;
_skipZKRead = skipZKRead;
+ _continueOnFailures = continueOnFailures;
_namespace = namespace;
}
@@ -181,7 +191,7 @@ public class InstanceServiceImpl implements InstanceService
{
List<String> instancesForCustomInstanceLevelChecks =
filterInstancesForNextCheck(helixInstanceChecks, finalStoppableChecks);
if (instancesForCustomInstanceLevelChecks.isEmpty()) {
- // if all instances failed at helix custom level checks
+ // if all instances failed at helix custom level checks and all checks
are not required
return finalStoppableChecks;
}
@@ -206,14 +216,25 @@ public class InstanceServiceImpl implements
InstanceService {
instancesForCustomPartitionLevelChecks, restConfig, customPayLoads);
for (Map.Entry<String, StoppableCheck>
instancePartitionStoppableCheckEntry : instancePartitionLevelChecks
.entrySet()) {
- finalStoppableChecks.put(instancePartitionStoppableCheckEntry.getKey(),
- instancePartitionStoppableCheckEntry.getValue());
+ String instance = instancePartitionStoppableCheckEntry.getKey();
+ StoppableCheck stoppableCheck =
instancePartitionStoppableCheckEntry.getValue();
+ addStoppableCheck(finalStoppableChecks, instance, stoppableCheck);
}
}
return finalStoppableChecks;
}
+ private void addStoppableCheck(Map<String, StoppableCheck> stoppableChecks,
String instance,
+ StoppableCheck stoppableCheck) {
+ if (!stoppableChecks.containsKey(instance)) {
+ stoppableChecks.put(instance, stoppableCheck);
+ } else {
+ // Merge two checks
+ stoppableChecks.get(instance).add(stoppableCheck);
+ }
+ }
+
private List<String> filterInstancesForNextCheck(
Map<String, Future<StoppableCheck>> futureStoppableCheckByInstance,
Map<String, StoppableCheck> finalStoppableCheckByInstance) {
@@ -225,9 +246,11 @@ public class InstanceServiceImpl implements
InstanceService {
StoppableCheck stoppableCheck = entry.getValue().get();
if (!stoppableCheck.isStoppable()) {
// put the check result of the failed-to-stop instances
- finalStoppableCheckByInstance.put(instance, stoppableCheck);
- } else {
- // instance passed this around of check will be checked in the next
round
+ addStoppableCheck(finalStoppableCheckByInstance, instance,
stoppableCheck);
+ }
+ if (stoppableCheck.isStoppable() || _continueOnFailures){
+ // instance passed this around of check or mandatory all checks
+ // will be checked in the next round
instancesForNextCheck.add(instance);
}
} catch (InterruptedException | ExecutionException e) {
diff --git
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
index 761ec26..912774b 100644
---
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
+++
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
@@ -31,7 +31,6 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.helix.TestHelper;
@@ -45,7 +44,6 @@ import org.testng.annotations.Test;
public class TestInstancesAccessor extends AbstractTestClass {
private final static String CLUSTER_NAME = "TestCluster_0";
- private ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Test
public void testInstancesStoppable_zoneBased() throws IOException {
diff --git
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
index 8809e1c..ecc83dc 100644
---
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
+++
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
@@ -62,8 +62,13 @@ public class TestPerInstanceAccessor extends
AbstractTestClass {
Response response = new
JerseyUriRequestBuilder("clusters/{}/instances/{}/stoppable")
.format(STOPPABLE_CLUSTER, "instance1").post(this, entity);
String stoppableCheckResult = response.readEntity(String.class);
- Assert.assertEquals(stoppableCheckResult,
-
"{\"stoppable\":false,\"failedChecks\":[\"HELIX:EMPTY_RESOURCE_ASSIGNMENT\",\"HELIX:INSTANCE_NOT_ENABLED\",\"HELIX:INSTANCE_NOT_STABLE\"]}");
+
+ Map<String, Object> actualMap =
OBJECT_MAPPER.readValue(stoppableCheckResult, Map.class);
+ List<String> failedChecks =
Arrays.asList("HELIX:EMPTY_RESOURCE_ASSIGNMENT",
+ "HELIX:INSTANCE_NOT_ENABLED", "HELIX:INSTANCE_NOT_STABLE");
+ Map<String, Object> expectedMap =
+ ImmutableMap.of("stoppable", false, "failedChecks", failedChecks);
+ Assert.assertEquals(actualMap, expectedMap);
System.out.println("End test :" + TestHelper.getTestMethodName());
}
diff --git
a/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestInstanceService.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestInstanceService.java
index ad10e82..bb62b13 100644
---
a/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestInstanceService.java
+++
b/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestInstanceService.java
@@ -83,7 +83,7 @@ public class TestInstanceService {
public void testGetInstanceStoppableCheckWhenHelixOwnCheckFail() throws
IOException {
Map<String, Boolean> failedCheck = ImmutableMap.of("FailCheck", false);
InstanceService service =
- new InstanceServiceImpl(_dataAccessor, _configAccessor,
_customRestClient, false,
+ new InstanceServiceImpl(_dataAccessor, _configAccessor,
_customRestClient, false, false,
HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String
clusterId,
@@ -105,7 +105,7 @@ public class TestInstanceService {
@Test
public void testGetInstanceStoppableCheckWhenCustomInstanceCheckFail()
throws IOException {
InstanceService service =
- new InstanceServiceImpl(_dataAccessor, _configAccessor,
_customRestClient, false,
+ new InstanceServiceImpl(_dataAccessor, _configAccessor,
_customRestClient, false, false,
HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String
clusterId,
@@ -130,7 +130,7 @@ public class TestInstanceService {
@Test
public void testGetInstanceStoppableCheckConnectionRefused() throws
IOException {
InstanceService service =
- new InstanceServiceImpl(_dataAccessor, _configAccessor,
_customRestClient, false,
+ new InstanceServiceImpl(_dataAccessor, _configAccessor,
_customRestClient, false, false,
HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String
clusterId,
@@ -211,23 +211,69 @@ public class TestInstanceService {
// Valid data only from ZK, pass the check
InstanceService instanceServiceReadZK =
- new MockInstanceServiceImpl(zkHelixDataAccessor, _configAccessor,
_customRestClient, false);
+ new MockInstanceServiceImpl(zkHelixDataAccessor, _configAccessor,
_customRestClient, false,
+ false);
StoppableCheck stoppableCheck =
instanceServiceReadZK.getInstanceStoppableCheck(TEST_CLUSTER,
TEST_INSTANCE, jsonContent);
Assert.assertTrue(stoppableCheck.isStoppable());
// Even ZK data is valid. Skip ZK read should fail the test.
InstanceService instanceServiceWithoutReadZK =
- new MockInstanceServiceImpl(zkHelixDataAccessor, _configAccessor,
_customRestClient, true);
+ new MockInstanceServiceImpl(zkHelixDataAccessor, _configAccessor,
_customRestClient, true,
+ false);
stoppableCheck =
instanceServiceWithoutReadZK.getInstanceStoppableCheck(TEST_CLUSTER,
TEST_INSTANCE, jsonContent);
Assert.assertFalse(stoppableCheck.isStoppable());
}
+ /*
+ * Tests stoppable check api when all checks query is enabled. After helix
own check fails,
+ * the subsequent checks should be performed.
+ */
+ @Test
+ public void testGetStoppableWithAllChecks() throws IOException {
+ String siblingInstance = "instance0.linkedin.com_1236";
+ BaseDataAccessor<ZNRecord> mockAccessor = mock(ZkBaseDataAccessor.class);
+ ZKHelixDataAccessor zkHelixDataAccessor =
+ new ZKHelixDataAccessor(TEST_CLUSTER, InstanceType.ADMINISTRATOR,
mockAccessor);
+
+
when(mockAccessor.getChildNames(zkHelixDataAccessor.keyBuilder().liveInstances().getPath(),
2))
+ .thenReturn(Arrays.asList(TEST_INSTANCE, siblingInstance));
+
+ Map<String, Boolean> instanceHealthFailedCheck =
ImmutableMap.of("FailCheck", false);
+ InstanceService service =
+ new InstanceServiceImpl(zkHelixDataAccessor, _configAccessor,
_customRestClient, true, true,
+ HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
+ @Override
+ protected Map<String, Boolean> getInstanceHealthStatus(String
clusterId,
+ String instanceName, List<HealthCheck> healthChecks) {
+ return instanceHealthFailedCheck;
+ }
+ };
+
+ when(_customRestClient.getInstanceStoppableCheck(anyString(), anyMap()))
+ .thenReturn(ImmutableMap.of("FailCheck", false));
+ when(_customRestClient.getPartitionStoppableCheck(anyString(), anyList(),
anyMap()))
+ .thenReturn(ImmutableMap.of("FailCheck", false));
+
+ StoppableCheck actual = service.getInstanceStoppableCheck(TEST_CLUSTER,
TEST_INSTANCE, "");
+ List<String> expectedFailedChecks =
+ Arrays.asList("HELIX:FailCheck",
"CUSTOM_INSTANCE_HEALTH_FAILURE:FailCheck");
+
+ Assert.assertEquals(actual.getFailedChecks(), expectedFailedChecks);
+ Assert.assertFalse(actual.isStoppable());
+
+ // Verify the subsequent checks are called
+ verify(_configAccessor, times(1)).getRESTConfig(anyString());
+ verify(_customRestClient, times(1)).getInstanceStoppableCheck(anyString(),
anyMap());
+ verify(_customRestClient, times(2))
+ .getPartitionStoppableCheck(anyString(), anyList(), anyMap());
+ }
+
// TODO re-enable the test when partition health checks get decoupled
@Test(enabled = false)
public void testGetInstanceStoppableCheckWhenPartitionsCheckFail() throws
IOException {
InstanceService service =
- new InstanceServiceImpl(_dataAccessor, _configAccessor,
_customRestClient, false,
+ new InstanceServiceImpl(_dataAccessor, _configAccessor,
_customRestClient, false, false,
HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String
clusterId,
@@ -247,16 +293,16 @@ public class TestInstanceService {
verify(_customRestClient, times(1)).getInstanceStoppableCheck(any(),
any());
}
- class MockInstanceServiceImpl extends InstanceServiceImpl {
+ private static class MockInstanceServiceImpl extends InstanceServiceImpl {
MockInstanceServiceImpl(ZKHelixDataAccessor dataAccessor, ConfigAccessor
configAccessor,
- CustomRestClient customRestClient, boolean skipZKRead) {
- super(dataAccessor, configAccessor, customRestClient, skipZKRead,
+ CustomRestClient customRestClient, boolean skipZKRead, boolean
continueOnFailures) {
+ super(dataAccessor, configAccessor, customRestClient, skipZKRead,
continueOnFailures,
HelixRestNamespace.DEFAULT_NAMESPACE_NAME);
}
@Override
- protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
- String instanceName, List<InstanceService.HealthCheck> healthChecks) {
+ protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
String instanceName,
+ List<InstanceService.HealthCheck> healthChecks) {
return Collections.emptyMap();
}
}