This is an automated email from the ASF dual-hosted git repository. hzlu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit b52bc048ddd22ed9770540ff71024ecec5f59bc1 Author: Huizhi Lu <[email protected]> AuthorDate: Mon Jun 28 17:28:22 2021 -0700 Add REST APIs for management mode (#1807) This commit adds JAVA and REST APIs to set cluster freeze mode and get the management mode status --- .../src/main/java/org/apache/helix/HelixAdmin.java | 13 ++- .../helix/api/status/ClusterManagementMode.java | 6 ++ .../api/status/ClusterManagementModeRequest.java | 10 ++ .../helix/controller/GenericHelixController.java | 1 + .../org/apache/helix/manager/zk/ZKHelixAdmin.java | 15 ++- .../java/org/apache/helix/mock/MockHelixAdmin.java | 6 ++ .../server/resources/helix/ClusterAccessor.java | 115 +++++++++++++++++++++ .../helix/rest/server/TestClusterAccessor.java | 70 +++++++++++++ 8 files changed, 230 insertions(+), 6 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java index d403571..05bd60e 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import org.apache.helix.api.status.ClusterManagementMode; +import org.apache.helix.api.status.ClusterManagementModeRequest; import org.apache.helix.api.topology.ClusterTopology; import org.apache.helix.model.CloudConfig; import org.apache.helix.model.ClusterConstraints; @@ -37,8 +39,6 @@ import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.MaintenanceSignal; import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.StateModelDefinition; -import org.apache.helix.api.status.ClusterManagementMode; -import org.apache.helix.api.status.ClusterManagementModeRequest; /* * Helix cluster management @@ -382,6 +382,15 @@ public interface HelixAdmin { void setClusterManagementMode(ClusterManagementModeRequest request); /** + * Gets cluster management status {@link ClusterManagementMode}: what mode the cluster is and + * whether the cluster has fully reached to that mode. + * + * @param clusterName cluster name + * @return {@link ClusterManagementMode} + */ + ClusterManagementMode getClusterManagementMode(String clusterName); + + /** * Reset a list of partitions in error state for an instance * The partitions are assume to be in error state and reset will bring them from error * to initial state. An error to initial state transition is required for reset. diff --git a/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementMode.java b/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementMode.java index d7a1637..cbd2019 100644 --- a/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementMode.java +++ b/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementMode.java @@ -55,6 +55,12 @@ public class ClusterManagementMode { private final Type mode; private final Status status; + // Default constructor for json deserialization + private ClusterManagementMode() { + mode = null; + status = null; + } + public ClusterManagementMode(Type mode, Status status) { this.mode = mode; this.status = status; diff --git a/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementModeRequest.java b/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementModeRequest.java index dd2fe58..f3d9b07 100644 --- a/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementModeRequest.java +++ b/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementModeRequest.java @@ -19,11 +19,14 @@ package org.apache.helix.api.status; * under the License. */ +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; import com.google.common.base.Preconditions; /** * Represents a request to set the cluster management mode {@link ClusterManagementMode} */ +@JsonDeserialize(builder = ClusterManagementModeRequest.Builder.class) public class ClusterManagementModeRequest { private final ClusterManagementMode.Type _mode; private final String _clusterName; @@ -57,6 +60,7 @@ public class ClusterManagementModeRequest { return new Builder(); } + @JsonPOJOBuilder(buildMethodName = "buildFromJson") public static final class Builder { private ClusterManagementMode.Type mode; private String clusterName; @@ -94,6 +98,12 @@ public class ClusterManagementModeRequest { return new ClusterManagementModeRequest(this); } + // Used by Json deserializer + private ClusterManagementModeRequest buildFromJson() { + Preconditions.checkNotNull(mode, "Mode not set"); + return new ClusterManagementModeRequest((this)); + } + private void validate() { Preconditions.checkNotNull(mode, "Mode not set"); Preconditions.checkNotNull(clusterName, "Cluster name not set"); diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 6cc1e5f..00bdfcc 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -83,6 +83,7 @@ import org.apache.helix.controller.stages.ExternalViewComputeStage; import org.apache.helix.controller.stages.IntermediateStateCalcStage; import org.apache.helix.controller.stages.MaintenanceRecoveryStage; import org.apache.helix.controller.stages.ManagementModeStage; +import org.apache.helix.controller.stages.MessageGenerationPhase; import org.apache.helix.controller.stages.MessageSelectionStage; import org.apache.helix.controller.stages.MessageThrottleStage; import org.apache.helix.controller.stages.PersistAssignmentStage; diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index 2545139..73c699e 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -54,6 +54,7 @@ import org.apache.helix.PropertyType; import org.apache.helix.SystemPropertyKeys; import org.apache.helix.api.exceptions.HelixConflictException; import org.apache.helix.api.status.ClusterManagementMode; +import org.apache.helix.api.status.ClusterManagementModeRequest; import org.apache.helix.api.topology.ClusterTopology; import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; import org.apache.helix.controller.rebalancer.util.WagedValidationUtil; @@ -62,6 +63,7 @@ import org.apache.helix.model.CloudConfig; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.ClusterConstraints; import org.apache.helix.model.ClusterConstraints.ConstraintType; +import org.apache.helix.model.ClusterStatus; import org.apache.helix.model.ConstraintItem; import org.apache.helix.model.ControllerHistory; import org.apache.helix.model.CurrentState; @@ -81,7 +83,6 @@ import org.apache.helix.model.ParticipantHistory; import org.apache.helix.model.PauseSignal; import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.StateModelDefinition; -import org.apache.helix.api.status.ClusterManagementModeRequest; import org.apache.helix.msdcommon.exception.InvalidRoutingDataException; import org.apache.helix.tools.DefaultIdealStateCalculator; import org.apache.helix.util.HelixUtil; @@ -521,6 +522,15 @@ public class ZKHelixAdmin implements HelixAdmin { } } + @Override + public ClusterManagementMode getClusterManagementMode(String clusterName) { + HelixDataAccessor accessor = + new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_zkClient)); + ClusterStatus status = accessor.getProperty(accessor.keyBuilder().clusterStatus()); + return status == null ? null + : new ClusterManagementMode(status.getManagementMode(), status.getManagementModeStatus()); + } + private void enableClusterPauseMode(String clusterName, boolean cancelPendingST, String reason) { String hostname = NetworkUtil.getLocalhostName(); logger.info( @@ -533,9 +543,6 @@ public class ZKHelixAdmin implements HelixAdmin { if (baseDataAccessor.exists(accessor.keyBuilder().pause().getPath(), AccessOption.PERSISTENT)) { throw new HelixConflictException(clusterName + " pause signal already exists"); } - if (baseDataAccessor.exists(accessor.keyBuilder().maintenance().getPath(), AccessOption.PERSISTENT)) { - throw new HelixConflictException(clusterName + " maintenance signal already exists"); - } // check whether cancellation is enabled ClusterConfig config = accessor.getProperty(accessor.keyBuilder().clusterConfig()); diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java index 2bacc74..f7a1034 100644 --- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java +++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java @@ -30,6 +30,7 @@ import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.PropertyPathBuilder; import org.apache.helix.PropertyType; +import org.apache.helix.api.status.ClusterManagementMode; import org.apache.helix.api.topology.ClusterTopology; import org.apache.helix.model.CloudConfig; import org.apache.helix.model.ClusterConfig; @@ -331,6 +332,11 @@ public class MockHelixAdmin implements HelixAdmin { } + @Override + public ClusterManagementMode getClusterManagementMode(String clusterName) { + return null; + } + @Override public void resetPartition(String clusterName, String instanceName, String resourceName, List<String> partitionNames) { diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java index 78010b9..9efcdc7 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java @@ -22,8 +22,10 @@ package org.apache.helix.rest.server.resources.helix; import java.io.IOException; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; @@ -36,17 +38,22 @@ import javax.ws.rs.core.Response; import com.codahale.metrics.annotation.ResponseMetered; import com.codahale.metrics.annotation.Timed; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import org.apache.commons.lang3.StringUtils; import org.apache.helix.AccessOption; +import org.apache.helix.BaseDataAccessor; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyPathBuilder; +import org.apache.helix.api.exceptions.HelixConflictException; +import org.apache.helix.api.status.ClusterManagementMode; +import org.apache.helix.api.status.ClusterManagementModeRequest; import org.apache.helix.manager.zk.ZKUtil; import org.apache.helix.model.CloudConfig; import org.apache.helix.model.ClusterConfig; @@ -66,6 +73,7 @@ import org.apache.helix.rest.server.service.ClusterServiceImpl; import org.apache.helix.tools.ClusterSetup; import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -300,6 +308,113 @@ public class ClusterAccessor extends AbstractHelixResource { @ResponseMetered(name = HttpConstants.READ_REQUEST) @Timed(name = HttpConstants.READ_REQUEST) @GET + @Path("{clusterId}/management-mode") + public Response getClusterManagementMode(@PathParam("clusterId") String clusterId, + @QueryParam("showDetails") boolean showDetails) { + ClusterManagementMode mode = getHelixAdmin().getClusterManagementMode(clusterId); + if (mode == null) { + return notFound("Cluster " + clusterId + " is not in management mode"); + } + + Map<String, Object> responseMap = new HashMap<>(); + responseMap.put("cluster", clusterId); + responseMap.put("mode", mode.getMode()); + responseMap.put("status", mode.getStatus()); + if (showDetails) { + // To show details, query participants that are in progress to management mode. + responseMap.put("details", getManagementModeDetails(clusterId, mode)); + } + + return JSONRepresentation(responseMap); + } + + private Map<String, Object> getManagementModeDetails(String clusterId, + ClusterManagementMode mode) { + Map<String, Object> details = new HashMap<>(); + Map<String, Object> participantDetails = new HashMap<>(); + ClusterManagementMode.Status status = mode.getStatus(); + details.put("cluster", ImmutableMap.of("cluster", clusterId, "status", status.name())); + + boolean hasPendingST = false; + Set<String> liveInstancesInProgress = new HashSet<>(); + + if (ClusterManagementMode.Status.IN_PROGRESS.equals(status)) { + HelixDataAccessor accessor = getDataAccssor(clusterId); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + List<LiveInstance> liveInstances = accessor.getChildValues(keyBuilder.liveInstances()); + BaseDataAccessor<ZNRecord> baseAccessor = accessor.getBaseDataAccessor(); + + if (ClusterManagementMode.Type.CLUSTER_PAUSE.equals(mode.getMode())) { + // Entering cluster freeze mode, check live instance freeze status and pending ST + for (LiveInstance liveInstance : liveInstances) { + String instanceName = liveInstance.getInstanceName(); + if (!LiveInstance.LiveInstanceStatus.PAUSED.equals(liveInstance.getStatus())) { + liveInstancesInProgress.add(instanceName); + } + Stat stat = baseAccessor + .getStat(keyBuilder.messages(instanceName).getPath(), AccessOption.PERSISTENT); + if (stat.getNumChildren() > 0) { + hasPendingST = true; + liveInstancesInProgress.add(instanceName); + } + } + } else if (ClusterManagementMode.Type.NORMAL.equals(mode.getMode())) { + // Exiting freeze mode, check live instance unfreeze status + for (LiveInstance liveInstance : liveInstances) { + if (LiveInstance.LiveInstanceStatus.PAUSED.equals(liveInstance.getStatus())) { + liveInstancesInProgress.add(liveInstance.getInstanceName()); + } + } + } + } + + participantDetails.put("status", status.name()); + participantDetails.put("liveInstancesInProgress", liveInstancesInProgress); + if (ClusterManagementMode.Type.CLUSTER_PAUSE.equals(mode.getMode())) { + // Add pending ST result for cluster freeze mode + participantDetails.put("hasPendingStateTransition", hasPendingST); + } + + details.put(ClusterProperties.liveInstances.name(), participantDetails); + return details; + } + + @ResponseMetered(name = HttpConstants.WRITE_REQUEST) + @Timed(name = HttpConstants.WRITE_REQUEST) + @POST + @Path("{clusterId}/management-mode") + public Response updateClusterManagementMode(@PathParam("clusterId") String clusterId, + @DefaultValue("{}") String content) { + ClusterManagementModeRequest request; + try { + request = OBJECT_MAPPER.readerFor(ClusterManagementModeRequest.class).readValue(content); + } catch (JsonProcessingException e) { + LOG.warn("Failed to parse json string: {}", content, e); + return badRequest("Invalid payload json body: " + content); + } + + // Need to add cluster name + request = ClusterManagementModeRequest.newBuilder() + .withClusterName(clusterId) + .withMode(request.getMode()) + .withCancelPendingST(request.isCancelPendingST()) + .withReason(request.getReason()) + .build(); + + try { + getHelixAdmin().setClusterManagementMode(request); + } catch (HelixConflictException e) { + return Response.status(Response.Status.CONFLICT).entity(e.getMessage()).build(); + } catch (HelixException e) { + return serverError(e.getMessage()); + } + + return JSONRepresentation(ImmutableMap.of("acknowledged", true)); + } + + @ResponseMetered(name = HttpConstants.READ_REQUEST) + @Timed(name = HttpConstants.READ_REQUEST) + @GET @Path("{clusterId}/configs") public Response getClusterConfig(@PathParam("clusterId") String clusterId) { ConfigAccessor accessor = getConfigAccessor(); diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java index cca5284..8d2de70 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java @@ -36,16 +36,20 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import com.sun.research.ws.wadl.HTTPMethods; +import org.apache.helix.AccessOption; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixDataAccessor; import org.apache.helix.PropertyKey; import org.apache.helix.TestHelper; +import org.apache.helix.api.status.ClusterManagementMode; +import org.apache.helix.api.status.ClusterManagementModeRequest; import org.apache.helix.cloud.azure.AzureConstants; import org.apache.helix.cloud.constants.CloudProvider; import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; import org.apache.helix.integration.manager.ClusterDistributedController; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZKUtil; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.model.CloudConfig; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.CustomizedStateConfig; @@ -54,6 +58,7 @@ import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.MaintenanceSignal; +import org.apache.helix.model.PauseSignal; import org.apache.helix.model.RESTConfig; import org.apache.helix.rest.common.HelixRestNamespace; import org.apache.helix.rest.server.auditlog.AuditLog; @@ -1285,6 +1290,71 @@ public class TestClusterAccessor extends AbstractTestClass { System.out.println("End test :" + TestHelper.getTestMethodName()); } + @Test + public void testClusterFreezeMode() throws Exception { + String cluster = _clusters.iterator().next(); + HelixDataAccessor dataAccessor = + new ZKHelixDataAccessor(cluster, new ZkBaseDataAccessor<>(_gZkClient)); + // Pause not existed + Assert.assertNull(dataAccessor.getProperty(dataAccessor.keyBuilder().pause())); + + String endpoint = "clusters/" + cluster + "/management-mode"; + + // Set cluster pause mode + ClusterManagementModeRequest request = ClusterManagementModeRequest.newBuilder() + .withMode(ClusterManagementMode.Type.CLUSTER_PAUSE) + .withClusterName(cluster) + .build(); + String payload = OBJECT_MAPPER.writeValueAsString(request); + post(endpoint, null, Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE), + Response.Status.OK.getStatusCode()); + + PauseSignal pauseSignal = dataAccessor.getProperty(dataAccessor.keyBuilder().pause()); + Assert.assertNotNull(pauseSignal); + Assert.assertTrue(pauseSignal.isClusterPause()); + Assert.assertFalse(pauseSignal.getCancelPendingST()); + + // Wait until cluster status is persisted + TestHelper.verify(() -> dataAccessor.getBaseDataAccessor() + .exists(dataAccessor.keyBuilder().clusterStatus().getPath(), AccessOption.PERSISTENT), + TestHelper.WAIT_DURATION); + + // Verify get cluster status + String body = get(endpoint, null, Response.Status.OK.getStatusCode(), true); + ClusterManagementMode mode = + OBJECT_MAPPER.readerFor(ClusterManagementMode.class).readValue(body); + Assert.assertEquals(mode.getMode(), ClusterManagementMode.Type.CLUSTER_PAUSE); + // Depending on timing, it could IN_PROGRESS or COMPLETED. + // It's just to verify the rest response format is correct + Assert.assertTrue(ClusterManagementMode.Status.IN_PROGRESS.equals(mode.getStatus()) + || ClusterManagementMode.Status.COMPLETED.equals(mode.getStatus())); + + body = get(endpoint, ImmutableMap.of("showDetails", "true"), Response.Status.OK.getStatusCode(), + true); + Map<String, Object> responseMap = OBJECT_MAPPER.readerFor(Map.class).readValue(body); + Map<String, Object> detailsMap = (Map<String, Object>) responseMap.get("details"); + + Assert.assertEquals(responseMap.get("cluster"), cluster); + Assert.assertEquals(responseMap.get("mode"), mode.getMode().name()); + Assert.assertEquals(responseMap.get("status"), mode.getStatus().name()); + Assert.assertTrue(responseMap.containsKey("details")); + Assert.assertTrue(detailsMap.containsKey("cluster")); + Assert.assertTrue(detailsMap.containsKey("liveInstances")); + + // set normal mode + request = ClusterManagementModeRequest.newBuilder() + .withMode(ClusterManagementMode.Type.NORMAL) + .withClusterName(cluster) + .build(); + payload = OBJECT_MAPPER.writeValueAsString(request); + post(endpoint, null, Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE), + Response.Status.OK.getStatusCode()); + + // Pause signal is deleted + pauseSignal = dataAccessor.getProperty(dataAccessor.keyBuilder().pause()); + Assert.assertNull(pauseSignal); + } + private ClusterConfig getClusterConfigFromRest(String cluster) throws IOException { String body = get("clusters/" + cluster + "/configs", null, Response.Status.OK.getStatusCode(), true);
