This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 989c20c4beb13b8ca8f46e6fa7eab8e6fa64690a Author: xyuanlu <[email protected]> AuthorDate: Tue Aug 15 17:18:58 2023 -0700 Add API for Instance evacuation (#2588) --- .../apache/helix/constants/InstanceConstants.java | 8 +++++ .../src/main/java/org/apache/helix/HelixAdmin.java | 3 ++ .../org/apache/helix/manager/zk/ZKHelixAdmin.java | 34 ++++++++++++++++++++++ .../org/apache/helix/model/InstanceConfig.java | 14 ++++++++- .../java/org/apache/helix/mock/MockHelixAdmin.java | 10 +++++-- .../rest/server/resources/AbstractResource.java | 3 +- .../server/resources/helix/InstancesAccessor.java | 1 + .../resources/helix/PerInstanceAccessor.java | 4 +++ .../helix/rest/server/TestPerInstanceAccessor.java | 11 +++++++ 9 files changed, 83 insertions(+), 5 deletions(-) diff --git a/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java b/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java index 5bacef7a9..379bbaf02 100644 --- a/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java +++ b/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java @@ -8,4 +8,12 @@ public class InstanceConstants { USER_OPERATION, DEFAULT_INSTANCE_DISABLE_TYPE } + + public enum InstanceOperation { + EVACUATE, // Node will be removed after a period of time + SWAP_IN, // New node joining for swap operation + SWAP_OUT, // Existing Node to be removed for swap operation + ENABLE, // Backward compatible field for HELIX_ENABLED. Set when changing from disabled to enabled. + DISABLE // Backward compatible field for HELIX_ENABLED. Set when changing from enabled to disabled. + } } diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java index 6c6d4be5c..ab4ba57b6 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java @@ -302,6 +302,9 @@ public interface HelixAdmin { */ void enableInstance(String clusterName, List<String> instances, boolean enabled); + void setInstanceOperation(String clusterName, String instance, + InstanceConstants.InstanceOperation instanceOperation); + /** * Disable or enable a resource * @param clusterName diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index 63986e6b5..d45f8cdb8 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -374,6 +374,40 @@ public class ZKHelixAdmin implements HelixAdmin { //enableInstance(clusterName, instances, enabled, null, null); } + @Override + // TODO: Name may change in future + public void setInstanceOperation(String clusterName, String instanceName, + InstanceConstants.InstanceOperation instanceOperation) { + + BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<>(_zkClient); + String path = PropertyPathBuilder.instanceConfig(clusterName, instanceName); + + if (!baseAccessor.exists(path, 0)) { + throw new HelixException( + "Cluster " + clusterName + ", instance: " + instanceName + ", instance config does not exist"); + } + + boolean succeeded = baseAccessor.update(path, new DataUpdater<ZNRecord>() { + @Override + public ZNRecord update(ZNRecord currentData) { + if (currentData == null) { + throw new HelixException( + "Cluster: " + clusterName + ", instance: " + instanceName + ", participant config is null"); + } + + InstanceConfig config = new InstanceConfig(currentData); + // TODO: add sanity check in config.setInstanceOperation and throw exception when needed. + // TODO: Also instance enabled in instance config + config.setInstanceOperation(instanceOperation); + return config.getRecord(); + } + }, AccessOption.PERSISTENT); + + if (!succeeded) { + throw new HelixException("Failed to update instance operation. Please check if instance is disabled."); + } + } + @Override public void enableResource(final String clusterName, final String resourceName, final boolean enabled) { diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java index 01c1f682f..bdd49cafb 100644 --- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java @@ -61,7 +61,8 @@ public class InstanceConfig extends HelixProperty { DELAY_REBALANCE_ENABLED, MAX_CONCURRENT_TASK, INSTANCE_CAPACITY_MAP, - TARGET_TASK_THREAD_POOL_SIZE + TARGET_TASK_THREAD_POOL_SIZE, + INSTANCE_OPERATION } public static final int WEIGHT_NOT_SET = -1; @@ -329,6 +330,17 @@ public class InstanceConfig extends HelixProperty { return _record.getLongField(InstanceConfigProperty.HELIX_ENABLED_TIMESTAMP.name(), -1); } + public void setInstanceOperation(InstanceConstants.InstanceOperation operation) { + // TODO: also setInstanceEnabled after sanity check. + + _record.setSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.name(), + operation.name()); + } + + public String getInstanceOperation() { + return _record.getStringField(InstanceConfigProperty.INSTANCE_OPERATION.name(), ""); + } + /** * Check if this instance is enabled for a given partition * This API is deprecated, and will be removed in next major release. diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java index ec25d42be..0069deac8 100644 --- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java +++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java @@ -23,8 +23,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.TreeMap; +import org.apache.helix.AccessOption; import org.apache.helix.BaseDataAccessor; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; @@ -50,8 +50,7 @@ import org.apache.helix.model.MaintenanceSignal; import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.zookeeper.datamodel.ZNRecord; - -import static org.apache.helix.manager.zk.ZKHelixAdmin.assembleInstanceBatchedDisabledInfo; +import org.apache.helix.zookeeper.zkclient.DataUpdater; public class MockHelixAdmin implements HelixAdmin { @@ -306,6 +305,11 @@ public class MockHelixAdmin implements HelixAdmin { } + @Override + public void setInstanceOperation(String clusterName, String instanceName, + InstanceConstants.InstanceOperation instanceOperation) { + } + @Override public void enableResource(String clusterName, String resourceName, boolean enabled) { diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java index b3ca029c5..76733b92a 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java @@ -84,7 +84,8 @@ public class AbstractResource { enableWagedRebalanceForAllResources, purgeOfflineParticipants, getInstance, - getAllInstances + getAllInstances, + setInstanceOperation // TODO: Name is just a place holder, may change in future } @Context diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java index d4f8f90af..d397ada0f 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java @@ -44,6 +44,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; +import org.apache.helix.constants.InstanceConstants; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.InstanceConfig; diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java index d77e74355..8fcd2ab35 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java @@ -370,6 +370,7 @@ public class PerInstanceAccessor extends AbstractHelixResource { @POST public Response updateInstance(@PathParam("clusterId") String clusterId, @PathParam("instanceName") String instanceName, @QueryParam("command") String command, + @QueryParam("instanceOperation") InstanceConstants.InstanceOperation state, @QueryParam("instanceDisabledType") String disabledType, @QueryParam("instanceDisabledReason") String disabledReason, String content) { Command cmd; @@ -414,6 +415,9 @@ public class PerInstanceAccessor extends AbstractHelixResource { OBJECT_MAPPER.getTypeFactory() .constructCollectionType(List.class, String.class))); break; + case setInstanceOperation: + admin.setInstanceOperation(clusterId, instanceName, state); + break; case addInstanceTag: if (!validInstance(node, instanceName)) { return badRequest("Instance names are not match!"); diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java index 5d2fc7082..fe4e19906 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java @@ -486,6 +486,17 @@ public class TestPerInstanceAccessor extends AbstractTestClass { Assert.assertEquals(new HashSet<>(instanceConfig.getDisabledPartitionsMap() .get(CLUSTER_NAME + dbName.substring(0, dbName.length() - 1))), new HashSet<>(Arrays.asList(CLUSTER_NAME + dbName + "0", CLUSTER_NAME + dbName + "3"))); + + // test set instance operation + new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=EVACUATE") + .format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity); + instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_NAME); + Assert.assertEquals( + instanceConfig.getInstanceOperation(), InstanceConstants.InstanceOperation.EVACUATE.toString()); + new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=INVALIDOP") + .expectedReturnStatusCode(Response.Status.NOT_FOUND.getStatusCode()).format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity); + new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation") + .expectedReturnStatusCode(Response.Status.BAD_REQUEST.getStatusCode()).format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity); System.out.println("End test :" + TestHelper.getTestMethodName()); }
