This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 97e7c76ab Add InstanceConfigs inputField to allow overriding the
existing InstanceConfigs for /partitionAssignment API.(#2586)
97e7c76ab is described below
commit 97e7c76ab97fc99c1f6f54b714d5ccebc6881e13
Author: Zachary Pinto <[email protected]>
AuthorDate: Fri Aug 11 09:23:15 2023 -0700
Add InstanceConfigs inputField to allow overriding the existing
InstanceConfigs for /partitionAssignment API.(#2586)
---
.../helix/ResourceAssignmentOptimizerAccessor.java | 21 +++++++-
.../TestResourceAssignmentOptimizerAccessor.java | 56 +++++++++++++++++-----
2 files changed, 64 insertions(+), 13 deletions(-)
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
index 932452379..80386e008 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
@@ -21,7 +21,6 @@ package org.apache.helix.rest.server.resources.helix;
import java.security.InvalidParameterException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
@@ -41,6 +40,7 @@ import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
@@ -73,7 +73,8 @@ public class ResourceAssignmentOptimizerAccessor extends
AbstractHelixResource {
private static class InputFields {
Set<String> activatedInstances = new HashSet<>(); // active = online +
enabled.
- Set<String> deactivatedInstances = new HashSet<>(); // deactive = offline
+ disabled.
+ Set<String> deactivatedInstances = new HashSet<>(); // deactivate =
offline + disabled.
+ Set<String> instanceConfigs = new HashSet<>(); // instance configs to be
overriden.
Set<String> instanceFilter = new HashSet<>();
Set<String> resourceFilter = new HashSet<>();
AssignmentFormat returnFormat = AssignmentFormat.IdealStateFormat;
@@ -104,6 +105,8 @@ public class ResourceAssignmentOptimizerAccessor extends
AbstractHelixResource {
}
private static class InstanceChangeMap {
+ @JsonProperty("InstanceConfigs")
+ JsonNode instanceConfigs;
@JsonProperty("ActivateInstances")
List<String> activateInstances;
@JsonProperty("DeactivateInstances")
@@ -170,6 +173,9 @@ public class ResourceAssignmentOptimizerAccessor extends
AbstractHelixResource {
.ifPresent(inputFields.activatedInstances::addAll);
Optional.ofNullable(inputJsonContent.instanceChangeMap.deactivateInstances)
.ifPresent(inputFields.deactivatedInstances::addAll);
+
Optional.ofNullable(inputJsonContent.instanceChangeMap.instanceConfigs).ifPresent(
+ configs -> configs.forEach(
+ instanceConfig ->
inputFields.instanceConfigs.add(instanceConfig.toString())));
}
if (inputJsonContent.optionsMap != null) {
Optional.ofNullable(inputJsonContent.optionsMap.resourceFilter)
@@ -204,6 +210,17 @@ public class ResourceAssignmentOptimizerAccessor extends
AbstractHelixResource {
Map<String, InstanceConfig> instanceConfigMap =
dataAccessor.getChildValuesMap(dataAccessor.keyBuilder().instanceConfigs(),
true);
+ // Override instance config with inputFields.instanceConfigs
+ for (String instanceConfig : inputFields.instanceConfigs) {
+ try {
+ InstanceConfig instanceConfigOverride = new
InstanceConfig(toZNRecord(instanceConfig));
+ instanceConfigMap.put(instanceConfigOverride.getInstanceName(),
instanceConfigOverride);
+ } catch (Exception e) {
+ throw new InvalidParameterException(
+ "instanceConfig: " + instanceConfig + "is not a valid
instanceConfig");
+ }
+ }
+
// Read instance and cluster config.
// Throw exception if there is no instanceConfig for activatedInstances
instance.
for (String instance : inputFields.activatedInstances) {
diff --git
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java
index 17eabf048..63afe8a24 100644
---
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java
+++
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java
@@ -20,6 +20,7 @@ package org.apache.helix.rest.server;
*/
import java.io.IOException;
+import java.io.StringWriter;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -42,7 +43,6 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-
public class TestResourceAssignmentOptimizerAccessor extends AbstractTestClass
{
String cluster = "TestCluster_3";
@@ -231,8 +231,8 @@ public class TestResourceAssignmentOptimizerAccessor
extends AbstractTestClass {
for (String resource : resources) {
IdealState idealState =
_gSetupTool.getClusterManagementTool().getResourceIdealState(cluster, resource);
- idealState
-
.setRebalancerClassName("org.apache.helix.controller.rebalancer.waged.WagedRebalancer");
+ idealState.setRebalancerClassName(
+ "org.apache.helix.controller.rebalancer.waged.WagedRebalancer");
_gSetupTool.getClusterManagementTool().setResourceIdealState(cluster,
resource, idealState);
}
@@ -241,9 +241,10 @@ public class TestResourceAssignmentOptimizerAccessor
extends AbstractTestClass {
+ "\"], \"DeactivateInstances\" : [ \"" + toDeactivatedInstance + "\"]
}} ";
String body = post(urlBase, null, Entity.entity(payload,
MediaType.APPLICATION_JSON_TYPE),
Response.Status.OK.getStatusCode(), true).readEntity(String.class);
- Map<String, Map<String, Map<String, String>>> resourceAssignments =
OBJECT_MAPPER
- .readValue(body, new TypeReference<HashMap<String, Map<String,
Map<String, String>>>>() {
- });
+ Map<String, Map<String, Map<String, String>>> resourceAssignments =
+ OBJECT_MAPPER.readValue(body,
+ new TypeReference<HashMap<String, Map<String, Map<String,
String>>>>() {
+ });
Set<String> hostSet = new HashSet<>();
resourceAssignments.forEach((k, v) -> v.forEach((kk, vv) ->
hostSet.addAll(vv.keySet())));
// Assert every partition has 2 replicas. Indicating we ignore the delayed
rebalance when
@@ -252,14 +253,47 @@ public class TestResourceAssignmentOptimizerAccessor
extends AbstractTestClass {
Assert.assertTrue(hostSet.contains(toEnabledInstance));
Assert.assertFalse(hostSet.contains(toDeactivatedInstance));
+ // Test InstanceConfig overrides
+ InstanceConfig toDeactivatedInstanceConfig =
+ _gSetupTool.getClusterManagementTool().getInstanceConfig(cluster,
toDeactivatedInstance);
+ InstanceConfig toEnabledInstanceConfig =
+ _gSetupTool.getClusterManagementTool().getInstanceConfig(cluster,
toEnabledInstance);
+ // Another way to mark the node as inactive or active.
+ toDeactivatedInstanceConfig.setInstanceEnabled(false);
+ toEnabledInstanceConfig.setInstanceEnabled(true);
+ // Write the current InstanceConfigs record to json string
+ StringWriter sw = new StringWriter();
+ OBJECT_MAPPER.writeValue(sw, toDeactivatedInstanceConfig.getRecord());
+ String toDeactivatedInstanceConfigStr = sw.toString();
+ sw = new StringWriter();
+ OBJECT_MAPPER.writeValue(sw, toEnabledInstanceConfig.getRecord());
+ String toEnabledInstanceConfigStr = sw.toString();
+ String payload1 =
+ "{\"InstanceChange\" : { " + "\"InstanceConfigs\": [" +
toDeactivatedInstanceConfigStr + ","
+ + toEnabledInstanceConfigStr + "]}}";
+ String body1 = post(urlBase, null, Entity.entity(payload1,
MediaType.APPLICATION_JSON_TYPE),
+ Response.Status.OK.getStatusCode(), true).readEntity(String.class);
+ Map<String, Map<String, Map<String, String>>> resourceAssignments1 =
+ OBJECT_MAPPER.readValue(body1,
+ new TypeReference<HashMap<String, Map<String, Map<String,
String>>>>() {
+ });
+ Set<String> hostSet1 = new HashSet<>();
+ resourceAssignments1.forEach((k, v) -> v.forEach((kk, vv) ->
hostSet1.addAll(vv.keySet())));
+ // Assert every partition has 2 replicas.
+ resourceAssignments1.forEach(
+ (k, v) -> v.forEach((kk, vv) -> Assert.assertEquals(vv.size(), 2)));
+ Assert.assertTrue(hostSet1.contains(toEnabledInstance));
+ Assert.assertFalse(hostSet1.contains(toDeactivatedInstance));
+
// Test partitionAssignment host filter
String payload2 = "{\"Options\" : { \"InstanceFilter\" : [\"" +
liveInstances.get(0) + "\" , \""
+ liveInstances.get(1) + "\"] }} ";
String body2 = post(urlBase, null, Entity.entity(payload2,
MediaType.APPLICATION_JSON_TYPE),
Response.Status.OK.getStatusCode(), true).readEntity(String.class);
- Map<String, Map<String, Map<String, String>>> resourceAssignments2 =
OBJECT_MAPPER
- .readValue(body2, new TypeReference<HashMap<String, Map<String,
Map<String, String>>>>() {
- });
+ Map<String, Map<String, Map<String, String>>> resourceAssignments2 =
+ OBJECT_MAPPER.readValue(body2,
+ new TypeReference<HashMap<String, Map<String, Map<String,
String>>>>() {
+ });
Set<String> hostSet2 = new HashSet<>();
resourceAssignments2.forEach((k, v) -> v.forEach((kk, vv) ->
hostSet2.addAll(vv.keySet())));
Assert.assertEquals(hostSet2.size(), 2);
@@ -267,8 +301,8 @@ public class TestResourceAssignmentOptimizerAccessor
extends AbstractTestClass {
Assert.assertTrue(hostSet2.contains(liveInstances.get(1)));
String payload3 =
- "{\"Options\" : { \"ResourceFilter\" : [\"" + resources.get(0) + "\" ,
\"" + resources
- .get(1) + "\"] }} ";
+ "{\"Options\" : { \"ResourceFilter\" : [\"" + resources.get(0) + "\" ,
\"" + resources.get(
+ 1) + "\"] }} ";
String body3 = post(urlBase, null, Entity.entity(payload3,
MediaType.APPLICATION_JSON_TYPE),
Response.Status.OK.getStatusCode(), true).readEntity(String.class);
Map<String, Map<String, Map<String, String>>> resourceAssignments3 =
OBJECT_MAPPER