This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new f19304591 Allow partitionAssignment API in maintenance mode (#2742)
f19304591 is described below
commit f193045915fccb3c69a2e4d6601b8b575ac380af
Author: Grant Paláu Spencer <[email protected]>
AuthorDate: Fri Jan 26 18:35:13 2024 -0800
Allow partitionAssignment API in maintenance mode (#2742)
Allow partitionAssignment API in maintenance mode
---
.../stages/BestPossibleStateCalcStage.java | 4 +-
.../helix/ResourceAssignmentOptimizerAccessor.java | 8 --
.../rest/server/TestPartitionAssignmentAPI.java | 113 +++++++++++++++++++++
.../TestResourceAssignmentOptimizerAccessor.java | 7 --
4 files changed, 116 insertions(+), 16 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 0861a48e8..36b4583ac 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -44,6 +44,7 @@ import org.apache.helix.controller.rebalancer.Rebalancer;
import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
+import org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ExternalView;
@@ -376,7 +377,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
WagedRebalancer wagedRebalancer, ResourceControllerDataProvider cache,
CurrentStateOutput currentStateOutput, Map<String, Resource> resourceMap,
BestPossibleStateOutput output, List<String> failureResources) {
- if (cache.isMaintenanceModeEnabled()) {
+ // Allow calculation for readOnlyWagedRebalancer as it is used by
partitionAssignment API
+ if (cache.isMaintenanceModeEnabled() && !(wagedRebalancer instanceof
ReadOnlyWagedRebalancer)) {
// The WAGED rebalancer won't be used while maintenance mode is enabled.
return Collections.emptyMap();
}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
index 346c026dd..03465a9cd 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
@@ -347,14 +347,6 @@ public class ResourceAssignmentOptimizerAccessor extends AbstractHelixResource {
InputFields inputFields, ClusterState clusterState, String clusterId,
AssignmentResult result) {
- // If the cluster is in Maintenance mode, throw an exception
- // TODO: we should return the partitionAssignment regardless of the
cluster is in Maintenance
- // mode or not
- if (getHelixAdmin().isInMaintenanceMode(clusterId)) {
- throw new UnsupportedOperationException(
- "Can not query potential Assignment when cluster is in Maintenance
mode.");
- }
-
// Use getTargetAssignmentForWagedFullAuto for Waged resources.
ConfigAccessor cfgAccessor = getConfigAccessor();
List<ResourceConfig> wagedResourceConfigs = new ArrayList<>();
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java
index 87e490d62..01e88394d 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java
@@ -35,6 +35,8 @@ import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.TestHelper;
+import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
+import
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
@@ -353,4 +355,115 @@ public class TestPartitionAssignmentAPI extends AbstractTestClass {
Assert.assertTrue(_clusterVerifier.verifyByPolling());
}
+
+ /**
+ * Creates a FULL_AUTO "LeaderStandby" resource that uses DelayedAutoRebalancer with
+ * CrushEdRebalanceStrategy, records it in _resources, rebalances it to REPLICAS replicas,
+ * and asserts that the cluster verifier reports convergence afterwards.
+ *
+ * @param db resource name
+ * @param numPartition number of partitions to create
+ * @param minActiveReplica value passed to IdealState#setMinActiveReplicas
+ * @param delay value passed to IdealState#setRebalanceDelay (delayed rebalance is enabled);
+ *     presumably milliseconds — confirm against IdealState
+ */
+ private void createCrushedResource(String db, int numPartition, int
minActiveReplica, long delay) {
+ _gSetupTool.addResourceToCluster(CLUSTER_NAME, db, numPartition,
"LeaderStandby",
+ IdealState.RebalanceMode.FULL_AUTO + "", null);
+ _resources.add(db);
+
+ // Switch the freshly created ideal state to delayed rebalancing driven by
+ // DelayedAutoRebalancer + CrushEdRebalanceStrategy, then write it back.
+ IdealState idealState =
+
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+ idealState.setMinActiveReplicas(minActiveReplica);
+ idealState.setDelayRebalanceEnabled(true);
+ idealState.setRebalanceDelay(delay);
+ idealState.setRebalancerClassName(DelayedAutoRebalancer.class.getName());
+ idealState.setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName());
+ _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME,
db, idealState);
+
+ // Store a ResourceConfig that carries only the resource name, then rebalance the
+ // resource to REPLICAS replicas.
+ ResourceConfig resourceConfig = new ResourceConfig(db);
+ _configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig);
+ _gSetupTool.rebalanceResource(CLUSTER_NAME, db, REPLICAS);
+
+ // Fail fast if the cluster does not converge with the new resource.
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ }
+
+ /**
+ * Verifies that the partition assignment API can be queried while the cluster is in
+ * maintenance mode (MM), and that the ideal states it returns equal the ideal states the
+ * controller writes once MM is exited.
+ *
+ * Steps: create 5 WAGED and 3 CRUSHED resources, enter MM, add and start "dummyInstance",
+ * query the API with "dummyInstance" activated and the first participant deactivated,
+ * stop that participant, exit MM, then poll until every resource's ideal-state map fields
+ * equal the API response.
+ */
+ @Test
+ private void testComputePartitionAssignmentMaintenanceMode() throws
Exception {
+ // NOTE(review): this @Test method is private; confirm the TestNG version in use still
+ // discovers and runs non-public test methods, otherwise this test is silently skipped.
+
+ // Create 5 WAGED resources
+ String wagedResourcePrefix = "TEST_WAGED_DB_";
+ int wagedResourceCount = 5;
+ for (int i = 0; i < wagedResourceCount; i++) {
+ createWagedResource(wagedResourcePrefix + i,
+ DEFAULT_INSTANCE_CAPACITY * DEFAULT_INSTANCE_COUNT / REPLICAS /
wagedResourceCount,
+ MIN_ACTIVE_REPLICAS, 100000L);
+ }
+
+ // Create 3 CRUSHED resources (DelayedAutoRebalancer + CrushEdRebalanceStrategy).
+ // NOTE(review): these pass MIN_ACTIVE_REPLICA while the WAGED resources above pass
+ // MIN_ACTIVE_REPLICAS; confirm the two different constants are intentional.
+ String crushedResourcePrefix = "TEST_CRUSHED_DB_";
+ int crushedResourceCount = 3;
+ for (int i = 0; i < crushedResourceCount; i++) {
+ createCrushedResource(crushedResourcePrefix + i,
+ DEFAULT_INSTANCE_CAPACITY * DEFAULT_INSTANCE_COUNT / REPLICAS /
crushedResourceCount,
+ MIN_ACTIVE_REPLICA, 100000L);
+ }
+
+ // Wait for cluster to converge after adding resources
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // Put the cluster into MM
+ _gSetupTool.getClusterManagementTool()
+ .enableMaintenanceMode(CLUSTER_NAME, true,
+ "testComputePartitionAssignmentMaintenanceMode enters cluster into
MM.");
+
+ // While in MM, add "dummyInstance" to the cluster as an enabled instance whose capacity
+ // map holds DEFAULT_INSTANCE_CAPACITY under INSTANCE_CAPACITY_KEY.
+ String toAddInstanceName = "dummyInstance";
+ InstanceConfig toAddInstanceConfig = new InstanceConfig(toAddInstanceName);
+ toAddInstanceConfig.setInstanceCapacityMap(
+ Collections.singletonMap(INSTANCE_CAPACITY_KEY,
DEFAULT_INSTANCE_CAPACITY));
+ _gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME,
toAddInstanceConfig);
+ _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME,
toAddInstanceName, true);
+
+ // Start a participant for the new instance so it also becomes a live instance.
+ // NOTE(review): toAddParticipant is never stopped in this method; confirm it is cleaned
+ // up elsewhere (e.g. a teardown hook), otherwise it stays connected for later tests.
+ MockParticipantManager toAddParticipant =
createParticipant(toAddInstanceName);
+ toAddParticipant.syncStart();
+
+ // The first existing participant is the one the API call treats as deactivated; it is
+ // stopped for real further below.
+ MockParticipantManager participantToKill = _participants.get(0);
+
+ // Ask the partition assignment API for the assignment that would result from activating
+ // toAddInstanceName and deactivating participantToKill. With ReturnFormat
+ // "IdealStateFormat" the response is parsed as ideal-state map fields for each resource.
+ String payload = "{\"InstanceChange\" : { \"ActivateInstances\" : [\"" +
toAddInstanceName
+ + "\"], \"DeactivateInstances\" : [\"" +
participantToKill.getInstanceName() + "\"] }, "
+ + "\"Options\" : { \"ReturnFormat\" : \"IdealStateFormat\" }}";
+ Response response = post(getPartitionAssignmentPath(), null,
+ Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE),
Response.Status.OK.getStatusCode(),
+ true);
+ String body = response.readEntity(String.class);
+ Map<String, Map<String, Map<String, String>>>
partitionAssignmentIdealStates =
+ OBJECT_MAPPER.readValue(body,
+ // Map of resources --> map of partitions --> map of instances --> state
+ new TypeReference<HashMap<String, Map<String, Map<String,
String>>>>() {
+ });
+
+ // Actually stop the participant the API call deactivated, then sleep briefly before
+ // exiting MM.
+ // NOTE(review): the 3000L sleep is far shorter than the 100000L rebalance delay passed
+ // when the resources were created, so it does not wait out that delay window; confirm
+ // what the sleep is meant to give the controller time to observe.
+ participantToKill.syncStop();
+ Thread.sleep(3000L);
+
+ // Take the cluster out of MM
+ _gSetupTool.getClusterManagementTool()
+ .enableMaintenanceMode(CLUSTER_NAME, false,
+ "testComputePartitionAssignmentMaintenanceMode exits cluster out
of MM.");
+
+ // Wait for cluster to converge after exiting maintenance mode
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // Assert that the ideal states computed by the partitionAssignment API during MM are
+ // identical to the ideal states the controller writes after the cluster exits MM. A
+ // mismatch returns false instead of failing, so it can presumably be re-checked by
+ // TestHelper.verify until its 30000 timeout.
+ Assert.assertTrue(TestHelper.verify(() -> {
+ try {
+ Map<String, Map<String, Map<String, String>>> idealStatesMap = new
HashMap<>();
+
+
+ // NOTE(review): if a resource has no ideal state, getProperty presumably returns
+ // null and the resulting NullPointerException is not caught by the handler below.
+ for (String resource : _resources) {
+ IdealState idealState =
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(resource));
+ idealStatesMap.put(resource, idealState.getRecord().getMapFields());
+ }
+ Assert.assertEquals(partitionAssignmentIdealStates, idealStatesMap);
+ } catch (AssertionError e) {
+ // Log the mismatch and report "not yet" instead of propagating the failure.
+ LOG.error("Ideal state does not match partition assignment", e);
+ return false;
+ }
+ return true;
+ }, 30000));
+ }
}
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java
index 63afe8a24..ad4142c67 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java
@@ -329,13 +329,6 @@ public class TestResourceAssignmentOptimizerAccessor extends AbstractTestClass {
post(urlBase, null, Entity.entity(payload5,
MediaType.APPLICATION_JSON_TYPE),
Response.Status.BAD_REQUEST.getStatusCode(), true);
- // Currently we do not support maintenance mode
- _gSetupTool.getClusterManagementTool()
- .enableMaintenanceMode(cluster, true, TestHelper.getTestMethodName());
- String payload6 = "{}";
- post(urlBase, null, Entity.entity(payload6,
MediaType.APPLICATION_JSON_TYPE),
- Response.Status.BAD_REQUEST.getStatusCode(), true);
-
System.out.println("End test :" + TestHelper.getTestMethodName());
}
}
\ No newline at end of file