This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 33a28e761 Support Stoppable Check for Non-Topology-Aware Clusters 
(#2961)
33a28e761 is described below

commit 33a28e761b9dd8d1ac790121a0dcae55f30e07f4
Author: Xiaxuan Gao <xiax...@linkedin.com>
AuthorDate: Wed Dec 11 14:31:23 2024 -0800

    Support Stoppable Check for Non-Topology-Aware Clusters (#2961)
    
    Support Stoppable Check for Non-Topology-Aware Clusters
---
 .../helix/rest/client/CustomRestClientImpl.java    |  11 +-
 .../StoppableInstancesSelector.java                |  59 +++++++--
 .../server/resources/helix/InstancesAccessor.java  |  55 +++++++--
 .../helix/rest/server/service/ClusterService.java  |   7 ++
 .../rest/server/service/ClusterServiceImpl.java    |   9 ++
 .../helix/rest/client/TestCustomRestClient.java    |  39 ++++++
 .../helix/rest/server/AbstractTestClass.java       |  62 ++++++++++
 .../helix/rest/server/TestInstancesAccessor.java   | 132 +++++++++++++++++++++
 .../rest/server/service/TestClusterService.java    |  24 ++++
 9 files changed, 376 insertions(+), 22 deletions(-)

diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java
 
b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java
index db943b3dd..b09116ac9 100644
--- 
a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java
+++ 
b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java
@@ -132,8 +132,15 @@ class CustomRestClientImpl implements CustomRestClient {
     if (instances != null && !instances.isEmpty()) {
       payLoads.put("instances", instances);
     }
-    if (toBeStoppedInstances != null && !toBeStoppedInstances.isEmpty()) {
-      payLoads.put("to_be_stopped_instances", toBeStoppedInstances);
+    // Before sending the request, make sure the toBeStoppedInstances has no 
overlap with instances
+    Set<String> remainingToBeStoppedInstances = toBeStoppedInstances;
+    if (instances != null && toBeStoppedInstances != null) {
+      remainingToBeStoppedInstances =
+          toBeStoppedInstances.stream().filter(ins -> !instances.contains(ins))
+              .collect(Collectors.toSet());
+    }
+    if (remainingToBeStoppedInstances != null && 
!remainingToBeStoppedInstances.isEmpty()) {
+      payLoads.put("to_be_stopped_instances", remainingToBeStoppedInstances);
     }
     if (clusterId != null) {
       payLoads.put("cluster_id", clusterId);
diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
 
b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
index bb5a2bc5c..891699100 100644
--- 
a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
+++ 
b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
@@ -86,8 +86,7 @@ public class StoppableInstancesSelector {
         
result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
     ObjectNode failedStoppableInstances = result.putObject(
         
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
-    Set<String> toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances);
-    collectEvacuatingInstances(toBeStoppedInstancesSet);
+    Set<String> toBeStoppedInstancesSet = 
findToBeStoppedInstances(toBeStoppedInstances);
 
     List<String> zoneBasedInstance =
         getZoneBasedInstances(instances, _clusterTopology.toZoneMapping());
@@ -118,8 +117,7 @@ public class StoppableInstancesSelector {
         
result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
     ObjectNode failedStoppableInstances = result.putObject(
         
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
-    Set<String> toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances);
-    collectEvacuatingInstances(toBeStoppedInstancesSet);
+    Set<String> toBeStoppedInstancesSet = 
findToBeStoppedInstances(toBeStoppedInstances);
 
     Map<String, Set<String>> zoneMapping = _clusterTopology.toZoneMapping();
     for (String zone : _orderOfZone) {
@@ -136,6 +134,39 @@ public class StoppableInstancesSelector {
     return result;
   }
 
+  /**
+   * Evaluates and collects stoppable instances not based on the zone order.
+   * The method iterates through instances, performing stoppable checks, and 
records reasons for
+   * non-stoppability.
+   *
+   * @param instances A list of instances to be evaluated.
+   * @param toBeStoppedInstances A list of instances presumed to be already 
stopped
+   * @return An ObjectNode containing:
+   *         - 'instance_stoppable_parallel': List of instances that can be stopped.
+   *         - 'instance_not_stoppable_with_reasons': A map with the instance 
name as the key and
+   *         a list of reasons for non-stoppability as the value.
+   * @throws IOException
+   */
+  public ObjectNode getStoppableInstancesNonZoneBased(List<String> instances,
+      List<String> toBeStoppedInstances) throws IOException {
+    ObjectNode result = JsonNodeFactory.instance.objectNode();
+    ArrayNode stoppableInstances =
+        
result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+    ObjectNode failedStoppableInstances = result.putObject(
+        
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+    Set<String> toBeStoppedInstancesSet = 
findToBeStoppedInstances(toBeStoppedInstances);
+
+    // Because zone order calculation is omitted, we must verify each 
instance's existence
+    // to ensure we only process valid instances before performing stoppable 
check.
+    Set<String> nonExistingInstances = processNonexistentInstances(instances, 
failedStoppableInstances);
+    List<String> instancesToCheck = new ArrayList<>(instances);
+    instancesToCheck.removeAll(nonExistingInstances);
+    populateStoppableInstances(instancesToCheck, toBeStoppedInstancesSet, 
stoppableInstances,
+        failedStoppableInstances);
+
+    return result;
+  }
+
   private void populateStoppableInstances(List<String> instances, Set<String> 
toBeStoppedInstances,
       ArrayNode stoppableInstances, ObjectNode failedStoppableInstances) 
throws IOException {
     Map<String, StoppableCheck> instancesStoppableChecks =
@@ -159,7 +190,7 @@ public class StoppableInstancesSelector {
     }
   }
 
-  private void processNonexistentInstances(List<String> instances, ObjectNode 
failedStoppableInstances) {
+  private Set<String> processNonexistentInstances(List<String> instances, 
ObjectNode failedStoppableInstances) {
     // Adding following logic to check whether instances exist or not. An 
instance exist could be
     // checking following scenario:
     // 1. Instance got dropped. (InstanceConfig is gone.)
@@ -174,6 +205,7 @@ public class StoppableInstancesSelector {
       ArrayNode failedReasonsNode = 
failedStoppableInstances.putArray(nonSelectedInstance);
       
failedReasonsNode.add(JsonNodeFactory.instance.textNode(INSTANCE_NOT_EXIST));
     }
+    return nonSelectedInstances;
   }
 
   /**
@@ -258,21 +290,26 @@ public class StoppableInstancesSelector {
   }
 
   /**
-   * Collect instances marked for evacuation in the current topology and add 
them into the given set
+   * Collect instances within the cluster where the instance operation is set 
to EVACUATE, SWAP_IN, or UNKNOWN,
+   * and return them as a set together with the given to-be-stopped instances.
    *
-   * @param toBeStoppedInstances A set of instances we presume to be stopped.
+   * @param toBeStoppedInstances A list of instances we presume to be stopped.
    */
-  private void collectEvacuatingInstances(Set<String> toBeStoppedInstances) {
+  private Set<String> findToBeStoppedInstances(List<String> 
toBeStoppedInstances) {
+    Set<String> toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances);
     Set<String> allInstances = _clusterTopology.getAllInstances();
     for (String instance : allInstances) {
       PropertyKey.Builder propertyKeyBuilder = _dataAccessor.keyBuilder();
       InstanceConfig instanceConfig =
           
_dataAccessor.getProperty(propertyKeyBuilder.instanceConfig(instance));
-      if (InstanceConstants.InstanceOperation.EVACUATE.equals(
-          instanceConfig.getInstanceOperation().getOperation())) {
-        toBeStoppedInstances.add(instance);
+      InstanceConstants.InstanceOperation operation = 
instanceConfig.getInstanceOperation().getOperation();
+      if (operation == InstanceConstants.InstanceOperation.EVACUATE
+          || operation == InstanceConstants.InstanceOperation.SWAP_IN
+          || operation == InstanceConstants.InstanceOperation.UNKNOWN) {
+        toBeStoppedInstancesSet.add(instance);
       }
     }
+    return toBeStoppedInstancesSet;
   }
 
   public static class StoppableInstancesSelectorBuilder {
diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
 
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
index d24ad9fce..52313a66b 100644
--- 
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
+++ 
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
@@ -82,7 +82,7 @@ public class InstancesAccessor extends AbstractHelixResource {
   }
 
   public enum InstanceHealthSelectionBase {
-    instance_based,
+    non_zone_based,
     zone_based,
     cross_zone_based
   }
@@ -224,12 +224,17 @@ public class InstancesAccessor extends 
AbstractHelixResource {
       boolean random) throws IOException {
     try {
       // TODO: Process input data from the content
+      // TODO: Implement the logic to automatically detect the selection base. 
https://github.com/apache/helix/issues/2968#issue-2691677799
       InstancesAccessor.InstanceHealthSelectionBase selectionBase =
-          InstancesAccessor.InstanceHealthSelectionBase.valueOf(
+          
node.get(InstancesAccessor.InstancesProperties.selection_base.name()) == null
+              ? InstanceHealthSelectionBase.non_zone_based : 
InstanceHealthSelectionBase.valueOf(
               
node.get(InstancesAccessor.InstancesProperties.selection_base.name()).textValue());
+
       List<String> instances = OBJECT_MAPPER.readValue(
           
node.get(InstancesAccessor.InstancesProperties.instances.name()).toString(),
           OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, 
String.class));
+      ClusterService clusterService =
+          new ClusterServiceImpl(getDataAccssor(clusterId), 
getConfigAccessor());
 
       List<String> orderOfZone = null;
       String customizedInput = null;
@@ -252,6 +257,12 @@ public class InstancesAccessor extends 
AbstractHelixResource {
           _logger.error(message);
           return badRequest(message);
         }
+        if (!orderOfZone.isEmpty() && selectionBase == 
InstanceHealthSelectionBase.non_zone_based) {
+          String message =
+              "'zone_order' is set but 'selection_base' is 'non_zone_based'. 
Please set 'selection_base' to 'zone_based' or 'cross_zone_based'.";
+          _logger.error(message);
+          return badRequest(message);
+        }
       }
 
       if 
(node.get(InstancesAccessor.InstancesProperties.to_be_stopped_instances.name()) 
!= null) {
@@ -285,6 +296,33 @@ public class InstancesAccessor extends 
AbstractHelixResource {
         }
       }
 
+      ClusterTopology clusterTopology = 
clusterService.getClusterTopology(clusterId);
+      if (selectionBase != InstanceHealthSelectionBase.non_zone_based) {
+        if (!clusterService.isClusterTopologyAware(clusterId)) {
+          String message = "Cluster " + clusterId
+              + " is not topology aware. Please enable the topology in cluster 
config or set "
+              + "'selection_base' to 'non_zone_based'.";
+          _logger.error(message);
+          return badRequest(message);
+        }
+
+        // Find instances that lack topology information
+        Set<String> instancesWithTopology =
+            clusterTopology.toZoneMapping().entrySet().stream().flatMap(entry 
-> entry.getValue().stream())
+                .collect(Collectors.toSet());
+        Set<String> allInstances = clusterTopology.getAllInstances();
+        Set<String> topologyUnawareInstances = new 
HashSet<>(instances).stream().filter(
+                instance -> !instancesWithTopology.contains(instance) && 
allInstances.contains(instance))
+            .collect(Collectors.toSet());
+        if (!topologyUnawareInstances.isEmpty()) {
+          String message = "Instances " + topologyUnawareInstances
+              + " do not have topology information. Please set topology 
information in instance config or"
+              + " set 'selection_base' to 'non_zone_based'.";
+          _logger.error(message);
+          return badRequest(message);
+        }
+      }
+
       String namespace = getNamespace();
       MaintenanceManagementService maintenanceService =
           new 
MaintenanceManagementService.MaintenanceManagementServiceBuilder()
@@ -299,9 +337,6 @@ public class InstancesAccessor extends 
AbstractHelixResource {
               .setSkipStoppableHealthCheckList(skipStoppableCheckList)
               .build();
 
-      ClusterService clusterService =
-          new ClusterServiceImpl(getDataAccssor(clusterId), 
getConfigAccessor());
-      ClusterTopology clusterTopology = 
clusterService.getClusterTopology(clusterId);
       StoppableInstancesSelector stoppableInstancesSelector =
           new StoppableInstancesSelector.StoppableInstancesSelectorBuilder()
               .setClusterId(clusterId)
@@ -311,18 +346,20 @@ public class InstancesAccessor extends 
AbstractHelixResource {
               .setClusterTopology(clusterTopology)
               .setDataAccessor((ZKHelixDataAccessor) getDataAccssor(clusterId))
               .build();
-      stoppableInstancesSelector.calculateOrderOfZone(instances, random);
       ObjectNode result;
-      // TODO: Add support for clusters that do not have topology set up.
-      // Issue #2893: https://github.com/apache/helix/issues/2893
+
       switch (selectionBase) {
         case zone_based:
+          stoppableInstancesSelector.calculateOrderOfZone(instances, random);
           result = 
stoppableInstancesSelector.getStoppableInstancesInSingleZone(instances, 
toBeStoppedInstances);
           break;
         case cross_zone_based:
+          stoppableInstancesSelector.calculateOrderOfZone(instances, random);
           result = 
stoppableInstancesSelector.getStoppableInstancesCrossZones(instances, 
toBeStoppedInstances);
           break;
-        case instance_based:
+        case non_zone_based:
+          result = 
stoppableInstancesSelector.getStoppableInstancesNonZoneBased(instances, 
toBeStoppedInstances);
+          break;
         default:
           throw new UnsupportedOperationException("instance_based selection is 
not supported yet!");
       }
diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterService.java
 
b/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterService.java
index d789e3615..db93571e8 100644
--- 
a/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterService.java
+++ 
b/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterService.java
@@ -41,4 +41,11 @@ public interface ClusterService {
    * @return
    */
   ClusterInfo getClusterInfo(String clusterId);
+
+  /**
+   * Check if the cluster is topology aware
+   * @param clusterId
+   * @return
+   */
+  boolean isClusterTopologyAware(String clusterId);
 }
diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterServiceImpl.java
 
b/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterServiceImpl.java
index b4667fb13..a152c3e64 100644
--- 
a/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterServiceImpl.java
+++ 
b/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterServiceImpl.java
@@ -25,10 +25,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import io.netty.util.internal.StringUtil;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.rest.server.json.cluster.ClusterInfo;
@@ -102,4 +104,11 @@ public class ClusterServiceImpl implements ClusterService {
         .instances(_dataAccessor.getChildNames(keyBuilder.instances()))
         
.liveInstances(_dataAccessor.getChildNames(keyBuilder.liveInstances())).build();
   }
+
+  @Override
+  public boolean isClusterTopologyAware(String clusterId) {
+    ClusterConfig config = _configAccessor.getClusterConfig(clusterId);
+    return config.isTopologyAwareEnabled() && 
!StringUtil.isNullOrEmpty(config.getFaultZoneType())
+        && !StringUtil.isNullOrEmpty(config.getTopology());
+  }
 }
diff --git 
a/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java
 
b/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java
index 3d81dc432..6c8f4be40 100644
--- 
a/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java
+++ 
b/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java
@@ -40,14 +40,18 @@ import org.apache.http.client.methods.HttpPost;
 import org.apache.http.conn.ConnectTimeoutException;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
 import org.junit.Assert;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class TestCustomRestClient {
@@ -260,4 +264,39 @@ public class TestCustomRestClient {
     Assert.assertTrue(Arrays.stream(healthyInstances).allMatch(instance -> 
clusterHealth.get(instance).isEmpty()));
     Assert.assertTrue(Arrays.stream(nonStoppableInstances).noneMatch(instance 
-> clusterHealth.get(instance).isEmpty()));
   }
+
+  @Test(description = "Test if the aggregated stoppable check request has the 
correct format when there"
+      + "are duplicate instances in the instances list and the 
toBeStoppedInstances list.")
+  public void testAggregatedCheckRemoveDuplicateInstances()
+      throws IOException {
+    String clusterId = "cluster1";
+
+    MockCustomRestClient customRestClient = new 
MockCustomRestClient(_httpClient);
+    HttpResponse httpResponse = mock(HttpResponse.class);
+    StatusLine statusLine = mock(StatusLine.class);
+
+    when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK);
+    when(httpResponse.getStatusLine()).thenReturn(statusLine);
+    when(_httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse);
+
+    customRestClient.getAggregatedStoppableCheck(HTTP_LOCALHOST,
+        ImmutableList.of("n1", "n2"),
+        ImmutableSet.of("n1"), clusterId, Collections.emptyMap());
+
+    // Make sure that the duplicate instances are removed from the 
toBeStoppedInstances list
+    ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    verify(_httpClient).execute(argThat(x -> {
+      String request = null;
+      try {
+        request = EntityUtils.toString(((HttpPost) x).getEntity());
+        JsonNode node = OBJECT_MAPPER.readTree(request);
+        String instancesInRequest = node.get("instances").toString();
+        Assert.assertEquals(instancesInRequest, "[\"n1\",\"n2\"]");
+        Assert.assertNull(node.get("to_be_stopped_instances"));
+        return true;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }));
+  }
 }
diff --git 
a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java 
b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index ad7c4482f..ee2346d2d 100644
--- 
a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++ 
b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -132,6 +132,7 @@ public class AbstractTestClass extends 
JerseyTestNg.ContainerPerClassTest {
   protected static BaseDataAccessor<ZNRecord> _baseAccessorTestNS;
   protected static final String STOPPABLE_CLUSTER = "StoppableTestCluster";
   protected static final String STOPPABLE_CLUSTER2 = "StoppableTestCluster2";
+  protected static final String STOPPABLE_CLUSTER3 = "StoppableTestCluster3";
   protected static final String TASK_TEST_CLUSTER = "TaskTestCluster";
   protected static final List<String> STOPPABLE_INSTANCES =
       Arrays.asList("instance0", "instance1", "instance2", "instance3", 
"instance4", "instance5");
@@ -343,6 +344,7 @@ public class AbstractTestClass extends 
JerseyTestNg.ContainerPerClassTest {
     }
     preSetupForParallelInstancesStoppableTest(STOPPABLE_CLUSTER, 
STOPPABLE_INSTANCES);
     preSetupForCrosszoneParallelInstancesStoppableTest(STOPPABLE_CLUSTER2, 
STOPPABLE_INSTANCES2);
+    preSetupForNonTopoAwareInstancesStoppableTest(STOPPABLE_CLUSTER3, 
STOPPABLE_INSTANCES2);
   }
 
   protected Set<String> createInstances(String cluster, int numInstances) {
@@ -602,6 +604,8 @@ public class AbstractTestClass extends 
JerseyTestNg.ContainerPerClassTest {
     _gSetupTool.addCluster(clusterName, true);
     ClusterConfig clusterConfig = 
_configAccessor.getClusterConfig(clusterName);
     clusterConfig.setFaultZoneType("helixZoneId");
+    clusterConfig.setTopologyAwareEnabled(true);
+    clusterConfig.setTopology("/helixZoneId/instance");
     clusterConfig.setPersistIntermediateAssignment(true);
     _configAccessor.setClusterConfig(clusterName, clusterConfig);
     // Create instance configs
@@ -659,6 +663,8 @@ public class AbstractTestClass extends 
JerseyTestNg.ContainerPerClassTest {
     _gSetupTool.addCluster(clusterName, true);
     ClusterConfig clusterConfig = 
_configAccessor.getClusterConfig(clusterName);
     clusterConfig.setFaultZoneType("helixZoneId");
+    clusterConfig.setTopologyAwareEnabled(true);
+    clusterConfig.setTopology("/helixZoneId/instance");
     clusterConfig.setPersistIntermediateAssignment(true);
     _configAccessor.setClusterConfig(clusterName, clusterConfig);
     // Create instance configs
@@ -711,6 +717,62 @@ public class AbstractTestClass extends 
JerseyTestNg.ContainerPerClassTest {
     _clusters.add(clusterName);
     _workflowMap.put(clusterName, createWorkflows(clusterName, 3));
   }
+
+  private void preSetupForNonTopoAwareInstancesStoppableTest(String 
clusterName,
+      List<String> instances) throws Exception {
+    _gSetupTool.addCluster(clusterName, true);
+    ClusterConfig clusterConfig = 
_configAccessor.getClusterConfig(clusterName);
+    clusterConfig.setFaultZoneType("helixZoneId");
+    clusterConfig.setPersistIntermediateAssignment(true);
+    _configAccessor.setClusterConfig(clusterName, clusterConfig);
+    // Create instance configs that do not include the domain field
+    List<InstanceConfig> instanceConfigs = new ArrayList<>();
+    int perZoneInstancesCount = 3;
+    int curZoneCount = 0;
+    for (int i = 0; i < instances.size(); i++) {
+      InstanceConfig instanceConfig = new InstanceConfig(instances.get(i));
+      if (++curZoneCount >= perZoneInstancesCount) {
+        curZoneCount = 0;
+      }
+      instanceConfigs.add(instanceConfig);
+    }
+
+    for (InstanceConfig instanceConfig : instanceConfigs) {
+      _gSetupTool.getClusterManagementTool().addInstance(clusterName, 
instanceConfig);
+    }
+
+    // Start participant
+    startInstances(clusterName, new TreeSet<>(instances), instances.size());
+    createResources(clusterName, 1, 2, 3);
+    _clusterControllerManagers.add(startController(clusterName));
+
+    // Make sure that cluster config exists
+    boolean isClusterConfigExist = TestHelper.verify(() -> {
+      ClusterConfig stoppableClusterConfig;
+      try {
+        stoppableClusterConfig = _configAccessor.getClusterConfig(clusterName);
+      } catch (Exception e) {
+        return false;
+      }
+      return (stoppableClusterConfig != null);
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(isClusterConfigExist);
+    // Make sure that instance config exists for the instance0 to instance5
+    for (String instance: instances) {
+      boolean isinstanceConfigExist = TestHelper.verify(() -> {
+        InstanceConfig instanceConfig;
+        try {
+          instanceConfig = _configAccessor.getInstanceConfig(clusterName, 
instance);
+        } catch (Exception e) {
+          return false;
+        }
+        return (instanceConfig != null);
+      }, TestHelper.WAIT_DURATION);
+      Assert.assertTrue(isinstanceConfigExist);
+    }
+    _clusters.add(clusterName);
+    _workflowMap.put(clusterName, createWorkflows(clusterName, 3));
+  }
   /**
    * Starts a HelixRestServer for the test suite.
    * @return
diff --git 
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
 
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
index e1bbe7869..1c9cbcb51 100644
--- 
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
+++ 
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
@@ -30,6 +30,7 @@ import javax.ws.rs.client.Entity;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -633,6 +634,137 @@ public class TestInstancesAccessor extends 
AbstractTestClass {
     System.out.println("End test :" + TestHelper.getTestMethodName());
   }
 
+  @Test(dependsOnMethods = "testSkipClusterLevelHealthCheck")
+  public void testNonTopoAwareStoppableCheck() throws JsonProcessingException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+    // STOPPABLE_CLUSTER3 is a non-topology-aware cluster
+    String content = String.format(
+        "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\", \"%s\", 
\"%s\",\"%s\", \"%s\", \"%s\"], \"%s\":[\"%s\", \"%s\"]}",
+        InstancesAccessor.InstancesProperties.selection_base.name(),
+        InstancesAccessor.InstanceHealthSelectionBase.non_zone_based.name(),
+        InstancesAccessor.InstancesProperties.instances.name(), "instance1", 
"instance3",
+        "instance6", "instance9", "instance10", "instance11", "instance12", 
"instance13",
+        "instance14", "invalidInstance",
+        
InstancesAccessor.InstancesProperties.skip_stoppable_check_list.name(), 
"INSTANCE_NOT_ENABLED", "INSTANCE_NOT_STABLE");
+
+    // Mark instance0 as EVACUATE and instance1 as SWAP_IN
+    String instance0 = "instance0";
+    InstanceConfig instanceConfig =
+        _configAccessor.getInstanceConfig(STOPPABLE_CLUSTER3, instance0);
+    
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.EVACUATE);
+    _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER3, instance0, 
instanceConfig);
+    String instance1 = "instance1";
+    InstanceConfig instanceConfig1 =
+        _configAccessor.getInstanceConfig(STOPPABLE_CLUSTER3, instance1);
+    
instanceConfig1.setInstanceOperation(InstanceConstants.InstanceOperation.SWAP_IN);
+    _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER3, instance1, 
instanceConfig1);
+
+    // It takes time to reflect the changes.
+    BestPossibleExternalViewVerifier verifier =
+        new 
BestPossibleExternalViewVerifier.Builder(STOPPABLE_CLUSTER3).setZkAddr(ZK_ADDR).build();
+    Assert.assertTrue(verifier.verifyByPolling());
+
+    Response response = new JerseyUriRequestBuilder(
+        
"clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format(
+        STOPPABLE_CLUSTER3).post(this, Entity.entity(content, 
MediaType.APPLICATION_JSON_TYPE));
+    JsonNode jsonNode = 
OBJECT_MAPPER.readTree(response.readEntity(String.class));
+
+    Set<String> stoppableSet = getStringSet(jsonNode,
+        
InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+    Assert.assertTrue(stoppableSet.contains("instance12") && 
stoppableSet.contains("instance3")
+        && stoppableSet.contains("instance10"));
+
+    JsonNode nonStoppableInstances = jsonNode.get(
+        
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "instance13"),
+        ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "instance14"),
+        ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"),
+        ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST"));
+    
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
+    _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER3, instance0, 
instanceConfig);
+    
instanceConfig1.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
+    _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER3, instance1, 
instanceConfig1);
+
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  @Test(dependsOnMethods = "testSkipClusterLevelHealthCheck")
+  public void testNonTopoAwareStoppableCheckWithException() throws 
JsonProcessingException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+    // STOPPABLE_CLUSTER3 is a non-topology-aware cluster
+    String content = String.format(
+        "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\", \"%s\", 
\"%s\",\"%s\", \"%s\", \"%s\"], \"%s\":[\"%s\", \"%s\"]}",
+        InstancesAccessor.InstancesProperties.selection_base.name(),
+        InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(),
+        InstancesAccessor.InstancesProperties.instances.name(), "instance1", 
"instance3",
+        "instance6", "instance9", "instance10", "instance11", "instance12", 
"instance13",
+        "instance14", "invalidInstance",
+        
InstancesAccessor.InstancesProperties.skip_stoppable_check_list.name(), 
"INSTANCE_NOT_ENABLED", "INSTANCE_NOT_STABLE");
+
+    // It takes time to reflect the changes.
+    BestPossibleExternalViewVerifier verifier =
+        new 
BestPossibleExternalViewVerifier.Builder(STOPPABLE_CLUSTER3).setZkAddr(ZK_ADDR).build();
+    Assert.assertTrue(verifier.verifyByPolling());
+
+    // Making the REST Call to cross zone stoppable check while the cluster 
has no topology aware
+    // setup. The call should return an error.
+    Response response = new JerseyUriRequestBuilder(
+        
"clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format(
+        STOPPABLE_CLUSTER3)
+        .isBodyReturnExpected(true)
+        .expectedReturnStatusCode(Response.Status.BAD_REQUEST.getStatusCode())
+        .post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE));
+
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  @Test(description = "Test zone selection base with instance that don't have 
topology set in the config",
+   dependsOnMethods = "testNonTopoAwareStoppableCheckWithException")
+  public void testZoneSelectionBaseWithInstanceThatDontHaveTopologySet() {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+    // STOPPABLE_CLUSTER3 is a non-topology-aware cluster
+    String content = String.format(
+        "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\", \"%s\", 
\"%s\",\"%s\", \"%s\", \"%s\"], \"%s\":[\"%s\", \"%s\"]}",
+        InstancesAccessor.InstancesProperties.selection_base.name(),
+        InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(),
+        InstancesAccessor.InstancesProperties.instances.name(), "instance1", 
"instance3",
+        "instance6", "instance9", "instance10", "instance11", "instance12", 
"instance13",
+        "instance14", "invalidInstance",
+        
InstancesAccessor.InstancesProperties.skip_stoppable_check_list.name(), 
"INSTANCE_NOT_ENABLED", "INSTANCE_NOT_STABLE");
+
+    String instance1 = "instance1";
+    InstanceConfig instanceConfig1 =
+        _configAccessor.getInstanceConfig(STOPPABLE_CLUSTER2, instance1);
+    String domain = instanceConfig1.getDomainAsString();
+    instanceConfig1.setDomain("FALSE_DOMAIN");
+    _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER2, instance1, 
instanceConfig1);
+
+    // It takes time to reflect the changes.
+    BestPossibleExternalViewVerifier verifier =
+        new 
BestPossibleExternalViewVerifier.Builder(STOPPABLE_CLUSTER3).setZkAddr(ZK_ADDR).build();
+    Assert.assertTrue(verifier.verifyByPolling());
+
+    // Making the REST Call to cross zone stoppable check while the cluster 
has no topology aware
+    // setup. The call should return an error.
+    Response response = new JerseyUriRequestBuilder(
+        
"clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format(
+            STOPPABLE_CLUSTER3)
+        .isBodyReturnExpected(true)
+        .expectedReturnStatusCode(Response.Status.BAD_REQUEST.getStatusCode())
+        .post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE));
+
+    // Restore the changes on instance 1
+    instanceConfig1.setDomain(domain);
+    _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER3, instance1, 
instanceConfig1);
+
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
   private Set<String> getStringSet(JsonNode jsonNode, String key) {
     Set<String> result = new HashSet<>();
     jsonNode.withArray(key).forEach(s -> result.add(s.textValue()));
diff --git 
a/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestClusterService.java
 
b/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestClusterService.java
index 5786350d9..23182d2bb 100644
--- 
a/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestClusterService.java
+++ 
b/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestClusterService.java
@@ -96,6 +96,30 @@ public class TestClusterService {
     Assert.assertEquals(clusterTopology.getClusterId(), TEST_CLUSTER);
   }
 
+  @Test
+  public void testCheckTopologyAware() {
+    Mock mock = new Mock();
+    
Assert.assertFalse(mock.clusterService.isClusterTopologyAware(TEST_CLUSTER));
+
+    ClusterConfig config = new ClusterConfig(TEST_CLUSTER);
+    config.setTopology("/zone");
+    
when(mock.configAccessor.getClusterConfig(TEST_CLUSTER)).thenReturn(config);
+    
Assert.assertFalse(mock.clusterService.isClusterTopologyAware(TEST_CLUSTER));
+
+    config = new ClusterConfig(TEST_CLUSTER);
+    config.setFaultZoneType("zone");
+    config.setTopology("/zone");
+    
when(mock.configAccessor.getClusterConfig(TEST_CLUSTER)).thenReturn(config);
+    
Assert.assertFalse(mock.clusterService.isClusterTopologyAware(TEST_CLUSTER));
+
+    config = new ClusterConfig(TEST_CLUSTER);
+    config.setFaultZoneType("zone");
+    config.setTopology("/zone");
+    config.setTopologyAwareEnabled(true);
+    
when(mock.configAccessor.getClusterConfig(TEST_CLUSTER)).thenReturn(config);
+    
Assert.assertTrue(mock.clusterService.isClusterTopologyAware(TEST_CLUSTER));
+  }
+
   private final class Mock {
     private HelixDataAccessor dataAccessor = mock(HelixDataAccessor.class);
     private ConfigAccessor configAccessor = mock(ConfigAccessor.class);


Reply via email to