Repository: helix Updated Branches: refs/heads/helix-0.6.x 9a5dbeaa6 -> e89174053
[HELIX-559] Fix Helix web admin performance issues, rb=28974 Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/e8917405 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/e8917405 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/e8917405 Branch: refs/heads/helix-0.6.x Commit: e89174053fa9c447dacd99af3fcba89c82467b60 Parents: 9a5dbea Author: zzhang <[email protected]> Authored: Thu Dec 11 16:52:32 2014 -0800 Committer: zzhang <[email protected]> Committed: Thu Dec 11 16:52:32 2014 -0800 ---------------------------------------------------------------------- helix-admin-webapp/pom.xml | 8 + .../apache/helix/webapp/HelixAdminWebApp.java | 12 ++ .../resources/ClusterRepresentationUtil.java | 2 +- .../webapp/resources/ExternalViewResource.java | 21 +- .../webapp/resources/IdealStateResource.java | 14 +- .../webapp/resources/InstanceResource.java | 65 +++---- .../webapp/resources/ResourceGroupResource.java | 51 ++--- .../resources/ResourceGroupsResource.java | 32 ++-- .../helix/webapp/resources/ResourceUtil.java | 84 +++++++- .../webapp/resources/TestJobQueuesResource.java | 190 +++++++++++++++++++ .../webapp/resources/TestJsonParameters.java | 44 +++++ .../java/resources/TestJobQueuesResource.java | 190 ------------------- .../test/java/resources/TestJsonParameters.java | 44 ----- .../org/apache/helix/manager/zk/ZKUtil.java | 13 +- pom.xml | 12 +- 15 files changed, 441 insertions(+), 341 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/e8917405/helix-admin-webapp/pom.xml ---------------------------------------------------------------------- diff --git a/helix-admin-webapp/pom.xml b/helix-admin-webapp/pom.xml index 4e312d1..f4cccc4 100644 --- a/helix-admin-webapp/pom.xml +++ b/helix-admin-webapp/pom.xml @@ -51,6 +51,14 @@ under the License. <artifactId>org.restlet</artifactId> </dependency> <dependency> + <groupId>org.restlet.jse</groupId> + <artifactId>org.restlet.ext.jetty</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + </dependency> + <dependency> <groupId>com.thoughtworks.xstream</groupId> <artifactId>xstream</artifactId> <version>1.3.1</version> http://git-wip-us.apache.org/repos/asf/helix/blob/e8917405/helix-admin-webapp/src/main/java/org/apache/helix/webapp/HelixAdminWebApp.java ---------------------------------------------------------------------- diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/HelixAdminWebApp.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/HelixAdminWebApp.java index 991886c..b61d7d5 100644 --- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/HelixAdminWebApp.java +++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/HelixAdminWebApp.java @@ -19,8 +19,10 @@ package org.apache.helix.webapp; * under the License. */ +import org.apache.helix.manager.zk.ByteArraySerializer; import org.apache.helix.manager.zk.ZNRecordSerializer; import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.webapp.resources.ResourceUtil; import org.apache.log4j.Logger; import org.restlet.Component; import org.restlet.Context; @@ -34,6 +36,7 @@ public class HelixAdminWebApp { private final int _helixAdminPort; private final String _zkServerAddress; private ZkClient _zkClient = null; + private ZkClient _rawZkClient = null; public HelixAdminWebApp(String zkServerAddress, int adminPort) { _zkServerAddress = zkServerAddress; @@ -46,6 +49,10 @@ public class HelixAdminWebApp { _zkClient = new ZkClient(_zkServerAddress, ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer()); + _rawZkClient = + new ZkClient(_zkServerAddress, ZkClient.DEFAULT_SESSION_TIMEOUT, + ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ByteArraySerializer()); + _component = new Component(); _component.getServers().add(Protocol.HTTP, _helixAdminPort); Context applicationContext = _component.getContext().createChildContext(); @@ -53,6 +60,8 @@ public class HelixAdminWebApp { .put(RestAdminApplication.ZKSERVERADDRESS, _zkServerAddress); applicationContext.getAttributes().put(RestAdminApplication.PORT, "" + _helixAdminPort); applicationContext.getAttributes().put(RestAdminApplication.ZKCLIENT, _zkClient); + applicationContext.getAttributes().put(ResourceUtil.ContextKey.RAW_ZKCLIENT.toString(), + _rawZkClient); _adminApp = new RestAdminApplication(applicationContext); // Attach the application to the component and start it _component.getDefaultHost().attach(_adminApp); @@ -72,6 +81,9 @@ public class HelixAdminWebApp { if (_zkClient != null) { _zkClient.close(); } + if (_rawZkClient != null) { + _rawZkClient.close(); + } } } } http://git-wip-us.apache.org/repos/asf/helix/blob/e8917405/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ClusterRepresentationUtil.java ---------------------------------------------------------------------- diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ClusterRepresentationUtil.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ClusterRepresentationUtil.java index 5e458c4..35b1f7a 100644 --- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ClusterRepresentationUtil.java +++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ClusterRepresentationUtil.java @@ -50,7 +50,7 @@ import org.restlet.data.Form; import org.restlet.data.MediaType; public class ClusterRepresentationUtil { - private static final ZNRecord EMPTY_ZNRECORD = new ZNRecord("EMPTY_ZNRECORD"); + public static final ZNRecord EMPTY_ZNRECORD = new ZNRecord("EMPTY_ZNRECORD"); public static String getClusterPropertyAsString(ZkClient zkClient, String clusterName, PropertyKey propertyKey, MediaType mediaType) http://git-wip-us.apache.org/repos/asf/helix/blob/e8917405/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ExternalViewResource.java ---------------------------------------------------------------------- diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ExternalViewResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ExternalViewResource.java index 6ec28dc..00622d9 100644 --- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ExternalViewResource.java +++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ExternalViewResource.java @@ -24,7 +24,6 @@ import java.io.IOException; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.manager.zk.ZkClient; -import org.apache.helix.webapp.RestAdminApplication; import org.apache.log4j.Logger; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.map.JsonMappingException; @@ -47,8 +46,11 @@ public class ExternalViewResource extends ServerResource { public Representation get() { StringRepresentation presentation = null; try { - String clusterName = (String) getRequest().getAttributes().get("clusterName"); - String resourceName = (String) getRequest().getAttributes().get("resourceName"); + String clusterName = + ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME); + String resourceName = + ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.RESOURCE_NAME); + presentation = getExternalViewRepresentation(clusterName, resourceName); } @@ -56,7 +58,7 @@ public class ExternalViewResource extends ServerResource { String error = ClusterRepresentationUtil.getErrorAsJsonStringFromException(e); presentation = new StringRepresentation(error, MediaType.APPLICATION_JSON); - LOG.error("", e); + LOG.error("Exception in get externalView", e); } return presentation; } @@ -64,14 +66,13 @@ public class ExternalViewResource extends ServerResource { StringRepresentation getExternalViewRepresentation(String clusterName, String resourceName) throws JsonGenerationException, JsonMappingException, IOException { Builder keyBuilder = new PropertyKey.Builder(clusterName); - ZkClient zkClient = (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT); - ; + ZkClient zkclient = + ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.RAW_ZKCLIENT); - String message = - ClusterRepresentationUtil.getClusterPropertyAsString(zkClient, clusterName, - keyBuilder.externalView(resourceName), MediaType.APPLICATION_JSON); + String extViewStr = + ResourceUtil.readZkAsBytes(zkclient, keyBuilder.externalView(resourceName)); StringRepresentation representation = - new StringRepresentation(message, MediaType.APPLICATION_JSON); + new StringRepresentation(extViewStr, MediaType.APPLICATION_JSON); return representation; } http://git-wip-us.apache.org/repos/asf/helix/blob/e8917405/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/IdealStateResource.java ---------------------------------------------------------------------- diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/IdealStateResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/IdealStateResource.java index 0081922..6ff539e 100644 --- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/IdealStateResource.java +++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/IdealStateResource.java @@ -77,7 +77,7 @@ public class IdealStateResource extends ServerResource { String error = ClusterRepresentationUtil.getErrorAsJsonStringFromException(e); presentation = new StringRepresentation(error, MediaType.APPLICATION_JSON); - LOG.error("", e); + LOG.error("Exception in get idealState", e); } return presentation; } @@ -85,15 +85,13 @@ public class IdealStateResource extends ServerResource { StringRepresentation getIdealStateRepresentation(String clusterName, String resourceName) throws JsonGenerationException, JsonMappingException, IOException { Builder keyBuilder = new PropertyKey.Builder(clusterName); - ZkClient zkClient = - ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT); - - String message = - ClusterRepresentationUtil.getClusterPropertyAsString(zkClient, clusterName, - keyBuilder.idealStates(resourceName), MediaType.APPLICATION_JSON); + ZkClient zkclient = + ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.RAW_ZKCLIENT); + String idealStateStr = + ResourceUtil.readZkAsBytes(zkclient, keyBuilder.idealStates(resourceName)); StringRepresentation representation = - new StringRepresentation(message, MediaType.APPLICATION_JSON); + new StringRepresentation(idealStateStr, MediaType.APPLICATION_JSON); return representation; } http://git-wip-us.apache.org/repos/asf/helix/blob/e8917405/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/InstanceResource.java ---------------------------------------------------------------------- diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/InstanceResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/InstanceResource.java index e62fe5e..9e0d8b5 100644 --- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/InstanceResource.java +++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/InstanceResource.java @@ -27,7 +27,6 @@ import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.manager.zk.ZkClient; import org.apache.helix.tools.ClusterSetup; -import org.apache.helix.webapp.RestAdminApplication; import org.apache.log4j.Logger; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.map.JsonMappingException; @@ -56,24 +55,25 @@ public class InstanceResource extends ServerResource { String error = ClusterRepresentationUtil.getErrorAsJsonStringFromException(e); presentation = new StringRepresentation(error, MediaType.APPLICATION_JSON); - LOG.error("", e); + LOG.error("Exception in get instance", e); } return presentation; } StringRepresentation getInstanceRepresentation() throws JsonGenerationException, JsonMappingException, IOException { - String clusterName = (String) getRequest().getAttributes().get("clusterName"); - String instanceName = (String) getRequest().getAttributes().get("instanceName"); + String clusterName = + ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME); + String instanceName = + ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.INSTANCE_NAME); Builder keyBuilder = new PropertyKey.Builder(clusterName); - ZkClient zkClient = (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT); - - String message = - ClusterRepresentationUtil.getClusterPropertyAsString(zkClient, clusterName, - MediaType.APPLICATION_JSON, keyBuilder.instanceConfig(instanceName)); + ZkClient zkclient = + ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.RAW_ZKCLIENT); + String instanceCfgStr = + ResourceUtil.readZkAsBytes(zkclient, keyBuilder.instanceConfig(instanceName)); StringRepresentation representation = - new StringRepresentation(message, MediaType.APPLICATION_JSON); + new StringRepresentation(instanceCfgStr, MediaType.APPLICATION_JSON); return representation; } @@ -81,8 +81,14 @@ public class InstanceResource extends ServerResource { @Override public Representation post(Representation entity) { try { - String clusterName = (String) getRequest().getAttributes().get("clusterName"); - String instanceName = (String) getRequest().getAttributes().get("instanceName"); + String clusterName = + ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME); + String instanceName = + ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.INSTANCE_NAME); + + ZkClient zkclient = + ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT); + ClusterSetup setupTool = new ClusterSetup(zkclient); JsonParameters jsonParameters = new JsonParameters(entity); String command = jsonParameters.getCommand(); @@ -91,9 +97,6 @@ public class InstanceResource extends ServerResource { boolean enabled = Boolean.parseBoolean(jsonParameters.getParameter(JsonParameters.ENABLED)); - ZkClient zkClient = - (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT); - ClusterSetup setupTool = new ClusterSetup(zkClient); setupTool.getClusterManagementTool().enableInstance(clusterName, instanceName, enabled); } else if (command.equalsIgnoreCase(ClusterSetup.enablePartition)) { jsonParameters.verifyCommand(ClusterSetup.enablePartition); @@ -103,9 +106,6 @@ public class InstanceResource extends ServerResource { String[] partitions = jsonParameters.getParameter(JsonParameters.PARTITION).split(";"); String resource = jsonParameters.getParameter(JsonParameters.RESOURCE); - ZkClient zkClient = - (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT); - ClusterSetup setupTool = new ClusterSetup(zkClient); setupTool.getClusterManagementTool().enablePartition(enabled, clusterName, instanceName, resource, Arrays.asList(partitions)); } else if (command.equalsIgnoreCase(ClusterSetup.resetPartition)) { @@ -113,9 +113,6 @@ public class InstanceResource extends ServerResource { String resource = jsonParameters.getParameter(JsonParameters.RESOURCE); - ZkClient zkClient = - (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT); - ClusterSetup setupTool = new ClusterSetup(zkClient); String[] partitionNames = jsonParameters.getParameter(JsonParameters.PARTITION).split("\\s+"); setupTool.getClusterManagementTool().resetPartition(clusterName, instanceName, resource, @@ -123,24 +120,15 @@ public class InstanceResource extends ServerResource { } else if (command.equalsIgnoreCase(ClusterSetup.resetInstance)) { jsonParameters.verifyCommand(ClusterSetup.resetInstance); - ZkClient zkClient = - (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT); - ClusterSetup setupTool = new ClusterSetup(zkClient); setupTool.getClusterManagementTool() .resetInstance(clusterName, Arrays.asList(instanceName)); } else if (command.equalsIgnoreCase(ClusterSetup.addInstanceTag)) { jsonParameters.verifyCommand(ClusterSetup.addInstanceTag); String tag = jsonParameters.getParameter(ClusterSetup.instanceGroupTag); - ZkClient zkClient = - (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT); - ClusterSetup setupTool = new ClusterSetup(zkClient); setupTool.getClusterManagementTool().addInstanceTag(clusterName, instanceName, tag); } else if (command.equalsIgnoreCase(ClusterSetup.removeInstanceTag)) { jsonParameters.verifyCommand(ClusterSetup.removeInstanceTag); String tag = jsonParameters.getParameter(ClusterSetup.instanceGroupTag); - ZkClient zkClient = - (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT); - ClusterSetup setupTool = new ClusterSetup(zkClient); setupTool.getClusterManagementTool().removeInstanceTag(clusterName, instanceName, tag); } else { throw new HelixException("Unsupported command: " + command + ". Should be one of [" @@ -154,7 +142,7 @@ public class InstanceResource extends ServerResource { getResponse().setEntity(ClusterRepresentationUtil.getErrorAsJsonStringFromException(e), MediaType.APPLICATION_JSON); getResponse().setStatus(Status.SUCCESS_OK); - LOG.error("", e); + LOG.error("Exception in post instance", e); } return null; } @@ -162,19 +150,20 @@ public class InstanceResource extends ServerResource { @Override public Representation delete() { try { - String clusterName = (String) getRequest().getAttributes().get("clusterName"); - String instanceName = (String) getRequest().getAttributes().get("instanceName"); - ZkClient zkClient = - (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT); - - ClusterSetup setupTool = new ClusterSetup(zkClient); + String clusterName = + ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME); + String instanceName = + ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.INSTANCE_NAME); + ZkClient zkclient = + ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT); + ClusterSetup setupTool = new ClusterSetup(zkclient); setupTool.dropInstanceFromCluster(clusterName, instanceName); getResponse().setStatus(Status.SUCCESS_OK); } catch (Exception e) { getResponse().setEntity(ClusterRepresentationUtil.getErrorAsJsonStringFromException(e), MediaType.APPLICATION_JSON); getResponse().setStatus(Status.SUCCESS_OK); - LOG.error("Error in remove", e); + LOG.error("Error in delete instance", e); } return null; } http://git-wip-us.apache.org/repos/asf/helix/blob/e8917405/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java ---------------------------------------------------------------------- diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java index 6dc721d..b08f0b1 100644 --- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java +++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java @@ -27,7 +27,6 @@ import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.manager.zk.ZkClient; import org.apache.helix.tools.ClusterSetup; -import org.apache.helix.webapp.RestAdminApplication; import org.apache.log4j.Logger; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.map.JsonMappingException; @@ -51,8 +50,10 @@ public class ResourceGroupResource extends ServerResource { public Representation get() { StringRepresentation presentation = null; try { - String clusterName = (String) getRequest().getAttributes().get("clusterName"); - String resourceName = (String) getRequest().getAttributes().get("resourceName"); + String clusterName = + ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME); + String resourceName = + ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.RESOURCE_NAME); presentation = getIdealStateRepresentation(clusterName, resourceName); } @@ -60,7 +61,7 @@ public class ResourceGroupResource extends ServerResource { String error = ClusterRepresentationUtil.getErrorAsJsonStringFromException(e); presentation = new StringRepresentation(error, MediaType.APPLICATION_JSON); - LOG.error("", e); + LOG.error("Exception in get resource group", e); } return presentation; } @@ -68,14 +69,14 @@ public class ResourceGroupResource extends ServerResource { StringRepresentation getIdealStateRepresentation(String clusterName, String resourceName) throws JsonGenerationException, JsonMappingException, IOException { Builder keyBuilder = new PropertyKey.Builder(clusterName); - ZkClient zkClient = (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT); - String message = - ClusterRepresentationUtil.getClusterPropertyAsString(zkClient, clusterName, - keyBuilder.idealStates(resourceName), MediaType.APPLICATION_JSON); + ZkClient zkclient = + ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.RAW_ZKCLIENT); + String idealStateStr = + ResourceUtil.readZkAsBytes(zkclient, keyBuilder.idealStates(resourceName)); StringRepresentation representation = - new StringRepresentation(message, MediaType.APPLICATION_JSON); + new StringRepresentation(idealStateStr, MediaType.APPLICATION_JSON); return representation; } @@ -83,13 +84,15 @@ public class ResourceGroupResource extends ServerResource { @Override public Representation delete() { try { - String clusterName = (String) getRequest().getAttributes().get("clusterName"); - String resourceGroupName = (String) getRequest().getAttributes().get("resourceName"); - ZkClient zkClient = - (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT); - - ClusterSetup setupTool = new ClusterSetup(zkClient); - setupTool.dropResourceFromCluster(clusterName, resourceGroupName); + String clusterName = + ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME); + String resourceName = + ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.RESOURCE_NAME); + ZkClient zkclient = + ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT); + + ClusterSetup setupTool = new ClusterSetup(zkclient); + setupTool.dropResourceFromCluster(clusterName, resourceName); getResponse().setStatus(Status.SUCCESS_OK); } catch (Exception e) { getResponse().setEntity(ClusterRepresentationUtil.getErrorAsJsonStringFromException(e), @@ -103,23 +106,23 @@ public class ResourceGroupResource extends ServerResource { @Override public Representation post(Representation entity) { try { - String clusterName = (String) getRequest().getAttributes().get("clusterName"); - String resourceName = (String) getRequest().getAttributes().get("resourceName"); + String clusterName = + ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME); + String resourceName = + ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.RESOURCE_NAME); + ZkClient zkclient = + ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT); + ClusterSetup setupTool = new ClusterSetup(zkclient); JsonParameters jsonParameters = new JsonParameters(entity); String command = jsonParameters.getCommand(); if (command.equalsIgnoreCase(ClusterSetup.resetResource)) { - ZkClient zkClient = - (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT); - ClusterSetup setupTool = new ClusterSetup(zkClient); + setupTool.getClusterManagementTool() .resetResource(clusterName, Arrays.asList(resourceName)); } else if (command.equalsIgnoreCase(ClusterSetup.enableResource)) { jsonParameters.verifyCommand(ClusterSetup.enableResource); boolean enabled = Boolean.parseBoolean(jsonParameters.getParameter(JsonParameters.ENABLED)); - ZkClient zkClient = - (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT); - ClusterSetup setupTool = new ClusterSetup(zkClient); setupTool.getClusterManagementTool().enableResource(clusterName, resourceName, enabled); } else { throw new HelixException("Unsupported command: " + command + ". Should be one of [" http://git-wip-us.apache.org/repos/asf/helix/blob/e8917405/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupsResource.java ---------------------------------------------------------------------- diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupsResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupsResource.java index ad4e934..a43d2ff 100644 --- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupsResource.java +++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupsResource.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.util.List; import java.util.Map; -import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; import org.apache.helix.PropertyKey; import org.apache.helix.ZNRecord; @@ -31,7 +30,6 @@ import org.apache.helix.manager.zk.ZkClient; import org.apache.helix.model.IdealState; import org.apache.helix.model.IdealState.RebalanceMode; import org.apache.helix.tools.ClusterSetup; -import org.apache.helix.webapp.RestAdminApplication; import org.apache.log4j.Logger; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.map.JsonMappingException; @@ -58,7 +56,8 @@ public class ResourceGroupsResource extends ServerResource { public Representation get() { StringRepresentation presentation = null; try { - String clusterName = (String) getRequest().getAttributes().get("clusterName"); + String clusterName = + ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME); presentation = getHostedEntitiesRepresentation(clusterName); } @@ -66,7 +65,7 @@ public class ResourceGroupsResource extends ServerResource { String error = ClusterRepresentationUtil.getErrorAsJsonStringFromException(e); presentation = new StringRepresentation(error, MediaType.APPLICATION_JSON); - LOG.error("", e); + LOG.error("Exception in get resourceGroups", e); } return presentation; } @@ -74,21 +73,24 @@ public class ResourceGroupsResource extends ServerResource { StringRepresentation getHostedEntitiesRepresentation(String clusterName) throws JsonGenerationException, JsonMappingException, IOException { // Get all resources - ZkClient zkClient = (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT); - HelixDataAccessor accessor = - ClusterRepresentationUtil.getClusterDataAccessor(zkClient, clusterName); - PropertyKey.Builder keyBuilder = accessor.keyBuilder(); - Map<String, IdealState> idealStateMap = accessor.getChildValuesMap(keyBuilder.idealStates()); + ZkClient zkclient = + ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.RAW_ZKCLIENT); + PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName); + Map<String, String> idealStateMap = + ResourceUtil.readZkChildrenAsBytesMap(zkclient, keyBuilder.idealStates()); // Create the result ZNRecord hostedEntitiesRecord = new ZNRecord("ResourceGroups"); // Figure out which tags are present on which resources Map<String, String> tagMap = Maps.newHashMap(); - for (IdealState idealState : idealStateMap.values()) { - String tag = idealState.getInstanceGroupTag(); + for (String resourceName : idealStateMap.keySet()) { + String idealStateStr = idealStateMap.get(resourceName); + String tag = + ResourceUtil.extractSimpleFieldFromZNRecord(idealStateStr, + IdealState.IdealStateProperty.INSTANCE_GROUP_TAG.toString()); if (tag != null) { - tagMap.put(idealState.getId(), tag); + tagMap.put(resourceName, tag); } } @@ -109,7 +111,8 @@ public class ResourceGroupsResource extends ServerResource { @Override public Representation post(Representation entity) { try { - String clusterName = (String) getRequest().getAttributes().get("clusterName"); + String clusterName = + ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME); JsonParameters jsonParameters = new JsonParameters(entity); String command = jsonParameters.getCommand(); @@ -148,8 +151,7 @@ public class ResourceGroupsResource extends ServerResource { } ZkClient zkClient = - (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT); - ; + ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT); ClusterSetup setupTool = new ClusterSetup(zkClient); setupTool.addResourceToCluster(clusterName, entityName, partitions, stateModelDefRef, mode, bucketSize, maxPartitionsPerNode); http://git-wip-us.apache.org/repos/asf/helix/blob/e8917405/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceUtil.java ---------------------------------------------------------------------- diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceUtil.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceUtil.java index f066dfc..f47d6db 100644 --- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceUtil.java +++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceUtil.java @@ -19,14 +19,30 @@ package org.apache.helix.webapp.resources; * under the License. */ +import java.io.IOException; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.helix.BaseDataAccessor; +import org.apache.helix.PropertyKey; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.manager.zk.ZkClient; import org.apache.helix.webapp.RestAdminApplication; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig; import org.restlet.Context; import org.restlet.Request; import org.restlet.data.Form; -import org.restlet.representation.Representation; public class ResourceUtil { + private static final String EMPTY_ZNRECORD_STRING = + objectToJson(ClusterRepresentationUtil.EMPTY_ZNRECORD); + /** * Key enums for getting values from request */ @@ -36,7 +52,8 @@ public class ResourceUtil { JOB("job"), CONSTRAINT_TYPE("constraintType"), CONSTRAINT_ID("constraintId"), - RESOURCE_NAME("resourceName"); + RESOURCE_NAME("resourceName"), + INSTANCE_NAME("instanceName"); private final String _key; @@ -54,7 +71,8 @@ public class ResourceUtil { */ public enum ContextKey { ZK_ADDR(RestAdminApplication.ZKSERVERADDRESS), - ZKCLIENT(RestAdminApplication.ZKCLIENT); + ZKCLIENT(RestAdminApplication.ZKCLIENT), + RAW_ZKCLIENT("rawZkClient"); // zkclient that uses raw-byte serializer private final String _key; @@ -74,6 +92,7 @@ public class ResourceUtil { NEW_JOB("newJob"); private final String _key; + YamlParamKey(String key) { _key = key; } @@ -83,11 +102,29 @@ public class ResourceUtil { } } + private static String objectToJson(Object object) { + ObjectMapper mapper = new ObjectMapper(); + SerializationConfig serializationConfig = mapper.getSerializationConfig(); + serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true); + + StringWriter sw = new StringWriter(); + try { + mapper.writeValue(sw, object); + } catch (JsonGenerationException e) { + // Should not be here + } catch (JsonMappingException e) { + // Should not be here + } catch (IOException e) { + // Should not be here + } + + return sw.toString(); + } + public static String getAttributeFromRequest(Request r, RequestKey key) { return (String) r.getAttributes().get(key.toString()); } - public static ZkClient getAttributeFromCtx(Context ctx, ContextKey key) { return (ZkClient) ctx.getAttributes().get(key.toString()); } @@ -95,4 +132,43 @@ public class ResourceUtil { public static String getYamlParameters(Form form, YamlParamKey key) { return form.getFirstValue(key.toString()); } + + public static String readZkAsBytes(ZkClient zkclient, PropertyKey propertyKey) { + byte[] bytes = zkclient.readData(propertyKey.getPath()); + return bytes == null ? EMPTY_ZNRECORD_STRING : new String(bytes); + } + + static String extractSimpleFieldFromZNRecord(String recordStr, String key) { + int idx = recordStr.indexOf(key); + if (idx != -1) { + idx = recordStr.indexOf('"', idx + key.length() + 1); + if (idx != -1) { + int idx2 = recordStr.indexOf('"', idx + 1); + if (idx2 != -1) { + return recordStr.substring(idx + 1, idx2); + } + } + + } + return null; + } + + public static Map<String, String> readZkChildrenAsBytesMap(ZkClient zkclient, PropertyKey propertyKey) { + BaseDataAccessor<byte[]> baseAccessor = new ZkBaseDataAccessor<byte[]>(zkclient); + String parentPath = propertyKey.getPath(); + List<String> childNames = baseAccessor.getChildNames(parentPath, 0); + if (childNames == null) { + return null; + } + List<String> paths = new ArrayList<String>(); + for (String childName : childNames) { + paths.add(parentPath + "/" + childName); + } + List<byte[]> values = baseAccessor.get(paths, null, 0); + Map<String, String> ret = new HashMap<String, String>(); + for (int i = 0; i < childNames.size(); i++) { + ret.put(childNames.get(i), new String(values.get(i))); + } + return ret; + } } http://git-wip-us.apache.org/repos/asf/helix/blob/e8917405/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java ---------------------------------------------------------------------- diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java new file mode 100644 index 0000000..cc922ad --- /dev/null +++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java @@ -0,0 +1,190 @@ +package org.apache.helix.webapp.resources; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +import com.google.common.collect.Lists; + +import org.apache.helix.TestHelper; +import org.apache.helix.ZNRecord; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.integration.task.DummyTask; +import org.apache.helix.integration.task.WorkflowGenerator; +import org.apache.helix.participant.StateMachineEngine; +import org.apache.helix.task.Task; +import org.apache.helix.task.TaskCallbackContext; +import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.TaskFactory; +import org.apache.helix.task.TaskStateModelFactory; +import org.apache.helix.task.beans.JobBean; +import org.apache.helix.task.beans.WorkflowBean; +import org.apache.helix.tools.ClusterStateVerifier; +import org.apache.helix.webapp.AdminTestBase; +import org.apache.helix.webapp.AdminTestHelper; +import org.apache.helix.webapp.resources.ClusterRepresentationUtil; +import org.apache.helix.webapp.resources.JsonParameters; +import org.apache.helix.webapp.resources.ResourceUtil; +import org.apache.log4j.Logger; +import org.testng.Assert; +import org.testng.annotations.Test; + +import org.yaml.snakeyaml.Yaml; + +public class TestJobQueuesResource extends AdminTestBase { + private static final Logger LOG = Logger.getLogger(TestJobQueuesResource.class); + + @Test + public void test() throws Exception { + + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + final int n = 5; + final int p = 20; + final int r = 3; + + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + + _gSetupTool.addCluster(clusterName, true); + for (int i = 0; i < n; i++) { + String instanceName = "localhost_" + (12918 + i); + _gSetupTool.addInstanceToCluster(clusterName, instanceName); + } + + // Set up target db + _gSetupTool.addResourceToCluster(clusterName, WorkflowGenerator.DEFAULT_TGT_DB, p, + "MasterSlave"); + _gSetupTool.rebalanceStorageCluster(clusterName, WorkflowGenerator.DEFAULT_TGT_DB, r); + + Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>(); + taskFactoryReg.put("DummyTask", new TaskFactory() { + @Override + public Task createNewTask(TaskCallbackContext context) { + return new DummyTask(context); + } + }); + + // Start dummy participants + MockParticipantManager[] participants = new MockParticipantManager[n]; + for (int i = 0; i < n; i++) { + String instanceName = "localhost_" + (12918 + i); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); + + // Register a Task state model factory. + StateMachineEngine stateMachine = participants[i].getStateMachineEngine(); + stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(participants[i], + taskFactoryReg)); + participants[i].syncStart(); + } + + // start controller + String controllerName = "controller"; + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, controllerName); + controller.syncStart(); + + boolean result = + ClusterStateVerifier + .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, + clusterName)); + Assert.assertTrue(result); + + // Start a queue + String queueName = "myQueue1"; + LOG.info("Starting job-queue: " + queueName); + String jobQueueYamlConfig = "name: " + queueName; + + String resourceUrl = + "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/jobQueues"; + ZNRecord postRet = AdminTestHelper.post(_gClient, resourceUrl, jobQueueYamlConfig); + LOG.info("Started job-queue: " + queueName + ", ret: " + postRet); + + LOG.info("Getting all job-queues"); + ZNRecord getRet = AdminTestHelper.get(_gClient, resourceUrl); + LOG.info("Got job-queues: " + getRet); + + // Enqueue job + resourceUrl = + "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/jobQueues/" + queueName; + + WorkflowBean wfBean = new WorkflowBean(); + wfBean.name = queueName; + JobBean jBean = new JobBean(); + jBean.name = "myJob1"; + jBean.command = "DummyTask"; + jBean.targetResource = WorkflowGenerator.DEFAULT_TGT_DB; + jBean.targetPartitionStates = Lists.newArrayList("MASTER"); + wfBean.jobs = Lists.newArrayList(jBean); + String jobYamlConfig = new Yaml().dump(wfBean); + LOG.info("Enqueuing a job: " + jobQueueYamlConfig); + + Map<String, String> paraMap = new HashMap<String, String>(); + paraMap.put(JsonParameters.MANAGEMENT_COMMAND, TaskDriver.DriverCommand.start.toString()); + + String postBody = + String.format("%s=%s&%s=%s", JsonParameters.JSON_PARAMETERS, + ClusterRepresentationUtil.ObjectToJson(paraMap), ResourceUtil.YamlParamKey.NEW_JOB.toString(), + jobYamlConfig); + postRet = AdminTestHelper.post(_gClient, resourceUrl, postBody); + LOG.info("Enqueued job, ret: " + postRet); + + // Get job + resourceUrl = + "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/jobQueues/" + queueName + + "/" + jBean.name; + getRet = AdminTestHelper.get(_gClient, resourceUrl); + LOG.info("Got job: " + getRet); + + // Stop job queue + resourceUrl = + "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/jobQueues/" + queueName; + paraMap.put(JsonParameters.MANAGEMENT_COMMAND, TaskDriver.DriverCommand.stop.toString()); + postBody = String.format("%s=%s", JsonParameters.JSON_PARAMETERS, ClusterRepresentationUtil.ObjectToJson(paraMap)); + postRet = AdminTestHelper.post(_gClient, resourceUrl, postBody); + LOG.info("Stopped job-queue, ret: " + postRet); + + // Resume job queue + paraMap.put(JsonParameters.MANAGEMENT_COMMAND, TaskDriver.DriverCommand.resume.toString()); + postBody = String.format("%s=%s", JsonParameters.JSON_PARAMETERS, ClusterRepresentationUtil.ObjectToJson(paraMap)); + postRet = AdminTestHelper.post(_gClient, resourceUrl, postBody); + LOG.info("Resumed job-queue, ret: " + postRet); + + // Flush job queue + paraMap.put(JsonParameters.MANAGEMENT_COMMAND, "flush"); + postBody = + JsonParameters.JSON_PARAMETERS + "=" + ClusterRepresentationUtil.ObjectToJson(paraMap); + postRet = AdminTestHelper.post(_gClient, resourceUrl, postBody); + LOG.info("Flushed job-queue, ret: " + postRet); + + // clean up + controller.syncStop(); + for (int i = 0; i < n; i++) { + if (participants[i] != null && participants[i].isConnected()) { + participants[i].syncStop(); + } + } + + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/e8917405/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJsonParameters.java ---------------------------------------------------------------------- diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJsonParameters.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJsonParameters.java new file mode 100644 index 0000000..2bf484d --- /dev/null +++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJsonParameters.java @@ -0,0 +1,44 @@ +package org.apache.helix.webapp.resources; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Map; + +import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.webapp.resources.ClusterRepresentationUtil; +import org.apache.helix.webapp.resources.JsonParameters; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestJsonParameters { + @Test + public void test() throws Exception { + String jsonPayload = + "{\"command\":\"resetPartition\",\"resource\": \"DB-1\",\"partition\":\"DB-1_22 DB-1_23\"}"; + Map<String, String> map = ClusterRepresentationUtil.JsonToMap(jsonPayload); + Assert.assertNotNull(map.get(JsonParameters.MANAGEMENT_COMMAND)); + Assert.assertEquals(ClusterSetup.resetPartition, map.get(JsonParameters.MANAGEMENT_COMMAND)); + Assert.assertNotNull(map.get(JsonParameters.RESOURCE)); + Assert.assertEquals("DB-1", map.get(JsonParameters.RESOURCE)); + Assert.assertNotNull(map.get(JsonParameters.PARTITION)); + Assert.assertEquals("DB-1_22 DB-1_23", map.get(JsonParameters.PARTITION)); + } + +} http://git-wip-us.apache.org/repos/asf/helix/blob/e8917405/helix-admin-webapp/src/test/java/resources/TestJobQueuesResource.java ---------------------------------------------------------------------- diff --git a/helix-admin-webapp/src/test/java/resources/TestJobQueuesResource.java b/helix-admin-webapp/src/test/java/resources/TestJobQueuesResource.java deleted file mode 100644 index 6c0e0e1..0000000 --- a/helix-admin-webapp/src/test/java/resources/TestJobQueuesResource.java +++ /dev/null @@ -1,190 +0,0 @@ -package resources; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.Date; -import java.util.HashMap; -import java.util.Map; - -import com.google.common.collect.Lists; - -import org.apache.helix.TestHelper; -import org.apache.helix.ZNRecord; -import org.apache.helix.integration.manager.ClusterControllerManager; -import org.apache.helix.integration.manager.MockParticipantManager; -import org.apache.helix.integration.task.DummyTask; -import org.apache.helix.integration.task.WorkflowGenerator; -import org.apache.helix.participant.StateMachineEngine; -import org.apache.helix.task.Task; -import org.apache.helix.task.TaskCallbackContext; -import org.apache.helix.task.TaskDriver; -import org.apache.helix.task.TaskFactory; -import org.apache.helix.task.TaskStateModelFactory; -import org.apache.helix.task.beans.JobBean; -import org.apache.helix.task.beans.WorkflowBean; -import org.apache.helix.tools.ClusterStateVerifier; -import org.apache.helix.webapp.AdminTestBase; -import org.apache.helix.webapp.AdminTestHelper; -import org.apache.helix.webapp.resources.ClusterRepresentationUtil; -import org.apache.helix.webapp.resources.JsonParameters; -import org.apache.helix.webapp.resources.ResourceUtil; -import org.apache.log4j.Logger; -import org.testng.Assert; -import org.testng.annotations.Test; - -import org.yaml.snakeyaml.Yaml; - -public class TestJobQueuesResource extends AdminTestBase { - private static final Logger LOG = Logger.getLogger(TestJobQueuesResource.class); - - @Test - public void test() throws Exception { - - String className = TestHelper.getTestClassName(); - String methodName = TestHelper.getTestMethodName(); - String clusterName = className + "_" + methodName; - final int n = 5; - final int p = 20; - final int r = 3; - - System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); - - _gSetupTool.addCluster(clusterName, true); - for (int i = 0; i < n; i++) { - String instanceName = "localhost_" + (12918 + i); - _gSetupTool.addInstanceToCluster(clusterName, instanceName); - } - - // Set up target db - _gSetupTool.addResourceToCluster(clusterName, WorkflowGenerator.DEFAULT_TGT_DB, p, - "MasterSlave"); - _gSetupTool.rebalanceStorageCluster(clusterName, WorkflowGenerator.DEFAULT_TGT_DB, r); - - Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>(); - taskFactoryReg.put("DummyTask", new TaskFactory() { - @Override - public Task createNewTask(TaskCallbackContext context) { - return new DummyTask(context); - } - }); - - // Start dummy participants - MockParticipantManager[] participants = new MockParticipantManager[n]; - for (int i = 0; i < n; i++) { - String instanceName = "localhost_" + (12918 + i); - participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); - - // Register a Task state model factory. - StateMachineEngine stateMachine = participants[i].getStateMachineEngine(); - stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(participants[i], - taskFactoryReg)); - participants[i].syncStart(); - } - - // start controller - String controllerName = "controller"; - ClusterControllerManager controller = - new ClusterControllerManager(ZK_ADDR, clusterName, controllerName); - controller.syncStart(); - - boolean result = - ClusterStateVerifier - .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, - clusterName)); - Assert.assertTrue(result); - - // Start a queue - String queueName = "myQueue1"; - LOG.info("Starting job-queue: " + queueName); - String jobQueueYamlConfig = "name: " + queueName; - - String resourceUrl = - "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/jobQueues"; - ZNRecord postRet = AdminTestHelper.post(_gClient, resourceUrl, jobQueueYamlConfig); - LOG.info("Started job-queue: " + queueName + ", ret: " + postRet); - - LOG.info("Getting all job-queues"); - ZNRecord getRet = AdminTestHelper.get(_gClient, resourceUrl); - LOG.info("Got job-queues: " + getRet); - - // Enqueue job - resourceUrl = - "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/jobQueues/" + queueName; - - WorkflowBean wfBean = new WorkflowBean(); - wfBean.name = queueName; - JobBean jBean = new JobBean(); - jBean.name = "myJob1"; - jBean.command = "DummyTask"; - jBean.targetResource = WorkflowGenerator.DEFAULT_TGT_DB; - jBean.targetPartitionStates = Lists.newArrayList("MASTER"); - wfBean.jobs = Lists.newArrayList(jBean); - String jobYamlConfig = new Yaml().dump(wfBean); - LOG.info("Enqueuing a job: " + jobQueueYamlConfig); - - Map<String, String> paraMap = new HashMap<String, String>(); - paraMap.put(JsonParameters.MANAGEMENT_COMMAND, TaskDriver.DriverCommand.start.toString()); - - String postBody = - String.format("%s=%s&%s=%s", JsonParameters.JSON_PARAMETERS, - ClusterRepresentationUtil.ObjectToJson(paraMap), ResourceUtil.YamlParamKey.NEW_JOB.toString(), - jobYamlConfig); - postRet = AdminTestHelper.post(_gClient, resourceUrl, postBody); - LOG.info("Enqueued job, ret: " + postRet); - - // Get job - resourceUrl = - "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/jobQueues/" + queueName - + "/" + jBean.name; - getRet = AdminTestHelper.get(_gClient, resourceUrl); - LOG.info("Got job: " + getRet); - - // Stop job queue - resourceUrl = - "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/jobQueues/" + queueName; - paraMap.put(JsonParameters.MANAGEMENT_COMMAND, TaskDriver.DriverCommand.stop.toString()); - postBody = String.format("%s=%s", JsonParameters.JSON_PARAMETERS, ClusterRepresentationUtil.ObjectToJson(paraMap)); - postRet = AdminTestHelper.post(_gClient, resourceUrl, postBody); - LOG.info("Stopped job-queue, ret: " + postRet); - - // Resume job queue - paraMap.put(JsonParameters.MANAGEMENT_COMMAND, TaskDriver.DriverCommand.resume.toString()); - postBody = String.format("%s=%s", JsonParameters.JSON_PARAMETERS, ClusterRepresentationUtil.ObjectToJson(paraMap)); - postRet = AdminTestHelper.post(_gClient, resourceUrl, postBody); - LOG.info("Resumed job-queue, ret: " + postRet); - - // Flush job queue - paraMap.put(JsonParameters.MANAGEMENT_COMMAND, "flush"); - postBody = - JsonParameters.JSON_PARAMETERS + "=" + ClusterRepresentationUtil.ObjectToJson(paraMap); - postRet = AdminTestHelper.post(_gClient, resourceUrl, postBody); - LOG.info("Flushed job-queue, ret: " + postRet); - - // clean up - controller.syncStop(); - for (int i = 0; i < n; i++) { - if (participants[i] != null && participants[i].isConnected()) { - participants[i].syncStop(); - } - } - - System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/e8917405/helix-admin-webapp/src/test/java/resources/TestJsonParameters.java ---------------------------------------------------------------------- diff --git a/helix-admin-webapp/src/test/java/resources/TestJsonParameters.java b/helix-admin-webapp/src/test/java/resources/TestJsonParameters.java deleted file mode 100644 index 383ac21..0000000 --- a/helix-admin-webapp/src/test/java/resources/TestJsonParameters.java +++ /dev/null @@ -1,44 +0,0 @@ -package resources; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.Map; - -import org.apache.helix.tools.ClusterSetup; -import org.apache.helix.webapp.resources.ClusterRepresentationUtil; -import org.apache.helix.webapp.resources.JsonParameters; -import org.testng.Assert; -import org.testng.annotations.Test; - -public class TestJsonParameters { - @Test - public void test() throws Exception { - String jsonPayload = - "{\"command\":\"resetPartition\",\"resource\": \"DB-1\",\"partition\":\"DB-1_22 DB-1_23\"}"; - Map<String, String> map = ClusterRepresentationUtil.JsonToMap(jsonPayload); - Assert.assertNotNull(map.get(JsonParameters.MANAGEMENT_COMMAND)); - Assert.assertEquals(ClusterSetup.resetPartition, map.get(JsonParameters.MANAGEMENT_COMMAND)); - Assert.assertNotNull(map.get(JsonParameters.RESOURCE)); - Assert.assertEquals("DB-1", map.get(JsonParameters.RESOURCE)); - Assert.assertNotNull(map.get(JsonParameters.PARTITION)); - Assert.assertEquals("DB-1_22 DB-1_23", map.get(JsonParameters.PARTITION)); - } - -} http://git-wip-us.apache.org/repos/asf/helix/blob/e8917405/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java index c61dccd..d243a50 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.List; import org.I0Itec.zkclient.DataUpdater; +import org.apache.helix.BaseDataAccessor; import org.apache.helix.InstanceType; import org.apache.helix.PropertyPathConfig; import org.apache.helix.PropertyType; @@ -65,11 +66,13 @@ public final class ZKUtil { requiredPaths.add(PropertyPathConfig.getPath(PropertyType.HISTORY, clusterName)); boolean isValid = true; - for (String path : requiredPaths) { - if (!zkClient.exists(path)) { + BaseDataAccessor<Object> baseAccessor = new ZkBaseDataAccessor<Object>(zkClient); + boolean[] ret = baseAccessor.exists(requiredPaths, 0); + for (int i = 0; i < ret.length; i++) { + if (!ret[i]) { isValid = false; if (logger.isDebugEnabled()) { - logger.debug("Invalid cluster setup, missing znode path: " + path); + logger.debug("Invalid cluster setup, missing znode path: " + requiredPaths.get(i)); } } } @@ -285,9 +288,7 @@ public final class ZKUtil { } } catch (Exception e) { retryCount = retryCount + 1; - logger.warn("Exception trying to createOrReplace " + path + " Exception:" + e.getMessage() - + ". Will retry."); - e.printStackTrace(); + logger.warn("Exception trying to createOrReplace " + path + ". Will retry.", e); } } http://git-wip-us.apache.org/repos/asf/helix/blob/e8917405/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 92c1d7d..68ae2c0 100644 --- a/pom.xml +++ b/pom.xml @@ -255,7 +255,17 @@ under the License. <dependency> <groupId>org.restlet.jse</groupId> <artifactId>org.restlet</artifactId> - <version>2.2.3</version> + <version>2.2.1</version> + </dependency> + <dependency> + <groupId>org.restlet.jse</groupId> + <artifactId>org.restlet.ext.jetty</artifactId> + <version>2.2.1</version> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + <version>8.1.8.v20121106</version> </dependency> <dependency> <groupId>org.apache.helix</groupId>
