This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 2de417783 Support aggregated endpoint for customized stoppable check (#2919)
2de417783 is described below

commit 2de4177833e22050723651d7b5277ff78d9a3a60
Author: Xiaxuan Gao <[email protected]>
AuthorDate: Wed Sep 25 09:31:41 2024 -0700

    Support aggregated endpoint for customized stoppable check (#2919)
    
    Support aggregated endpoint for customized stoppable check
---
 .../java/org/apache/helix/model/RESTConfig.java    |  27 +++-
 .../apache/helix/rest/client/CustomRestClient.java |  16 ++
 .../helix/rest/client/CustomRestClientImpl.java    | 170 ++++++++++++++++-----
 .../MaintenanceManagementService.java              |  82 +++++++---
 .../rest/server/json/instance/StoppableCheck.java  |   3 +-
 .../helix/rest/client/TestCustomRestClient.java    |  30 ++++
 .../TestMaintenanceManagementService.java          | 124 +++++++++++++++
 .../helix/rest/server/TestInstancesAccessor.java   |  56 +++++++
 8 files changed, 451 insertions(+), 57 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/model/RESTConfig.java 
b/helix-core/src/main/java/org/apache/helix/model/RESTConfig.java
index 90b1c85cf..de35f1d5b 100644
--- a/helix-core/src/main/java/org/apache/helix/model/RESTConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/RESTConfig.java
@@ -19,6 +19,8 @@ package org.apache.helix.model;
  * under the License.
  */
 
+import java.util.Optional;
+
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -61,7 +63,7 @@ public class RESTConfig extends HelixProperty {
   }
 
   /**
-   * Get the base restful endpoint of the instance
+   * Resolves the customized health URL by replacing the wildcard with the 
instance's name
    *
    * @param instance The instance
    * @return The base restful endpoint
@@ -83,4 +85,27 @@ public class RESTConfig extends HelixProperty {
 
     return baseUrl.replace("*", instanceVip);
   }
+
+  /**
+   * Retrieves the complete configured health URL if no wildcard is present.
+   * <p>
+   * For complete URL, only aggregated customized health check is supported. 
For
+   * partition/instance health check, the URL should have the wildcard. The 
example of the complete
+   * URL is "http://localhost:8080/healthcheck". The example of the URL with 
wildcard is
+   * "http://*\/path".
+   * <p>
+   * This method is useful when aggregated health checks are required and 
individual partition or
+   * instance checks need to be excluded.
+   * <p>
+   * Returns an empty Optional if the URL contains a wildcard.
+   *
+   * @return Optional containing the exact configured URL, or empty if a 
wildcard is present.
+   */
+  public Optional<String> getCompleteConfiguredHealthUrl() {
+    String baseUrl = get(RESTConfig.SimpleFields.CUSTOMIZED_HEALTH_URL);
+    if (baseUrl == null || baseUrl.contains("*")) {
+      return Optional.empty();
+    }
+    return Optional.of(baseUrl);
+  }
 }
diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClient.java 
b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClient.java
index bfbb4b655..c42a8a37a 100644
--- 
a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClient.java
+++ 
b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClient.java
@@ -22,6 +22,7 @@ package org.apache.helix.rest.client;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Interacting with participant side to query for its health checks
@@ -50,4 +51,19 @@ public interface CustomRestClient {
    */
   Map<String, Boolean> getPartitionStoppableCheck(String baseUrl, List<String> 
partitions,
       Map<String, String> customPayloads) throws IOException;
+
+  /**
+   * Get the stoppable check result for each of the given instances from the 
aggregated health
+   * check endpoint.
+   * @param baseUrl the base url of the aggregated health status check endpoint
+   * @param instances a list of instances to check
+   * @param toBeStoppedInstances a set of instances which are assumed stopped
+   * @param clusterId the cluster id
+   * @param customPayloads generic payloads required from client side and 
helix only works as proxy
+   * @return a map where key is instance name and value is a list of failed 
checks
+   * @throws IOException
+   */
+  Map<String, List<String>> getAggregatedStoppableCheck(String baseUrl, 
List<String> instances,
+      Set<String> toBeStoppedInstances, String clusterId, Map<String, String> 
customPayloads)
+      throws IOException;
 }
diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java
 
b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java
index b358b6c75..09783c8dd 100644
--- 
a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java
+++ 
b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java
@@ -20,14 +20,19 @@ package org.apache.helix.rest.client;
  */
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.http.Header;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
@@ -46,16 +51,20 @@ class CustomRestClientImpl implements CustomRestClient {
   // postfix used to append at the end of base url
   private static final String INSTANCE_HEALTH_STATUS = "/instanceHealthStatus";
   private static final String PARTITION_HEALTH_STATUS = 
"/partitionHealthStatus";
+  private static final String AGGREGATED_HEALTH_STATUS = 
"/aggregatedHealthStatus";
 
   private static final String IS_HEALTHY_FIELD = "IS_HEALTHY";
   private static final String PARTITIONS = "partitions";
   private static final String ACCEPT_CONTENT_TYPE = "application/json";
+  private static final String HEALTH_INSTANCES = "stoppableInstances";
+  private static final String NON_STOPPABLE_INSTANCES = 
"nonStoppableInstancesWithReasons";
+  private static final int MAX_REDIRECT = 3;
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-  private HttpClient _httpClient;
+  private final HttpClient _httpClient;
 
-  private interface JsonConverter {
-    Map<String, Boolean> convert(JsonNode jsonNode);
+  private interface JsonConverter<T> {
+    Map<String, T> convert(JsonNode jsonNode);
   }
 
   public CustomRestClientImpl(HttpClient httpClient) {
@@ -68,11 +77,7 @@ class CustomRestClientImpl implements CustomRestClient {
     // example url: http://<baseUrl>/instanceHealthStatus, assuming the base 
url already directly
     // queries at the instance
     String url = baseUrl + INSTANCE_HEALTH_STATUS;
-    JsonConverter jsonConverter = jsonNode -> {
-      Map<String, Boolean> result = new HashMap<>();
-      jsonNode.fields().forEachRemaining(kv -> result.put(kv.getKey(), 
kv.getValue().asBoolean()));
-      return result;
-    };
+    JsonConverter<Boolean> jsonConverter = this::extractBooleanMap;
     return handleResponse(post(url, 
Collections.unmodifiableMap(customPayloads)), jsonConverter);
   }
 
@@ -93,15 +98,52 @@ class CustomRestClientImpl implements CustomRestClient {
     if (partitions != null) {
       payLoads.put(PARTITIONS, partitions);
     }
-    JsonConverter jsonConverter = jsonNode -> {
-      Map<String, Boolean> result = new HashMap<>();
-      jsonNode.fields().forEachRemaining(
-          kv -> result.put(kv.getKey(), 
kv.getValue().get(IS_HEALTHY_FIELD).asBoolean()));
-      return result;
-    };
+    JsonConverter<Boolean> jsonConverter = this::extractPartitionBooleanMap;
     return handleResponse(post(url, payLoads), jsonConverter);
   }
 
+  @Override
+  public Map<String, List<String>> getAggregatedStoppableCheck(String baseUrl,
+      List<String> instances, Set<String> toBeStoppedInstances, String 
clusterId,
+      Map<String, String> customPayloads) throws IOException {
+    /*
+     * example url: http://<baseUrl>/aggregatedHealthStatus -d {
+     * "instances" : ["n1", "n3", "n9"],
+     * "to_be_stopped_instances": "["n2", "n4"]",
+     * "cluster_id": "cluster1",
+     * "<key>": "<value>"
+     * ...
+     * }
+     *
+     * The response will be a json object with two fields: 
"stoppableInstances" and
+     * "nonStoppableInstancesWithReasons". The value of "stoppableInstances" 
is a list of instances
+     * that are stoppable. The value of "nonStoppableInstancesWithReasons" is 
a map where the key is
+     * the instance name and the value is the reasons why the instance is not 
stoppable.
+     *
+     * example response: {
+     * "stoppableInstances": ["n1", "n3"],
+     * "nonStoppableInstancesWithReasons": {
+     * "n2": "reason1,reason2",
+     * "n4": "reason3"
+     * }
+     */
+    String url = baseUrl + AGGREGATED_HEALTH_STATUS;
+    Map<String, Object> payLoads = new HashMap<>(customPayloads);
+    if (instances != null && !instances.isEmpty()) {
+      payLoads.put("instances", instances);
+    }
+    if (toBeStoppedInstances != null && !toBeStoppedInstances.isEmpty()) {
+      payLoads.put("to_be_stopped_instances", toBeStoppedInstances);
+    }
+    if (clusterId != null) {
+      payLoads.put("cluster_id", clusterId);
+    }
+
+    // JsonConverter to handle the response and map instance health status
+    JsonConverter<List<String>> converter = this::extractAggregatedStatus;
+    return handleResponse(post(url, payLoads), converter);
+  }
+
   @VisibleForTesting
   protected JsonNode getJsonObject(HttpResponse httpResponse) throws 
IOException {
     HttpEntity httpEntity = httpResponse.getEntity();
@@ -110,8 +152,8 @@ class CustomRestClientImpl implements CustomRestClient {
     return OBJECT_MAPPER.readTree(str);
   }
 
-  private Map<String, Boolean> handleResponse(HttpResponse httpResponse,
-      JsonConverter jsonConverter) throws IOException {
+  private <T> Map<String, T> handleResponse(HttpResponse httpResponse,
+      JsonConverter<T> jsonConverter) throws IOException {
     int status = httpResponse.getStatusLine().getStatusCode();
     if (status == HttpStatus.SC_OK) {
       LOG.info("Expected HttpResponse statusCode: {}", HttpStatus.SC_OK);
@@ -119,33 +161,89 @@ class CustomRestClientImpl implements CustomRestClient {
     } else {
       // Ensure entity is fully consumed so stream is closed.
       EntityUtils.consumeQuietly(httpResponse.getEntity());
-      throw new ClientProtocolException("Unexpected response status: " + 
status + ", reason: "
-          + httpResponse.getStatusLine().getReasonPhrase());
+      throw new ClientProtocolException(
+          "Unexpected response status: " + status + ", reason: " + 
httpResponse.getStatusLine()
+              .getReasonPhrase());
     }
   }
 
   @VisibleForTesting
   protected HttpResponse post(String url, Map<String, Object> payloads) throws 
IOException {
     HttpPost postRequest = new HttpPost(url);
-    try {
-      postRequest.setHeader("Accept", ACCEPT_CONTENT_TYPE);
-      StringEntity entity = new 
StringEntity(OBJECT_MAPPER.writeValueAsString(payloads),
-          ContentType.APPLICATION_JSON);
-      postRequest.setEntity(entity);
-      LOG.info("Executing request: {}, headers: {}, entity: {}", 
postRequest.getRequestLine(),
-          postRequest.getAllHeaders(), postRequest.getEntity());
-
-      HttpResponse response = _httpClient.execute(postRequest);
-      int status = response.getStatusLine().getStatusCode();
-      if (status != HttpStatus.SC_OK) {
-        LOG.warn("Received non-200 status code: {}, payloads: {}", status, 
payloads);
+    int retries = 0;
+    HttpResponse response = null;
+
+    while (retries < MAX_REDIRECT) {
+      try {
+        postRequest.setHeader("Accept", ACCEPT_CONTENT_TYPE);
+        StringEntity entity = new 
StringEntity(OBJECT_MAPPER.writeValueAsString(payloads),
+            ContentType.APPLICATION_JSON);
+        postRequest.setEntity(entity);
+        LOG.info("Executing request: {}, headers: {}, entity: {}", 
postRequest.getRequestLine(),
+            postRequest.getAllHeaders(), postRequest.getEntity());
+
+        // Execute the request
+        response = _httpClient.execute(postRequest);
+        int status = response.getStatusLine().getStatusCode();
+
+        if (status == HttpStatus.SC_OK) {
+          return response;  // Return the successful response
+        } else if (status == HttpStatus.SC_MOVED_TEMPORARILY) {
+          // If receiving 302 Found (redirect), handle the redirection
+          Header locationHeader = response.getFirstHeader("Location");
+          if (locationHeader != null) {
+            String redirectUrl = locationHeader.getValue();
+            LOG.info("Redirecting to: {}", redirectUrl);
+            // Update the post request with the new URL
+            postRequest = new HttpPost(redirectUrl);
+            retries++;
+            LOG.info("Retrying redirect (attempt {} of {})", retries, 
MAX_REDIRECT);
+          } else {
+            LOG.warn("Received 302 but no Location header is present, stopping 
retries.");
+            break;  // Break out if there is no valid redirect location
+          }
+        } else {
+          LOG.warn("Received non-200 and non-302 status code: {}, payloads: 
{}", status, payloads);
+          return response;  // Return response without retry
+        }
+      } catch (IOException e) {
+        // Release connection to be reused and avoid connection leakage
+        postRequest.releaseConnection();
+        throw e;
       }
-
-      return response;
-    } catch (IOException e) {
-      // Release connection to be reused and avoid connection leakage.
-      postRequest.releaseConnection();
-      throw e;
     }
+
+    return response;
+  }
+
+  private Map<String, Boolean> extractBooleanMap(JsonNode jsonNode) {
+    Map<String, Boolean> result = new HashMap<>();
+    jsonNode.fields().forEachRemaining(kv -> result.put(kv.getKey(), 
kv.getValue().asBoolean()));
+    return result;
+  }
+
+  private Map<String, Boolean> extractPartitionBooleanMap(JsonNode jsonNode) {
+    Map<String, Boolean> result = new HashMap<>();
+    jsonNode.fields().forEachRemaining(
+        kv -> result.put(kv.getKey(), 
kv.getValue().get(IS_HEALTHY_FIELD).asBoolean()));
+    return result;
+  }
+
+  private Map<String, List<String>> extractAggregatedStatus(JsonNode jsonNode) 
{
+    Map<String, List<String>> result = new HashMap<>();
+    jsonNode.fields().forEachRemaining(response -> {
+      String key = response.getKey();
+      if (HEALTH_INSTANCES.equals(key)) {
+        response.getValue().forEach(instance -> {
+          result.put(instance.textValue(), Collections.emptyList());
+        });
+      }
+      if (NON_STOPPABLE_INSTANCES.equals(key)) {
+        response.getValue().fields().forEachRemaining(instance -> 
result.put(instance.getKey(),
+            Stream.of(instance.getValue().toString().split(","))
+                .collect(Collectors.toCollection(ArrayList<String>::new))));
+      }
+    });
+    return result;
   }
 }
diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
 
b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
index 063e7107f..925b2d216 100644
--- 
a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
+++ 
b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
@@ -84,7 +84,8 @@ public class MaintenanceManagementService {
 
   public static final Set<StoppableCheck.Category> 
SKIPPABLE_HEALTH_CHECK_CATEGORIES =
       ImmutableSet.of(StoppableCheck.Category.CUSTOM_INSTANCE_CHECK,
-          StoppableCheck.Category.CUSTOM_PARTITION_CHECK);
+          StoppableCheck.Category.CUSTOM_PARTITION_CHECK,
+          StoppableCheck.Category.CUSTOM_AGGREGATED_CHECK);
 
   private final ConfigAccessor _configAccessor;
   private final CustomRestClient _customRestClient;
@@ -381,7 +382,7 @@ public class MaintenanceManagementService {
             toBeStoppedInstances);
     // custom check, includes partition check.
     batchCustomInstanceStoppableCheck(clusterId, 
instancesForCustomInstanceLevelChecks,
-        finalStoppableChecks, getMapFromJsonPayload(jsonContent));
+        toBeStoppedInstances, finalStoppableChecks, 
getMapFromJsonPayload(jsonContent));
     return finalStoppableChecks;
   }
 
@@ -486,24 +487,46 @@ public class MaintenanceManagementService {
   }
 
   private List<String> batchCustomInstanceStoppableCheck(String clusterId, 
List<String> instances,
-      Map<String, StoppableCheck> finalStoppableChecks, Map<String, String> 
customPayLoads) {
+      Set<String> toBeStoppedInstances, Map<String, StoppableCheck> 
finalStoppableChecks,
+      Map<String, String> customPayLoads) {
     if (instances.isEmpty()) {
       // if all instances failed at previous checks, then all following checks 
are not required.
       return instances;
     }
     RESTConfig restConfig = _configAccessor.getRESTConfig(clusterId);
-    if (restConfig == null && (
-        
!_skipHealthCheckCategories.contains(StoppableCheck.Category.CUSTOM_INSTANCE_CHECK)
-            || !_skipHealthCheckCategories.contains(
-            StoppableCheck.Category.CUSTOM_PARTITION_CHECK))) {
-      String errorMessage = String.format(
-          "The cluster %s hasn't enabled client side health checks yet, "
-              + "thus the stoppable check result is inaccurate", clusterId);
-      LOG.error(errorMessage);
-      throw new HelixException(errorMessage);
-    }
-
-    List<String> instancesForCustomPartitionLevelChecks;
+    if (restConfig == null) {
+      // If the user didn't set up the rest config, we can't perform the 
custom check.
+      // Therefore, skip the custom check.
+      LOG.info(String.format("The cluster %s hasn't enabled client side health 
checks yet, "
+          + "thus the stoppable check result is inaccurate", clusterId));
+      return instances;
+    }
+
+    // If the config has exactUrl and the CLUSTER level customer check is not 
skipped, we will
+    // perform the custom check at cluster level.
+    if (restConfig.getCompleteConfiguredHealthUrl().isPresent()) {
+      if 
(_skipHealthCheckCategories.contains(StoppableCheck.Category.CUSTOM_AGGREGATED_CHECK))
 {
+        return instances;
+      }
+
+      Map<String, StoppableCheck> clusterLevelCustomCheckResult =
+          performAggregatedCustomCheck(clusterId, instances,
+              restConfig.getCompleteConfiguredHealthUrl().get(), 
customPayLoads,
+              toBeStoppedInstances);
+      List<String> instancesForNextCheck = new ArrayList<>();
+      clusterLevelCustomCheckResult.forEach((instance, stoppableCheck) -> {
+        addStoppableCheck(finalStoppableChecks, instance, stoppableCheck);
+        if (stoppableCheck.isStoppable() || 
isNonBlockingCheck(stoppableCheck)) {
+          instancesForNextCheck.add(instance);
+        }
+      });
+
+      return instancesForNextCheck;
+    }
+
+    // Reaching here means the rest config requires instances/partition level 
checks. We will
+    // perform the custom check at instance/partition level if they are not 
skipped.
+    List<String> instancesForCustomPartitionLevelChecks = instances;
     if 
(!_skipHealthCheckCategories.contains(StoppableCheck.Category.CUSTOM_INSTANCE_CHECK))
 {
       Map<String, Future<StoppableCheck>> customInstanceLevelChecks = 
instances.stream().collect(
           Collectors.toMap(Function.identity(), instance -> POOL.submit(
@@ -511,8 +534,6 @@ public class MaintenanceManagementService {
                   customPayLoads))));
       instancesForCustomPartitionLevelChecks =
           filterInstancesForNextCheck(customInstanceLevelChecks, 
finalStoppableChecks);
-    } else {
-      instancesForCustomPartitionLevelChecks = instances;
     }
 
     if (!instancesForCustomPartitionLevelChecks.isEmpty() && 
!_skipHealthCheckCategories.contains(
@@ -556,8 +577,8 @@ public class MaintenanceManagementService {
       } else if (healthCheck.equals(HELIX_CUSTOM_STOPPABLE_CHECK)) {
         // custom check, includes custom Instance check and partition check.
         instancesForNext =
-            batchCustomInstanceStoppableCheck(clusterId, instancesForNext, 
finalStoppableChecks,
-                healthCheckConfig);
+            batchCustomInstanceStoppableCheck(clusterId, instancesForNext, 
Collections.emptySet(),
+                finalStoppableChecks, healthCheckConfig);
       } else {
         throw new UnsupportedOperationException(healthCheck + " is not 
supported yet!");
       }
@@ -702,6 +723,29 @@ public class MaintenanceManagementService {
     return instanceStoppableChecks;
   }
 
+  private Map<String, StoppableCheck> performAggregatedCustomCheck(String 
clusterId,
+      List<String> instances, String url, Map<String, String> customPayLoads,
+      Set<String> toBeStoppedInstances) {
+    Map<String, StoppableCheck> aggregatedStoppableChecks = new HashMap<>();
+    try {
+      Map<String, List<String>> customCheckResult =
+          _customRestClient.getAggregatedStoppableCheck(url, instances, 
toBeStoppedInstances,
+              clusterId, customPayLoads);
+      for (Map.Entry<String, List<String>> entry : 
customCheckResult.entrySet()) {
+        // If the list is empty, it means the instance is stoppable.
+        aggregatedStoppableChecks.put(entry.getKey(),
+            new StoppableCheck(entry.getValue().isEmpty(), entry.getValue(),
+                StoppableCheck.Category.CUSTOM_AGGREGATED_CHECK));
+      }
+    } catch (IOException ex) {
+      LOG.error("Custom client side aggregated health check for {} failed.", 
clusterId, ex);
+      return instances.stream().collect(Collectors.toMap(Function.identity(),
+          instance -> new StoppableCheck(false, 
Collections.singletonList(instance),
+              StoppableCheck.Category.CUSTOM_AGGREGATED_CHECK)));
+    }
+    return aggregatedStoppableChecks;
+  }
+
   public static Map<String, String> getMapFromJsonPayload(String jsonContent) 
throws IOException {
     Map<String, String> result = new HashMap<>();
     if (jsonContent == null) {
diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java
 
b/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java
index 2985bd86f..0c1becbc1 100644
--- 
a/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java
+++ 
b/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java
@@ -33,7 +33,8 @@ public class StoppableCheck {
   public enum Category {
     HELIX_OWN_CHECK("HELIX:"),
     CUSTOM_INSTANCE_CHECK("CUSTOM_INSTANCE_HEALTH_FAILURE:"),
-    CUSTOM_PARTITION_CHECK("CUSTOM_PARTITION_HEALTH_FAILURE:");
+    CUSTOM_PARTITION_CHECK("CUSTOM_PARTITION_HEALTH_FAILURE:"),
+    CUSTOM_AGGREGATED_CHECK("CUSTOM_AGGREGATED_HEALTH_FAILURE:");
 
     String prefix;
 
diff --git 
a/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java
 
b/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java
index 06015dd50..5a5fec04b 100644
--- 
a/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java
+++ 
b/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java
@@ -20,12 +20,15 @@ package org.apache.helix.rest.client;
  */
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
 import org.apache.http.StatusLine;
@@ -204,4 +207,31 @@ public class TestCustomRestClient {
       return new ObjectMapper().readTree(_jsonResponse);
     }
   }
+
+  @Test
+  public void testGetAggregatedStoppableCheck() throws IOException {
+    MockCustomRestClient customRestClient = new 
MockCustomRestClient(_httpClient);
+    String jsonResponse = "{\n \"stoppableInstances\" : [\"n1\", \"n2\", 
\"n3\"],\n \"nonStoppableInstancesWithReasons\": "
+        + "{\n    \"n4\": \"ERROR_PARTITION STILL_BOOTSTRAPPING\",\n    
\"n10\": \"NOT_READY\"\n  }\n}";
+    String clusterId = "cluster1";
+    String[] instances = {"n1", "n2", "n3", "n4", "n10"};
+    String[] healthyInstances = {"n1", "n2", "n3"};
+    String[] nonStoppableInstances = {"n4", "n10"};
+
+    HttpResponse httpResponse = mock(HttpResponse.class);
+    StatusLine statusLine = mock(StatusLine.class);
+
+    when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK);
+    when(httpResponse.getStatusLine()).thenReturn(statusLine);
+    customRestClient.setJsonResponse(jsonResponse);
+    when(_httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse);
+
+    Map<String, List<String>> clusterHealth = 
customRestClient.getAggregatedStoppableCheck(HTTP_LOCALHOST,
+        ImmutableList.of("n1", "n2", "n3", "n4", "n10"),
+        ImmutableSet.of("n7", "n8"), clusterId, Collections.emptyMap());
+
+    
Assert.assertTrue(Arrays.stream(instances).allMatch(clusterHealth::containsKey));
+    Assert.assertTrue(Arrays.stream(healthyInstances).allMatch(instance -> 
clusterHealth.get(instance).isEmpty()));
+    Assert.assertTrue(Arrays.stream(nonStoppableInstances).noneMatch(instance 
-> clusterHealth.get(instance).isEmpty()));
+  }
 }
diff --git 
a/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
 
b/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
index 6c207bd10..5801fa231 100644
--- 
a/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
+++ 
b/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
@@ -20,6 +20,7 @@ package org.apache.helix.rest.clusterMaintenanceService;
  */
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -55,6 +56,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anySet;
 import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyList;
@@ -414,4 +416,126 @@ public class TestMaintenanceManagementService {
     verify(_customRestClient, times(1)).getInstanceStoppableCheck(any(), 
any());
   }
 
+  @Test
+  public void testGetAggregatedStoppableCheckWithFailedInstances() throws 
IOException {
+    MockMaintenanceManagementService service =
+        new MockMaintenanceManagementService(_dataAccessorWrapper, 
_configAccessor,
+            _customRestClient, false, false, 
HelixRestNamespace.DEFAULT_NAMESPACE_NAME);
+
+    Set<String> toBeStoppedInstances = ImmutableSet.of("n2", "n4");
+    List<String> instances = List.of(TEST_INSTANCE);
+    RESTConfig restConfig = new RESTConfig(TEST_CLUSTER);
+    // Set the customized health URL to a fake address which contains exact URI
+    restConfig.set(RESTConfig.SimpleFields.CUSTOMIZED_HEALTH_URL, 
"http://fakeAddress:123/path");
+    when(_configAccessor.getRESTConfig(TEST_CLUSTER)).thenReturn(restConfig);
+    when(
+        _customRestClient.getAggregatedStoppableCheck(anyString(), anyList(), 
anySet(), anyString(),
+            anyMap())).thenReturn(
+        ImmutableMap.of(TEST_INSTANCE, 
Collections.singletonList("FAILED_FAKE_CHECK")));
+
+    Map<String, StoppableCheck> result =
+        service.batchGetInstancesStoppableChecks(TEST_CLUSTER, instances, "", 
toBeStoppedInstances);
+
+    Assert.assertEquals(result.size(), 1);
+    Assert.assertTrue(result.containsKey(TEST_INSTANCE));
+    Assert.assertFalse(result.get(TEST_INSTANCE).isStoppable());
+
+    // Since the cluster level check is not skipped, 
getAggregatedStoppableCheck should be called
+    verify(_customRestClient, times(1)).getAggregatedStoppableCheck(any(), 
any(), any(), any(),
+        any());
+    // Since the rest URI uses the exact URI, no instance/partition level 
check should be performed
+    verify(_customRestClient, times(0)).getInstanceStoppableCheck(any(), 
any());
+    verify(_customRestClient, times(0)).getPartitionStoppableCheck(any(), 
any(), any());
+  }
+
+  @Test
+  public void testGetAggregatedStoppableCheckWithStoppableInstances() throws 
IOException {
+    MockMaintenanceManagementService service =
+        new MockMaintenanceManagementService(_dataAccessorWrapper, 
_configAccessor,
+            _customRestClient, false, false, 
HelixRestNamespace.DEFAULT_NAMESPACE_NAME);
+
+    Set<String> toBeStoppedInstances = ImmutableSet.of("n2", "n4");
+    List<String> instances = new ArrayList<>(Arrays.asList("n1", "n3", "n5"));
+    RESTConfig restConfig = new RESTConfig(TEST_CLUSTER);
+    // Set the customized health URL to a fake address which contains exact URI
+    restConfig.set(RESTConfig.SimpleFields.CUSTOMIZED_HEALTH_URL, 
"http://fakeAddress:123/path");
+
+    Map<String, List<String>> stoppableCheckResult = new HashMap<>();
+    stoppableCheckResult.put("n1", Collections.emptyList()); // n1 is stoppable
+    stoppableCheckResult.put("n3",
+        List.of("FAILED_FAKE_CHECK", "FAILED_FAKE_CHECK2")); // n3 is not 
stoppable
+    stoppableCheckResult.put("n5", Collections.emptyList()); // n5 is stoppable
+    when(_configAccessor.getRESTConfig(TEST_CLUSTER)).thenReturn(restConfig);
+    when(
+        _customRestClient.getAggregatedStoppableCheck(anyString(), anyList(), 
anySet(), anyString(),
+            anyMap())).thenReturn(stoppableCheckResult);
+
+    Map<String, StoppableCheck> result =
+        service.batchGetInstancesStoppableChecks(TEST_CLUSTER, instances, "", 
toBeStoppedInstances);
+
+    Assert.assertEquals(result.size(), 3);
+    instances.forEach(instance -> {
+      Assert.assertTrue(result.containsKey(instance));
+      if (instance.equals("n1") || instance.equals("n5")) {
+        Assert.assertTrue(result.get(instance).isStoppable());
+      } else {
+        Assert.assertFalse(result.get(instance).isStoppable());
+        Assert.assertEquals(result.get(instance).getFailedChecks().size(), 2);
+      }
+    });
+
+    // Since the cluster level check is not skipped, 
getAggregatedStoppableCheck should be called
+    verify(_customRestClient, times(1)).getAggregatedStoppableCheck(any(), 
any(), any(), any(),
+        any());
+    // Since the rest URI uses the exact URI, no instance/partition level 
check should be performed
+    verify(_customRestClient, times(0)).getInstanceStoppableCheck(any(), 
any());
+    verify(_customRestClient, times(0)).getPartitionStoppableCheck(any(), 
any(), any());
+  }
+
+  @Test
+  public void testNewRestConfigBackwardCompatibilityWithRestURIWithWildcard() 
throws IOException {
+    MockMaintenanceManagementService service =
+        new MockMaintenanceManagementService(_dataAccessorWrapper, 
_configAccessor,
+            _customRestClient, false, false,
+            Set.of(StoppableCheck.Category.CUSTOM_AGGREGATED_CHECK),
+            HelixRestNamespace.DEFAULT_NAMESPACE_NAME);
+    Set<String> toBeStoppedInstances = ImmutableSet.of("n2", "n4");
+    List<String> instances = List.of(TEST_INSTANCE);
+    RESTConfig restConfig = new RESTConfig(TEST_CLUSTER);
+    restConfig.set(RESTConfig.SimpleFields.CUSTOMIZED_HEALTH_URL, 
"http://fakeAddress:123/path");
+    when(_configAccessor.getRESTConfig(TEST_CLUSTER)).thenReturn(restConfig);
+    when(
+        _customRestClient.getAggregatedStoppableCheck(anyString(), anyList(), 
anySet(), anyString(),
+            anyMap())).thenReturn(
+        ImmutableMap.of(TEST_INSTANCE, 
Collections.singletonList("FAILED_FAKE_CHECK")));
+
+    service.batchGetInstancesStoppableChecks(TEST_CLUSTER, instances, "", 
toBeStoppedInstances);
+    // Since the cluster level check is skipped, getAggregatedStoppableCheck 
should not be called
+    verify(_customRestClient, times(0)).getAggregatedStoppableCheck(any(), 
any(), any(), any(),
+        any());
+    verify(_customRestClient, times(0)).getInstanceStoppableCheck(any(), 
any());
+    verify(_customRestClient, times(0)).getPartitionStoppableCheck(any(), 
any(), any());
+  }
+
+  @Test
+  public void testRestConfigWorkWithInstancesAndPartitionChecks() throws 
IOException {
+    MockMaintenanceManagementService service =
+        new MockMaintenanceManagementService(_dataAccessorWrapper, 
_configAccessor,
+            _customRestClient, false, false, 
Set.of(StoppableCheck.Category.CUSTOM_PARTITION_CHECK),
+            HelixRestNamespace.DEFAULT_NAMESPACE_NAME);
+    Set<String> toBeStoppedInstances = ImmutableSet.of("n2", "n4");
+    List<String> instances = List.of(TEST_INSTANCE);
+    RESTConfig restConfig = new RESTConfig(TEST_CLUSTER);
+    restConfig.set(RESTConfig.SimpleFields.CUSTOMIZED_HEALTH_URL, 
"http://*:123/path");
+    when(_configAccessor.getRESTConfig(TEST_CLUSTER)).thenReturn(restConfig);
+    when(_customRestClient.getInstanceStoppableCheck(anyString(), 
anyMap())).thenReturn(
+        ImmutableMap.of(TEST_INSTANCE, true));
+
+    service.batchGetInstancesStoppableChecks(TEST_CLUSTER, instances, "", 
toBeStoppedInstances);
+    // Since the cluster level check is skipped, getAggregatedStoppableCheck 
should not be called
+    verify(_customRestClient, times(0)).getAggregatedStoppableCheck(any(), 
any(), any(), any(),
+        any());
+    verify(_customRestClient, times(1)).getInstanceStoppableCheck(any(), 
any());
+    verify(_customRestClient, times(0)).getPartitionStoppableCheck(any(), 
any(), any());
+  }
 }
\ No newline at end of file
diff --git 
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
 
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
index 61ceef3a0..e1bbe7869 100644
--- 
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
+++ 
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
@@ -38,6 +38,7 @@ import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.RESTConfig;
 import org.apache.helix.rest.server.resources.helix.InstancesAccessor;
 import org.apache.helix.rest.server.util.JerseyUriRequestBuilder;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
@@ -577,6 +578,61 @@ public class TestInstancesAccessor extends AbstractTestClass {
     System.out.println("End test :" + TestHelper.getTestMethodName());
   }
 
+  @Test(dependsOnMethods = "testMultipleReplicasInSameMZ")
+  public void testSkipClusterLevelHealthCheck() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    String content = String.format(
+        "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\", \"%s\", 
\"%s\",\"%s\", \"%s\", \"%s\"]}",
+        InstancesAccessor.InstancesProperties.selection_base.name(),
+        InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(),
+        InstancesAccessor.InstancesProperties.instances.name(), "instance1", 
"instance3",
+        "instance6", "instance9", "instance10", "instance11", "instance12", 
"instance13",
+        "instance14", "invalidInstance");
+
+    // Change instance config of instance1 & instance0 to be evacuating
+    String instance0 = "instance0";
+    InstanceConfig instanceConfig =
+        _configAccessor.getInstanceConfig(STOPPABLE_CLUSTER2, instance0);
+    
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.EVACUATE);
+    _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER2, instance0, 
instanceConfig);
+    String instance1 = "instance1";
+    InstanceConfig instanceConfig1 =
+        _configAccessor.getInstanceConfig(STOPPABLE_CLUSTER2, instance1);
+    
instanceConfig1.setInstanceOperation(InstanceConstants.InstanceOperation.EVACUATE);
+    _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER2, instance1, 
instanceConfig1);
+    RESTConfig restConfig = new RESTConfig(STOPPABLE_CLUSTER2);
+    restConfig.set(RESTConfig.SimpleFields.CUSTOMIZED_HEALTH_URL, 
"http://localhost:1234";);
+    _configAccessor.setRESTConfig(STOPPABLE_CLUSTER2, restConfig);
+    // It takes time to reflect the changes.
+    BestPossibleExternalViewVerifier verifier =
+        new 
BestPossibleExternalViewVerifier.Builder(STOPPABLE_CLUSTER2).setZkAddr(ZK_ADDR).build();
+    Assert.assertTrue(verifier.verifyByPolling());
+
+    Response response = new JerseyUriRequestBuilder(
+        
"clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_AGGREGATED_CHECK").format(
+        STOPPABLE_CLUSTER2).post(this, Entity.entity(content, 
MediaType.APPLICATION_JSON_TYPE));
+    JsonNode jsonNode = 
OBJECT_MAPPER.readTree(response.readEntity(String.class));
+
+    Set<String> stoppableSet = getStringSet(jsonNode,
+        
InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+    Assert.assertTrue(stoppableSet.contains("instance12") && 
stoppableSet.contains("instance11")
+        && stoppableSet.contains("instance10"));
+
+    JsonNode nonStoppableInstances = jsonNode.get(
+        
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "instance13"),
+        ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "instance14"),
+        ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"),
+        ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST"));
+    
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
+    _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER2, instance0, 
instanceConfig);
+    
instanceConfig1.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
+    _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER2, instance1, 
instanceConfig1);
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
   private Set<String> getStringSet(JsonNode jsonNode, String key) {
     Set<String> result = new HashSet<>();
     jsonNode.withArray(key).forEach(s -> result.add(s.textValue()));


Reply via email to