This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 2de417783 Support aggregated endpoint for customized stoppable check (#2919)
2de417783 is described below
commit 2de4177833e22050723651d7b5277ff78d9a3a60
Author: Xiaxuan Gao <[email protected]>
AuthorDate: Wed Sep 25 09:31:41 2024 -0700
Support aggregated endpoint for customized stoppable check (#2919)
Support aggregated endpoint for customized stoppable check
---
.../java/org/apache/helix/model/RESTConfig.java | 27 +++-
.../apache/helix/rest/client/CustomRestClient.java | 16 ++
.../helix/rest/client/CustomRestClientImpl.java | 170 ++++++++++++++++-----
.../MaintenanceManagementService.java | 82 +++++++---
.../rest/server/json/instance/StoppableCheck.java | 3 +-
.../helix/rest/client/TestCustomRestClient.java | 30 ++++
.../TestMaintenanceManagementService.java | 124 +++++++++++++++
.../helix/rest/server/TestInstancesAccessor.java | 56 +++++++
8 files changed, 451 insertions(+), 57 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/model/RESTConfig.java b/helix-core/src/main/java/org/apache/helix/model/RESTConfig.java
index 90b1c85cf..de35f1d5b 100644
--- a/helix-core/src/main/java/org/apache/helix/model/RESTConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/RESTConfig.java
@@ -19,6 +19,8 @@ package org.apache.helix.model;
* under the License.
*/
+import java.util.Optional;
+
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -61,7 +63,7 @@ public class RESTConfig extends HelixProperty {
}
/**
- * Get the base restful endpoint of the instance
+ * Resolves the customized health URL by replacing the wildcard with the
instance's name
*
* @param instance The instance
* @return The base restful endpoint
@@ -83,4 +85,27 @@ public class RESTConfig extends HelixProperty {
return baseUrl.replace("*", instanceVip);
}
+
+ /**
+ * Retrieves the complete configured health URL if no wildcard is present.
+ * <p>
+ * For complete URL, only aggregated customized health check is supported.
For
+ * partition/instance health check, the URL should have the wildcard. The
example of the complete
+ * URL is "http://localhost:8080/healthcheck". The example of the URL with
wildcard is
+ * "http://*\/path".
+ * <p>
+ * This method is useful when aggregated health checks are required and
individual partition or
+ * instance checks need to be excluded.
+ * <p>
+ * Returns an empty Optional if the URL contains a wildcard.
+ *
+ * @return Optional containing the exact configured URL, or empty if a
wildcard is present.
+ */
+ public Optional<String> getCompleteConfiguredHealthUrl() {
+ String baseUrl = get(RESTConfig.SimpleFields.CUSTOMIZED_HEALTH_URL);
+ if (baseUrl == null || baseUrl.contains("*")) {
+ return Optional.empty();
+ }
+ return Optional.of(baseUrl);
+ }
}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClient.java b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClient.java
index bfbb4b655..c42a8a37a 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClient.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClient.java
@@ -22,6 +22,7 @@ package org.apache.helix.rest.client;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* Interacting with participant side to query for its health checks
@@ -50,4 +51,19 @@ public interface CustomRestClient {
*/
Map<String, Boolean> getPartitionStoppableCheck(String baseUrl, List<String>
partitions,
Map<String, String> customPayloads) throws IOException;
+
+ /**
+ * Get the stoppable check result for each of the given instances from the
aggregated health
+ * check endpoint.
+ * @param baseUrl the base url of the aggregated health status check endpoint
+ * @param instances a list of instances to check
+ * @param toBeStoppedInstances a set of instances which are assumed stopped
+ * @param clusterId the cluster id
+ * @param customPayloads generic payloads required from client side and
helix only works as proxy
+ * @return a map where key is instance name and value is a list of failed
checks
+ * @throws IOException
+ */
+ Map<String, List<String>> getAggregatedStoppableCheck(String baseUrl,
List<String> instances,
+ Set<String> toBeStoppedInstances, String clusterId, Map<String, String>
customPayloads)
+ throws IOException;
}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java
index b358b6c75..09783c8dd 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java
@@ -20,14 +20,19 @@ package org.apache.helix.rest.client;
*/
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
@@ -46,16 +51,20 @@ class CustomRestClientImpl implements CustomRestClient {
// postfix used to append at the end of base url
private static final String INSTANCE_HEALTH_STATUS = "/instanceHealthStatus";
private static final String PARTITION_HEALTH_STATUS =
"/partitionHealthStatus";
+ private static final String AGGREGATED_HEALTH_STATUS =
"/aggregatedHealthStatus";
private static final String IS_HEALTHY_FIELD = "IS_HEALTHY";
private static final String PARTITIONS = "partitions";
private static final String ACCEPT_CONTENT_TYPE = "application/json";
+ private static final String HEALTH_INSTANCES = "stoppableInstances";
+ private static final String NON_STOPPABLE_INSTANCES =
"nonStoppableInstancesWithReasons";
+ private static final int MAX_REDIRECT = 3;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- private HttpClient _httpClient;
+ private final HttpClient _httpClient;
- private interface JsonConverter {
- Map<String, Boolean> convert(JsonNode jsonNode);
+ private interface JsonConverter<T> {
+ Map<String, T> convert(JsonNode jsonNode);
}
public CustomRestClientImpl(HttpClient httpClient) {
@@ -68,11 +77,7 @@ class CustomRestClientImpl implements CustomRestClient {
// example url: http://<baseUrl>/instanceHealthStatus, assuming the base
url already directly
// queries at the instance
String url = baseUrl + INSTANCE_HEALTH_STATUS;
- JsonConverter jsonConverter = jsonNode -> {
- Map<String, Boolean> result = new HashMap<>();
- jsonNode.fields().forEachRemaining(kv -> result.put(kv.getKey(),
kv.getValue().asBoolean()));
- return result;
- };
+ JsonConverter<Boolean> jsonConverter = this::extractBooleanMap;
return handleResponse(post(url,
Collections.unmodifiableMap(customPayloads)), jsonConverter);
}
@@ -93,15 +98,52 @@ class CustomRestClientImpl implements CustomRestClient {
if (partitions != null) {
payLoads.put(PARTITIONS, partitions);
}
- JsonConverter jsonConverter = jsonNode -> {
- Map<String, Boolean> result = new HashMap<>();
- jsonNode.fields().forEachRemaining(
- kv -> result.put(kv.getKey(),
kv.getValue().get(IS_HEALTHY_FIELD).asBoolean()));
- return result;
- };
+ JsonConverter<Boolean> jsonConverter = this::extractPartitionBooleanMap;
return handleResponse(post(url, payLoads), jsonConverter);
}
+ @Override
+ public Map<String, List<String>> getAggregatedStoppableCheck(String baseUrl,
+ List<String> instances, Set<String> toBeStoppedInstances, String
clusterId,
+ Map<String, String> customPayloads) throws IOException {
+ /*
+ * example url: http://<baseUrl>/aggregatedHealthStatus -d {
+ * "instances" : ["n1", "n3", "n9"],
+ * "to_be_stopped_instances": "["n2", "n4"]",
+ * "cluster_id": "cluster1",
+ * "<key>": "<value>"
+ * ...
+ * }
+ *
+ * The response will be a json object with two fields:
"stoppableInstances" and
+ * "nonStoppableInstancesWithReasons". The value of "stoppableInstances"
is a list of instances
+ * that are stoppable. The value of "nonStoppableInstancesWithReasons" is
a map where the key is
+ * the instance name and the value is the reasons why the instance is not
stoppable.
+ *
+ * example response: {
+ * "stoppableInstances": ["n1", "n3"],
+ * "nonStoppableInstancesWithReasons": {
+ * "n2": "reason1,reason2",
+ * "n4": "reason3"
+ * }
+ */
+ String url = baseUrl + AGGREGATED_HEALTH_STATUS;
+ Map<String, Object> payLoads = new HashMap<>(customPayloads);
+ if (instances != null && !instances.isEmpty()) {
+ payLoads.put("instances", instances);
+ }
+ if (toBeStoppedInstances != null && !toBeStoppedInstances.isEmpty()) {
+ payLoads.put("to_be_stopped_instances", toBeStoppedInstances);
+ }
+ if (clusterId != null) {
+ payLoads.put("cluster_id", clusterId);
+ }
+
+ // JsonConverter to handle the response and map instance health status
+ JsonConverter<List<String>> converter = this::extractAggregatedStatus;
+ return handleResponse(post(url, payLoads), converter);
+ }
+
@VisibleForTesting
protected JsonNode getJsonObject(HttpResponse httpResponse) throws
IOException {
HttpEntity httpEntity = httpResponse.getEntity();
@@ -110,8 +152,8 @@ class CustomRestClientImpl implements CustomRestClient {
return OBJECT_MAPPER.readTree(str);
}
- private Map<String, Boolean> handleResponse(HttpResponse httpResponse,
- JsonConverter jsonConverter) throws IOException {
+ private <T> Map<String, T> handleResponse(HttpResponse httpResponse,
+ JsonConverter<T> jsonConverter) throws IOException {
int status = httpResponse.getStatusLine().getStatusCode();
if (status == HttpStatus.SC_OK) {
LOG.info("Expected HttpResponse statusCode: {}", HttpStatus.SC_OK);
@@ -119,33 +161,89 @@ class CustomRestClientImpl implements CustomRestClient {
} else {
// Ensure entity is fully consumed so stream is closed.
EntityUtils.consumeQuietly(httpResponse.getEntity());
- throw new ClientProtocolException("Unexpected response status: " +
status + ", reason: "
- + httpResponse.getStatusLine().getReasonPhrase());
+ throw new ClientProtocolException(
+ "Unexpected response status: " + status + ", reason: " +
httpResponse.getStatusLine()
+ .getReasonPhrase());
}
}
@VisibleForTesting
protected HttpResponse post(String url, Map<String, Object> payloads) throws
IOException {
HttpPost postRequest = new HttpPost(url);
- try {
- postRequest.setHeader("Accept", ACCEPT_CONTENT_TYPE);
- StringEntity entity = new
StringEntity(OBJECT_MAPPER.writeValueAsString(payloads),
- ContentType.APPLICATION_JSON);
- postRequest.setEntity(entity);
- LOG.info("Executing request: {}, headers: {}, entity: {}",
postRequest.getRequestLine(),
- postRequest.getAllHeaders(), postRequest.getEntity());
-
- HttpResponse response = _httpClient.execute(postRequest);
- int status = response.getStatusLine().getStatusCode();
- if (status != HttpStatus.SC_OK) {
- LOG.warn("Received non-200 status code: {}, payloads: {}", status,
payloads);
+ int retries = 0;
+ HttpResponse response = null;
+
+ while (retries < MAX_REDIRECT) {
+ try {
+ postRequest.setHeader("Accept", ACCEPT_CONTENT_TYPE);
+ StringEntity entity = new
StringEntity(OBJECT_MAPPER.writeValueAsString(payloads),
+ ContentType.APPLICATION_JSON);
+ postRequest.setEntity(entity);
+ LOG.info("Executing request: {}, headers: {}, entity: {}",
postRequest.getRequestLine(),
+ postRequest.getAllHeaders(), postRequest.getEntity());
+
+ // Execute the request
+ response = _httpClient.execute(postRequest);
+ int status = response.getStatusLine().getStatusCode();
+
+ if (status == HttpStatus.SC_OK) {
+ return response; // Return the successful response
+ } else if (status == HttpStatus.SC_MOVED_TEMPORARILY) {
+ // If receiving 302 Found (redirect), handle the redirection
+ Header locationHeader = response.getFirstHeader("Location");
+ if (locationHeader != null) {
+ String redirectUrl = locationHeader.getValue();
+ LOG.info("Redirecting to: {}", redirectUrl);
+ // Update the post request with the new URL
+ postRequest = new HttpPost(redirectUrl);
+ retries++;
+ LOG.info("Retrying redirect (attempt {} of {})", retries,
MAX_REDIRECT);
+ } else {
+ LOG.warn("Received 302 but no Location header is present, stopping
retries.");
+ break; // Break out if there is no valid redirect location
+ }
+ } else {
+ LOG.warn("Received non-200 and non-302 status code: {}, payloads:
{}", status, payloads);
+ return response; // Return response without retry
+ }
+ } catch (IOException e) {
+ // Release connection to be reused and avoid connection leakage
+ postRequest.releaseConnection();
+ throw e;
}
-
- return response;
- } catch (IOException e) {
- // Release connection to be reused and avoid connection leakage.
- postRequest.releaseConnection();
- throw e;
}
+
+ return response;
+ }
+
+ private Map<String, Boolean> extractBooleanMap(JsonNode jsonNode) {
+ Map<String, Boolean> result = new HashMap<>();
+ jsonNode.fields().forEachRemaining(kv -> result.put(kv.getKey(),
kv.getValue().asBoolean()));
+ return result;
+ }
+
+ private Map<String, Boolean> extractPartitionBooleanMap(JsonNode jsonNode) {
+ Map<String, Boolean> result = new HashMap<>();
+ jsonNode.fields().forEachRemaining(
+ kv -> result.put(kv.getKey(),
kv.getValue().get(IS_HEALTHY_FIELD).asBoolean()));
+ return result;
+ }
+
+ private Map<String, List<String>> extractAggregatedStatus(JsonNode jsonNode)
{
+ Map<String, List<String>> result = new HashMap<>();
+ jsonNode.fields().forEachRemaining(response -> {
+ String key = response.getKey();
+ if (HEALTH_INSTANCES.equals(key)) {
+ response.getValue().forEach(instance -> {
+ result.put(instance.textValue(), Collections.emptyList());
+ });
+ }
+ if (NON_STOPPABLE_INSTANCES.equals(key)) {
+ response.getValue().fields().forEachRemaining(instance ->
result.put(instance.getKey(),
+ Stream.of(instance.getValue().toString().split(","))
+ .collect(Collectors.toCollection(ArrayList<String>::new))));
+ }
+ });
+ return result;
}
}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
index 063e7107f..925b2d216 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
@@ -84,7 +84,8 @@ public class MaintenanceManagementService {
public static final Set<StoppableCheck.Category>
SKIPPABLE_HEALTH_CHECK_CATEGORIES =
ImmutableSet.of(StoppableCheck.Category.CUSTOM_INSTANCE_CHECK,
- StoppableCheck.Category.CUSTOM_PARTITION_CHECK);
+ StoppableCheck.Category.CUSTOM_PARTITION_CHECK,
+ StoppableCheck.Category.CUSTOM_AGGREGATED_CHECK);
private final ConfigAccessor _configAccessor;
private final CustomRestClient _customRestClient;
@@ -381,7 +382,7 @@ public class MaintenanceManagementService {
toBeStoppedInstances);
// custom check, includes partition check.
batchCustomInstanceStoppableCheck(clusterId,
instancesForCustomInstanceLevelChecks,
- finalStoppableChecks, getMapFromJsonPayload(jsonContent));
+ toBeStoppedInstances, finalStoppableChecks,
getMapFromJsonPayload(jsonContent));
return finalStoppableChecks;
}
@@ -486,24 +487,46 @@ public class MaintenanceManagementService {
}
private List<String> batchCustomInstanceStoppableCheck(String clusterId,
List<String> instances,
- Map<String, StoppableCheck> finalStoppableChecks, Map<String, String>
customPayLoads) {
+ Set<String> toBeStoppedInstances, Map<String, StoppableCheck>
finalStoppableChecks,
+ Map<String, String> customPayLoads) {
if (instances.isEmpty()) {
// if all instances failed at previous checks, then all following checks
are not required.
return instances;
}
RESTConfig restConfig = _configAccessor.getRESTConfig(clusterId);
- if (restConfig == null && (
-
!_skipHealthCheckCategories.contains(StoppableCheck.Category.CUSTOM_INSTANCE_CHECK)
- || !_skipHealthCheckCategories.contains(
- StoppableCheck.Category.CUSTOM_PARTITION_CHECK))) {
- String errorMessage = String.format(
- "The cluster %s hasn't enabled client side health checks yet, "
- + "thus the stoppable check result is inaccurate", clusterId);
- LOG.error(errorMessage);
- throw new HelixException(errorMessage);
- }
-
- List<String> instancesForCustomPartitionLevelChecks;
+ if (restConfig == null) {
+ // If the user didn't set up the rest config, we can't perform the
custom check.
+ // Therefore, skip the custom check.
+ LOG.info(String.format("The cluster %s hasn't enabled client side health
checks yet, "
+ + "thus the stoppable check result is inaccurate", clusterId));
+ return instances;
+ }
+
+ // If the config has exactUrl and the CLUSTER level customer check is not
skipped, we will
+ // perform the custom check at cluster level.
+ if (restConfig.getCompleteConfiguredHealthUrl().isPresent()) {
+ if
(_skipHealthCheckCategories.contains(StoppableCheck.Category.CUSTOM_AGGREGATED_CHECK))
{
+ return instances;
+ }
+
+ Map<String, StoppableCheck> clusterLevelCustomCheckResult =
+ performAggregatedCustomCheck(clusterId, instances,
+ restConfig.getCompleteConfiguredHealthUrl().get(),
customPayLoads,
+ toBeStoppedInstances);
+ List<String> instancesForNextCheck = new ArrayList<>();
+ clusterLevelCustomCheckResult.forEach((instance, stoppableCheck) -> {
+ addStoppableCheck(finalStoppableChecks, instance, stoppableCheck);
+ if (stoppableCheck.isStoppable() ||
isNonBlockingCheck(stoppableCheck)) {
+ instancesForNextCheck.add(instance);
+ }
+ });
+
+ return instancesForNextCheck;
+ }
+
+ // Reaching here means the rest config requires instances/partition level
checks. We will
+ // perform the custom check at instance/partition level if they are not
skipped.
+ List<String> instancesForCustomPartitionLevelChecks = instances;
if
(!_skipHealthCheckCategories.contains(StoppableCheck.Category.CUSTOM_INSTANCE_CHECK))
{
Map<String, Future<StoppableCheck>> customInstanceLevelChecks =
instances.stream().collect(
Collectors.toMap(Function.identity(), instance -> POOL.submit(
@@ -511,8 +534,6 @@ public class MaintenanceManagementService {
customPayLoads))));
instancesForCustomPartitionLevelChecks =
filterInstancesForNextCheck(customInstanceLevelChecks,
finalStoppableChecks);
- } else {
- instancesForCustomPartitionLevelChecks = instances;
}
if (!instancesForCustomPartitionLevelChecks.isEmpty() &&
!_skipHealthCheckCategories.contains(
@@ -556,8 +577,8 @@ public class MaintenanceManagementService {
} else if (healthCheck.equals(HELIX_CUSTOM_STOPPABLE_CHECK)) {
// custom check, includes custom Instance check and partition check.
instancesForNext =
- batchCustomInstanceStoppableCheck(clusterId, instancesForNext,
finalStoppableChecks,
- healthCheckConfig);
+ batchCustomInstanceStoppableCheck(clusterId, instancesForNext,
Collections.emptySet(),
+ finalStoppableChecks, healthCheckConfig);
} else {
throw new UnsupportedOperationException(healthCheck + " is not
supported yet!");
}
@@ -702,6 +723,29 @@ public class MaintenanceManagementService {
return instanceStoppableChecks;
}
+ private Map<String, StoppableCheck> performAggregatedCustomCheck(String
clusterId,
+ List<String> instances, String url, Map<String, String> customPayLoads,
+ Set<String> toBeStoppedInstances) {
+ Map<String, StoppableCheck> aggregatedStoppableChecks = new HashMap<>();
+ try {
+ Map<String, List<String>> customCheckResult =
+ _customRestClient.getAggregatedStoppableCheck(url, instances,
toBeStoppedInstances,
+ clusterId, customPayLoads);
+ for (Map.Entry<String, List<String>> entry :
customCheckResult.entrySet()) {
+ // If the list is empty, it means the instance is stoppable.
+ aggregatedStoppableChecks.put(entry.getKey(),
+ new StoppableCheck(entry.getValue().isEmpty(), entry.getValue(),
+ StoppableCheck.Category.CUSTOM_AGGREGATED_CHECK));
+ }
+ } catch (IOException ex) {
+ LOG.error("Custom client side aggregated health check for {} failed.",
clusterId, ex);
+ return instances.stream().collect(Collectors.toMap(Function.identity(),
+ instance -> new StoppableCheck(false,
Collections.singletonList(instance),
+ StoppableCheck.Category.CUSTOM_AGGREGATED_CHECK)));
+ }
+ return aggregatedStoppableChecks;
+ }
+
public static Map<String, String> getMapFromJsonPayload(String jsonContent)
throws IOException {
Map<String, String> result = new HashMap<>();
if (jsonContent == null) {
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java b/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java
index 2985bd86f..0c1becbc1 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java
@@ -33,7 +33,8 @@ public class StoppableCheck {
public enum Category {
HELIX_OWN_CHECK("HELIX:"),
CUSTOM_INSTANCE_CHECK("CUSTOM_INSTANCE_HEALTH_FAILURE:"),
- CUSTOM_PARTITION_CHECK("CUSTOM_PARTITION_HEALTH_FAILURE:");
+ CUSTOM_PARTITION_CHECK("CUSTOM_PARTITION_HEALTH_FAILURE:"),
+ CUSTOM_AGGREGATED_CHECK("CUSTOM_AGGREGATED_HEALTH_FAILURE:");
String prefix;
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java b/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java
index 06015dd50..5a5fec04b 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java
@@ -20,12 +20,15 @@ package org.apache.helix.rest.client;
*/
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.StatusLine;
@@ -204,4 +207,31 @@ public class TestCustomRestClient {
return new ObjectMapper().readTree(_jsonResponse);
}
}
+
+ @Test
+ public void testGetAggregatedStoppableCheck() throws IOException {
+ MockCustomRestClient customRestClient = new
MockCustomRestClient(_httpClient);
+ String jsonResponse = "{\n \"stoppableInstances\" : [\"n1\", \"n2\",
\"n3\"],\n \"nonStoppableInstancesWithReasons\": "
+ + "{\n \"n4\": \"ERROR_PARTITION STILL_BOOTSTRAPPING\",\n
\"n10\": \"NOT_READY\"\n }\n}";
+ String clusterId = "cluster1";
+ String[] instances = {"n1", "n2", "n3", "n4", "n10"};
+ String[] healthyInstances = {"n1", "n2", "n3"};
+ String[] nonStoppableInstances = {"n4", "n10"};
+
+ HttpResponse httpResponse = mock(HttpResponse.class);
+ StatusLine statusLine = mock(StatusLine.class);
+
+ when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK);
+ when(httpResponse.getStatusLine()).thenReturn(statusLine);
+ customRestClient.setJsonResponse(jsonResponse);
+ when(_httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse);
+
+ Map<String, List<String>> clusterHealth =
customRestClient.getAggregatedStoppableCheck(HTTP_LOCALHOST,
+ ImmutableList.of("n1", "n2", "n3", "n4", "n10"),
+ ImmutableSet.of("n7", "n8"), clusterId, Collections.emptyMap());
+
+
Assert.assertTrue(Arrays.stream(instances).allMatch(clusterHealth::containsKey));
+ Assert.assertTrue(Arrays.stream(healthyInstances).allMatch(instance ->
clusterHealth.get(instance).isEmpty()));
+ Assert.assertTrue(Arrays.stream(nonStoppableInstances).noneMatch(instance
-> clusterHealth.get(instance).isEmpty()));
+ }
}
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java b/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
index 6c207bd10..5801fa231 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
@@ -20,6 +20,7 @@ package org.apache.helix.rest.clusterMaintenanceService;
*/
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -55,6 +56,7 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyList;
@@ -414,4 +416,126 @@ public class TestMaintenanceManagementService {
verify(_customRestClient, times(1)).getInstanceStoppableCheck(any(),
any());
}
+ @Test
+ public void testGetAggregatedStoppableCheckWithFailedInstances() throws
IOException {
+ MockMaintenanceManagementService service =
+ new MockMaintenanceManagementService(_dataAccessorWrapper,
_configAccessor,
+ _customRestClient, false, false,
HelixRestNamespace.DEFAULT_NAMESPACE_NAME);
+
+ Set<String> toBeStoppedInstances = ImmutableSet.of("n2", "n4");
+ List<String> instances = List.of(TEST_INSTANCE);
+ RESTConfig restConfig = new RESTConfig(TEST_CLUSTER);
+ // Set the customized health URL to a fake address which contains exact URI
+ restConfig.set(RESTConfig.SimpleFields.CUSTOMIZED_HEALTH_URL,
"http://fakeAddress:123/path");
+ when(_configAccessor.getRESTConfig(TEST_CLUSTER)).thenReturn(restConfig);
+ when(
+ _customRestClient.getAggregatedStoppableCheck(anyString(), anyList(),
anySet(), anyString(),
+ anyMap())).thenReturn(
+ ImmutableMap.of(TEST_INSTANCE,
Collections.singletonList("FAILED_FAKE_CHECK")));
+
+ Map<String, StoppableCheck> result =
+ service.batchGetInstancesStoppableChecks(TEST_CLUSTER, instances, "",
toBeStoppedInstances);
+
+ Assert.assertEquals(result.size(), 1);
+ Assert.assertTrue(result.containsKey(TEST_INSTANCE));
+ Assert.assertFalse(result.get(TEST_INSTANCE).isStoppable());
+
+ // Since the cluster level check is not skipped,
getAggregatedStoppableCheck should be called
+ verify(_customRestClient, times(1)).getAggregatedStoppableCheck(any(),
any(), any(), any(),
+ any());
+ // Since the rest URI uses the exact URI, no instance/partition level
check should be performed
+ verify(_customRestClient, times(0)).getInstanceStoppableCheck(any(),
any());
+ verify(_customRestClient, times(0)).getPartitionStoppableCheck(any(),
any(), any());
+ }
+
+ @Test
+ public void testGetAggregatedStoppableCheckWithStoppableInstances() throws
IOException {
+ MockMaintenanceManagementService service =
+ new MockMaintenanceManagementService(_dataAccessorWrapper,
_configAccessor,
+ _customRestClient, false, false,
HelixRestNamespace.DEFAULT_NAMESPACE_NAME);
+
+ Set<String> toBeStoppedInstances = ImmutableSet.of("n2", "n4");
+ List<String> instances = new ArrayList<>(Arrays.asList("n1", "n3", "n5"));
+ RESTConfig restConfig = new RESTConfig(TEST_CLUSTER);
+ // Set the customized health URL to a fake address which contains exact URI
+ restConfig.set(RESTConfig.SimpleFields.CUSTOMIZED_HEALTH_URL,
"http://fakeAddress:123/path");
+
+ Map<String, List<String>> stoppableCheckResult = new HashMap<>();
+ stoppableCheckResult.put("n1", Collections.emptyList()); // n1 is stoppable
+ stoppableCheckResult.put("n3",
+ List.of("FAILED_FAKE_CHECK", "FAILED_FAKE_CHECK2")); // n3 is not
stoppable
+ stoppableCheckResult.put("n5", Collections.emptyList()); // n5 is stoppable
+ when(_configAccessor.getRESTConfig(TEST_CLUSTER)).thenReturn(restConfig);
+ when(
+ _customRestClient.getAggregatedStoppableCheck(anyString(), anyList(),
anySet(), anyString(),
+ anyMap())).thenReturn(stoppableCheckResult);
+
+ Map<String, StoppableCheck> result =
+ service.batchGetInstancesStoppableChecks(TEST_CLUSTER, instances, "",
toBeStoppedInstances);
+
+ Assert.assertEquals(result.size(), 3);
+ instances.forEach(instance -> {
+ Assert.assertTrue(result.containsKey(instance));
+ if (instance.equals("n1") || instance.equals("n5")) {
+ Assert.assertTrue(result.get(instance).isStoppable());
+ } else {
+ Assert.assertFalse(result.get(instance).isStoppable());
+ Assert.assertEquals(result.get(instance).getFailedChecks().size(), 2);
+ }
+ });
+
+ // Since the cluster level check is not skipped,
getAggregatedStoppableCheck should be called
+ verify(_customRestClient, times(1)).getAggregatedStoppableCheck(any(),
any(), any(), any(),
+ any());
+ // Since the rest URI uses the exact URI, no instance/partition level
check should be performed
+ verify(_customRestClient, times(0)).getInstanceStoppableCheck(any(),
any());
+ verify(_customRestClient, times(0)).getPartitionStoppableCheck(any(),
any(), any());
+ }
+
+ @Test
+ public void testNewRestConfigBackwardCompatibilityWithRestURIWithWildcard()
throws IOException {
+ MockMaintenanceManagementService service =
+ new MockMaintenanceManagementService(_dataAccessorWrapper,
_configAccessor,
+ _customRestClient, false, false,
+ Set.of(StoppableCheck.Category.CUSTOM_AGGREGATED_CHECK),
+ HelixRestNamespace.DEFAULT_NAMESPACE_NAME);
+ Set<String> toBeStoppedInstances = ImmutableSet.of("n2", "n4");
+ List<String> instances = List.of(TEST_INSTANCE);
+ RESTConfig restConfig = new RESTConfig(TEST_CLUSTER);
+ restConfig.set(RESTConfig.SimpleFields.CUSTOMIZED_HEALTH_URL,
"http://fakeAddress:123/path");
+ when(_configAccessor.getRESTConfig(TEST_CLUSTER)).thenReturn(restConfig);
+ when(
+ _customRestClient.getAggregatedStoppableCheck(anyString(), anyList(),
anySet(), anyString(),
+ anyMap())).thenReturn(
+ ImmutableMap.of(TEST_INSTANCE,
Collections.singletonList("FAILED_FAKE_CHECK")));
+
+ service.batchGetInstancesStoppableChecks(TEST_CLUSTER, instances, "",
toBeStoppedInstances);
+ // Since the cluster level check is skipped, getAggregatedStoppableCheck
should not be called
+ verify(_customRestClient, times(0)).getAggregatedStoppableCheck(any(),
any(), any(), any(),
+ any());
+ verify(_customRestClient, times(0)).getInstanceStoppableCheck(any(),
any());
+ verify(_customRestClient, times(0)).getPartitionStoppableCheck(any(),
any(), any());
+ }
+
+ @Test
+ public void testRestConfigWorkWithInstancesAndPartitionChecks() throws
IOException {
+ MockMaintenanceManagementService service =
+ new MockMaintenanceManagementService(_dataAccessorWrapper,
_configAccessor,
+ _customRestClient, false, false,
Set.of(StoppableCheck.Category.CUSTOM_PARTITION_CHECK),
+ HelixRestNamespace.DEFAULT_NAMESPACE_NAME);
+ Set<String> toBeStoppedInstances = ImmutableSet.of("n2", "n4");
+ List<String> instances = List.of(TEST_INSTANCE);
+ RESTConfig restConfig = new RESTConfig(TEST_CLUSTER);
+ restConfig.set(RESTConfig.SimpleFields.CUSTOMIZED_HEALTH_URL,
"http://*:123/path");
+ when(_configAccessor.getRESTConfig(TEST_CLUSTER)).thenReturn(restConfig);
+ when(_customRestClient.getInstanceStoppableCheck(anyString(),
anyMap())).thenReturn(
+ ImmutableMap.of(TEST_INSTANCE, true));
+
+ service.batchGetInstancesStoppableChecks(TEST_CLUSTER, instances, "",
toBeStoppedInstances);
+ // Since the cluster level check is skipped, getAggregatedStoppableCheck
should not be called
+ verify(_customRestClient, times(0)).getAggregatedStoppableCheck(any(),
any(), any(), any(),
+ any());
+ verify(_customRestClient, times(1)).getInstanceStoppableCheck(any(),
any());
+ verify(_customRestClient, times(0)).getPartitionStoppableCheck(any(),
any(), any());
+ }
}
\ No newline at end of file
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
index 61ceef3a0..e1bbe7869 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
@@ -38,6 +38,7 @@ import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.RESTConfig;
import org.apache.helix.rest.server.resources.helix.InstancesAccessor;
import org.apache.helix.rest.server.util.JerseyUriRequestBuilder;
import
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
@@ -577,6 +578,61 @@ public class TestInstancesAccessor extends
AbstractTestClass {
System.out.println("End test :" + TestHelper.getTestMethodName());
}
+ @Test(dependsOnMethods = "testMultipleReplicasInSameMZ")
+ public void testSkipClusterLevelHealthCheck() throws IOException {
+ System.out.println("Start test :" + TestHelper.getTestMethodName());
+ String content = String.format(
+ "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\", \"%s\",
\"%s\",\"%s\", \"%s\", \"%s\"]}",
+ InstancesAccessor.InstancesProperties.selection_base.name(),
+ InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(),
+ InstancesAccessor.InstancesProperties.instances.name(), "instance1",
"instance3",
+ "instance6", "instance9", "instance10", "instance11", "instance12",
"instance13",
+ "instance14", "invalidInstance");
+
+ // Change instance config of instance1 & instance0 to be evacuating
+ String instance0 = "instance0";
+ InstanceConfig instanceConfig =
+ _configAccessor.getInstanceConfig(STOPPABLE_CLUSTER2, instance0);
+
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.EVACUATE);
+ _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER2, instance0,
instanceConfig);
+ String instance1 = "instance1";
+ InstanceConfig instanceConfig1 =
+ _configAccessor.getInstanceConfig(STOPPABLE_CLUSTER2, instance1);
+
instanceConfig1.setInstanceOperation(InstanceConstants.InstanceOperation.EVACUATE);
+ _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER2, instance1,
instanceConfig1);
+ RESTConfig restConfig = new RESTConfig(STOPPABLE_CLUSTER2);
+ restConfig.set(RESTConfig.SimpleFields.CUSTOMIZED_HEALTH_URL,
"http://localhost:1234");
+ _configAccessor.setRESTConfig(STOPPABLE_CLUSTER2, restConfig);
+ // It takes time to reflect the changes.
+ BestPossibleExternalViewVerifier verifier =
+ new
BestPossibleExternalViewVerifier.Builder(STOPPABLE_CLUSTER2).setZkAddr(ZK_ADDR).build();
+ Assert.assertTrue(verifier.verifyByPolling());
+
+ Response response = new JerseyUriRequestBuilder(
+
"clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_AGGREGATED_CHECK").format(
+ STOPPABLE_CLUSTER2).post(this, Entity.entity(content,
MediaType.APPLICATION_JSON_TYPE));
+ JsonNode jsonNode =
OBJECT_MAPPER.readTree(response.readEntity(String.class));
+
+ Set<String> stoppableSet = getStringSet(jsonNode,
+
InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+ Assert.assertTrue(stoppableSet.contains("instance12") &&
stoppableSet.contains("instance11")
+ && stoppableSet.contains("instance10"));
+
+ JsonNode nonStoppableInstances = jsonNode.get(
+
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "instance13"),
+ ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "instance14"),
+ ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"),
+ ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST"));
+
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
+ _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER2, instance0,
instanceConfig);
+
instanceConfig1.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
+ _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER2, instance1,
instanceConfig1);
+ System.out.println("End test :" + TestHelper.getTestMethodName());
+ }
+
private Set<String> getStringSet(JsonNode jsonNode, String key) {
Set<String> result = new HashSet<>();
jsonNode.withArray(key).forEach(s -> result.add(s.textValue()));