Repository: helix Updated Branches: refs/heads/helix-0.6.x 059ab387b -> aa2e968f7
[HELIX-546] Add REST API for Helix job queue management - second part, rb=28584 Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/aa2e968f Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/aa2e968f Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/aa2e968f Branch: refs/heads/helix-0.6.x Commit: aa2e968f7ac4944ce60f9defdeee2fb9b8638cb9 Parents: 059ab38 Author: zzhang <[email protected]> Authored: Tue Dec 2 14:07:34 2014 -0800 Committer: zzhang <[email protected]> Committed: Tue Dec 2 14:07:34 2014 -0800 ---------------------------------------------------------------------- .../apache/helix/webapp/HelixAdminWebApp.java | 16 +- .../helix/webapp/RestAdminApplication.java | 6 + .../resources/ClusterRepresentationUtil.java | 4 +- .../webapp/resources/JobQueueResource.java | 173 +++++++++++++++++ .../webapp/resources/JobQueuesResource.java | 148 +++++++++++++++ .../helix/webapp/resources/JobResource.java | 107 +++++++++++ .../helix/webapp/resources/JsonParameters.java | 21 +- .../helix/webapp/resources/ResourceUtil.java | 95 ++++++++++ .../org/apache/helix/webapp/AdminTestBase.java | 7 +- .../apache/helix/webapp/AdminTestHelper.java | 57 ++++++ .../java/resources/TestJobQueuesResource.java | 190 +++++++++++++++++++ .../test/java/resources/TestJsonParameters.java | 44 +++++ .../apache/helix/manager/zk/ZKHelixManager.java | 2 +- .../java/org/apache/helix/task/TaskDriver.java | 9 +- .../java/org/apache/helix/task/Workflow.java | 2 +- pom.xml | 2 +- 16 files changed, 852 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/HelixAdminWebApp.java ---------------------------------------------------------------------- diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/HelixAdminWebApp.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/HelixAdminWebApp.java index 1988636..991886c 100644 --- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/HelixAdminWebApp.java +++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/HelixAdminWebApp.java @@ -28,12 +28,12 @@ import org.restlet.data.Protocol; public class HelixAdminWebApp { public final Logger LOG = Logger.getLogger(HelixAdminWebApp.class); - RestAdminApplication _adminApp = null; - Component _component = null; + private RestAdminApplication _adminApp = null; + private Component _component = null; - int _helixAdminPort; - String _zkServerAddress; - ZkClient _zkClient; + private final int _helixAdminPort; + private final String _zkServerAddress; + private ZkClient _zkClient = null; public HelixAdminWebApp(String zkServerAddress, int adminPort) { _zkServerAddress = zkServerAddress; @@ -58,14 +58,16 @@ public class HelixAdminWebApp { _component.getDefaultHost().attach(_adminApp); _component.start(); } - LOG.info("helixAdminWebApp started on port " + _helixAdminPort); + LOG.info("helixAdminWebApp started on port: " + _helixAdminPort); } public synchronized void stop() { + LOG.info("Stopping helixAdminWebApp"); try { _component.stop(); + LOG.info("Stopped helixAdminWebApp"); } catch (Exception e) { - LOG.error("", e); + LOG.error("Exception in stopping helixAdminWebApp", e); } finally { if (_zkClient != null) { _zkClient.close(); http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java ---------------------------------------------------------------------- diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java index 7e7a3b9..9842a3d 100644 --- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java +++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java @@ -43,6 +43,9 @@ import org.apache.helix.webapp.resources.ExternalViewResource; import org.apache.helix.webapp.resources.IdealStateResource; import org.apache.helix.webapp.resources.InstanceResource; import org.apache.helix.webapp.resources.InstancesResource; +import org.apache.helix.webapp.resources.JobQueuesResource; +import org.apache.helix.webapp.resources.JobQueueResource; +import org.apache.helix.webapp.resources.JobResource; import org.apache.helix.webapp.resources.ResourceGroupResource; import org.apache.helix.webapp.resources.ResourceGroupsResource; import org.apache.helix.webapp.resources.SchedulerTasksResource; @@ -92,6 +95,9 @@ public class RestAdminApplication extends Application { router.attach("/clusters/{clusterName}/resourceGroups/{resourceName}", ResourceGroupResource.class); router.attach("/clusters/{clusterName}/workflows", WorkflowsResource.class); + router.attach("/clusters/{clusterName}/jobQueues", JobQueuesResource.class); + router.attach("/clusters/{clusterName}/jobQueues/{jobQueue}", JobQueueResource.class); + router.attach("/clusters/{clusterName}/jobQueues/{jobQueue}/{job}", JobResource.class); router.attach("/clusters/{clusterName}/instances", InstancesResource.class); router.attach("/clusters/{clusterName}/instances/{instanceName}", InstanceResource.class); router.attach("/clusters/{clusterName}/instances/{instanceName}/currentState/{resourceName}", http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ClusterRepresentationUtil.java ---------------------------------------------------------------------- diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ClusterRepresentationUtil.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ClusterRepresentationUtil.java index f227801..5e458c4 100644 --- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ClusterRepresentationUtil.java +++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ClusterRepresentationUtil.java @@ -53,9 +53,7 @@ public class ClusterRepresentationUtil { private static final ZNRecord EMPTY_ZNRECORD = new ZNRecord("EMPTY_ZNRECORD"); public static String getClusterPropertyAsString(ZkClient zkClient, String clusterName, - PropertyKey propertyKey, - // String key, - MediaType mediaType) + PropertyKey propertyKey, MediaType mediaType) throws JsonGenerationException, JsonMappingException, IOException { return getClusterPropertyAsString(zkClient, clusterName, mediaType, propertyKey); http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java ---------------------------------------------------------------------- diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java new file mode 100644 index 0000000..3ff9a37 --- /dev/null +++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java @@ -0,0 +1,173 @@ +package org.apache.helix.webapp.resources; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixException; +import org.apache.helix.HelixProperty; +import org.apache.helix.PropertyKey; +import org.apache.helix.PropertyPathConfig; +import org.apache.helix.PropertyType; +import org.apache.helix.ZNRecord; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.store.HelixPropertyStore; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.TaskUtil; +import org.apache.helix.task.Workflow; +import org.apache.helix.task.WorkflowContext; +import org.apache.log4j.Logger; +import org.restlet.data.Form; +import org.restlet.data.MediaType; +import org.restlet.data.Status; +import org.restlet.representation.Representation; +import org.restlet.representation.StringRepresentation; +import org.restlet.representation.Variant; +import org.restlet.resource.ServerResource; + +import java.util.Map; + +public class JobQueueResource extends ServerResource { + private final static Logger LOG = Logger.getLogger(JobQueueResource.class); + + public JobQueueResource() { + getVariants().add(new Variant(MediaType.TEXT_PLAIN)); + getVariants().add(new Variant(MediaType.APPLICATION_JSON)); + setNegotiated(false); + } + + @Override + public Representation get() { + StringRepresentation presentation; + try { + String clusterName = + ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME); + String jobQueueName = + ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.JOB_QUEUE); + presentation = getHostedEntitiesRepresentation(clusterName, jobQueueName); + } catch (Exception e) { + String error = ClusterRepresentationUtil.getErrorAsJsonStringFromException(e); + presentation = new StringRepresentation(error, MediaType.APPLICATION_JSON); + LOG.error("Fail to get job queue", e); + } + return presentation; + } + + StringRepresentation getHostedEntitiesRepresentation(String clusterName, String jobQueueName) + throws Exception { + ZkClient zkClient = + ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT); + HelixDataAccessor accessor = + ClusterRepresentationUtil.getClusterDataAccessor(zkClient, clusterName); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + + // Get job queue config + HelixProperty jobQueueConfig = accessor.getProperty(keyBuilder.resourceConfig(jobQueueName)); + + // Get job queue context + String path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName); + HelixPropertyStore<ZNRecord> propertyStore = + new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(zkClient), path, null); + WorkflowContext ctx = TaskUtil.getWorkflowContext(propertyStore, jobQueueName); + + // Create the result + ZNRecord hostedEntitiesRecord = new ZNRecord(jobQueueName); + if (jobQueueConfig != null) { + hostedEntitiesRecord.merge(jobQueueConfig.getRecord()); + } + if (ctx != null) { + hostedEntitiesRecord.merge(ctx.getRecord()); + } + + StringRepresentation representation = + new StringRepresentation(ClusterRepresentationUtil.ZNRecordToJson(hostedEntitiesRecord), + MediaType.APPLICATION_JSON); + + return representation; + } + + @Override + public Representation post(Representation entity) { + String clusterName = + ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME); + String jobQueueName = + ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.JOB_QUEUE); + ZkClient zkClient = + ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT); + try { + TaskDriver driver = new TaskDriver(zkClient, clusterName); + + Form form = new Form(entity); + JsonParameters jsonParameters = new JsonParameters(form); + + TaskDriver.DriverCommand cmd = TaskDriver.DriverCommand.valueOf(jsonParameters.getCommand()); + switch (cmd) { + case start: { + // Get the job queue and submit it + String yamlPayload = ResourceUtil.getYamlParameters(form, ResourceUtil.YamlParamKey.NEW_JOB); + if (yamlPayload == null) { + throw new HelixException("Yaml job config is required!"); + } + Workflow workflow = Workflow.parse(yamlPayload); + + for (String jobName : workflow.getJobConfigs().keySet()) { + Map<String, String> jobCfgMap = workflow.getJobConfigs().get(jobName); + JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(jobCfgMap); + if (workflow.getTaskConfigs() != null && workflow.getTaskConfigs().containsKey(jobName)) { + jobCfgBuilder.addTaskConfigs(workflow.getTaskConfigs().get(jobName)); + } + driver.enqueueJob(jobQueueName, TaskUtil.getDenamespacedJobName(jobQueueName, jobName), + jobCfgBuilder); + } + break; + } + case stop: { + driver.stop(jobQueueName); + break; + } + case resume: { + driver.resume(jobQueueName); + break; + } + case flush: { + driver.flushQueue(jobQueueName); + break; + } + case delete: { + driver.delete(jobQueueName); + break; + } + default: + throw new HelixException("Unsupported job queue command: " + cmd); + } + getResponse().setEntity(getHostedEntitiesRepresentation(clusterName, jobQueueName)); + getResponse().setStatus(Status.SUCCESS_OK); + + } catch (Exception e) { + getResponse().setEntity(ClusterRepresentationUtil.getErrorAsJsonStringFromException(e), + MediaType.APPLICATION_JSON); + getResponse().setStatus(Status.SUCCESS_OK); + LOG.error("Error in posting job queue: " + entity, e); + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueuesResource.java ---------------------------------------------------------------------- diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueuesResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueuesResource.java new file mode 100644 index 0000000..24a4387 --- /dev/null +++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueuesResource.java @@ -0,0 +1,148 @@ +package org.apache.helix.webapp.resources; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.common.collect.Lists; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixException; +import org.apache.helix.HelixProperty; +import org.apache.helix.PropertyKey; +import org.apache.helix.ZNRecord; +import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.task.JobQueue; +import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.Workflow; +import org.apache.helix.task.WorkflowConfig; +import org.apache.log4j.Logger; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.map.JsonMappingException; +import org.restlet.data.Form; +import org.restlet.data.MediaType; +import org.restlet.data.Parameter; +import org.restlet.data.Status; +import org.restlet.representation.Representation; +import org.restlet.representation.StringRepresentation; +import org.restlet.representation.Variant; +import org.restlet.resource.ServerResource; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +public class JobQueuesResource extends ServerResource { + private final static Logger LOG = Logger.getLogger(JobQueuesResource.class); + + public JobQueuesResource() { + getVariants().add(new Variant(MediaType.TEXT_PLAIN)); + getVariants().add(new Variant(MediaType.APPLICATION_JSON)); + setNegotiated(false); + } + + @Override + public Representation get() { + StringRepresentation presentation = null; + try { + String clusterName = + ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME); + presentation = getHostedEntitiesRepresentation(clusterName); + } catch (Exception e) { + String error = ClusterRepresentationUtil.getErrorAsJsonStringFromException(e); + presentation = new StringRepresentation(error, MediaType.APPLICATION_JSON); + + LOG.error("Fail to get all job queues", e); + } + return presentation; + } + + StringRepresentation getHostedEntitiesRepresentation(String clusterName) + throws JsonGenerationException, JsonMappingException, IOException { + // Get all resources + ZkClient zkClient = + ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT); + HelixDataAccessor accessor = + ClusterRepresentationUtil.getClusterDataAccessor(zkClient, clusterName); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + Map<String, HelixProperty> resourceConfigMap = + accessor.getChildValuesMap(keyBuilder.resourceConfigs()); + + // Create the result + ZNRecord hostedEntitiesRecord = new ZNRecord("JobQueues"); + + // Filter out non-workflow resources + Iterator<Map.Entry<String, HelixProperty>> it = resourceConfigMap.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<String, HelixProperty> e = it.next(); + HelixProperty resource = e.getValue(); + Map<String, String> simpleFields = resource.getRecord().getSimpleFields(); + boolean isTerminable = resource.getRecord().getBooleanField(WorkflowConfig.TERMINABLE, true); + if (!simpleFields.containsKey(WorkflowConfig.TARGET_STATE) + || !simpleFields.containsKey(WorkflowConfig.DAG) || isTerminable) { + it.remove(); + } + } + + // Populate the result + List<String> allResources = Lists.newArrayList(resourceConfigMap.keySet()); + hostedEntitiesRecord.setListField("JobQueues", allResources); + + StringRepresentation representation = + new StringRepresentation(ClusterRepresentationUtil.ZNRecordToJson(hostedEntitiesRecord), + MediaType.APPLICATION_JSON); + + return representation; + } + + @Override + public Representation post(Representation entity) { + try { + String clusterName = + ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME); + ZkClient zkClient = + ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT); + + Form form = new Form(entity); + // Get the job queue and submit it + if (form.size() < 1) { + throw new HelixException("Yaml job queue config is required!"); + } + Parameter payload = form.get(0); + String yamlPayload = payload.getName(); + if (yamlPayload == null) { + throw new HelixException("Yaml job queue config is required!"); + } + + Workflow workflow = Workflow.parse(yamlPayload); + JobQueue.Builder jobQueueCfgBuilder = new JobQueue.Builder(workflow.getName()); + jobQueueCfgBuilder.fromMap(workflow.getWorkflowConfig().getResourceConfigMap()); + TaskDriver driver = new TaskDriver(zkClient, clusterName); + driver.createQueue(jobQueueCfgBuilder.build()); + + getResponse().setEntity(getHostedEntitiesRepresentation(clusterName)); + getResponse().setStatus(Status.SUCCESS_OK); + } catch (Exception e) { + getResponse().setEntity(ClusterRepresentationUtil.getErrorAsJsonStringFromException(e), + MediaType.APPLICATION_JSON); + getResponse().setStatus(Status.SUCCESS_OK); + LOG.error("Exception in posting job queue: " + entity, e); + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java ---------------------------------------------------------------------- diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java new file mode 100644 index 0000000..a58e223 --- /dev/null +++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java @@ -0,0 +1,107 @@ +package org.apache.helix.webapp.resources; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixProperty; +import org.apache.helix.PropertyKey; +import org.apache.helix.PropertyPathConfig; +import org.apache.helix.PropertyType; +import org.apache.helix.ZNRecord; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.store.HelixPropertyStore; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.helix.task.JobContext; +import org.apache.helix.task.TaskUtil; +import org.apache.log4j.Logger; +import org.restlet.data.MediaType; +import org.restlet.representation.Representation; +import org.restlet.representation.StringRepresentation; +import org.restlet.representation.Variant; +import org.restlet.resource.ServerResource; + +public class JobResource extends ServerResource { + private final static Logger LOG = Logger.getLogger(JobResource.class); + + public JobResource() { + getVariants().add(new Variant(MediaType.TEXT_PLAIN)); + getVariants().add(new Variant(MediaType.APPLICATION_JSON)); + setNegotiated(false); + } + + @Override + public Representation get() { + StringRepresentation presentation; + String clusterName = + ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME); + String jobQueueName = + ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.JOB_QUEUE); + String jobName = + ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.JOB); + + try { + presentation = getHostedEntitiesRepresentation(clusterName, jobQueueName, jobName); + } catch (Exception e) { + String error = ClusterRepresentationUtil.getErrorAsJsonStringFromException(e); + presentation = new StringRepresentation(error, MediaType.APPLICATION_JSON); + + LOG.error("Fail to get job: " + jobName, e); + } + return presentation; + } + + StringRepresentation getHostedEntitiesRepresentation(String clusterName, String jobQueueName, + String jobName) throws Exception { + + ZkClient zkClient = + ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT); + HelixDataAccessor accessor = + ClusterRepresentationUtil.getClusterDataAccessor(zkClient, clusterName); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + + // Get job queue config + String namespacedJobName = TaskUtil.getNamespacedJobName(jobQueueName, jobName); + HelixProperty jobConfig = accessor.getProperty(keyBuilder.resourceConfig(namespacedJobName)); + + // Get job queue context + JobContext ctx = null; + String path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName); + HelixPropertyStore<ZNRecord> propertyStore = + new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(zkClient), path, null); + + ctx = TaskUtil.getJobContext(propertyStore, namespacedJobName); + + // Create the result + ZNRecord hostedEntitiesRecord = new ZNRecord(namespacedJobName); + if (jobConfig != null) { + hostedEntitiesRecord.merge(jobConfig.getRecord()); + } + if (ctx != null) { + hostedEntitiesRecord.merge(ctx.getRecord()); + } + + StringRepresentation representation = + new StringRepresentation(ClusterRepresentationUtil.ZNRecordToJson(hostedEntitiesRecord), + MediaType.APPLICATION_JSON); + + return representation; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java ---------------------------------------------------------------------- diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java index 19ac71a..41d9a77 100644 --- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java +++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java @@ -96,8 +96,11 @@ public class JsonParameters { final Map<String, ZNRecord> _extraParameterMap = new HashMap<String, ZNRecord>(); public JsonParameters(Representation entity) throws Exception { - Form form = new Form(entity); + this(new Form(entity)); + } + + public JsonParameters(Form form) throws Exception { // get parameters in String format String jsonPayload = form.getFirstValue(JSON_PARAMETERS, true); if (jsonPayload == null || jsonPayload.isEmpty()) { @@ -151,7 +154,7 @@ public class JsonParameters { } if (!_parameterMap.containsKey(MANAGEMENT_COMMAND)) { - throw new HelixException("Missing management paramater '" + MANAGEMENT_COMMAND + "'"); + throw new HelixException("Missing management parameter '" + MANAGEMENT_COMMAND + "'"); } if (!_parameterMap.get(MANAGEMENT_COMMAND).equalsIgnoreCase(command) @@ -217,25 +220,17 @@ public class JsonParameters { } } else if (command.equalsIgnoreCase(ClusterSetup.addResource)) { if (!_parameterMap.containsKey(RESOURCE_GROUP_NAME)) { - throw new HelixException("Missing Json paramaters: '" + RESOURCE_GROUP_NAME + "'"); + throw new HelixException("Missing Json parameters: '" + RESOURCE_GROUP_NAME + "'"); } if (!_parameterMap.containsKey(PARTITIONS)) { - throw new HelixException("Missing Json paramaters: '" + PARTITIONS + "'"); + throw new HelixException("Missing Json parameters: '" + PARTITIONS + "'"); } if (!_parameterMap.containsKey(STATE_MODEL_DEF_REF)) { - throw new HelixException("Missing Json paramaters: '" + STATE_MODEL_DEF_REF + "'"); + throw new HelixException("Missing Json parameters: '" + STATE_MODEL_DEF_REF + "'"); } } } - - // temp test - public static void main(String[] args) throws Exception { - String jsonPayload = - "{\"command\":\"resetPartition\",\"resource\": \"DB-1\",\"partition\":\"DB-1_22 DB-1_23\"}"; - Map<String, String> map = ClusterRepresentationUtil.JsonToMap(jsonPayload); - System.out.println(map); - } } http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceUtil.java ---------------------------------------------------------------------- diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceUtil.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceUtil.java new file mode 100644 index 0000000..969bdf5 --- /dev/null +++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceUtil.java @@ -0,0 +1,95 @@ +package org.apache.helix.webapp.resources; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.webapp.RestAdminApplication; +import org.restlet.Context; +import org.restlet.Request; +import org.restlet.data.Form; +import org.restlet.representation.Representation; + +public class ResourceUtil { + /** + * Key enums for getting values from request + */ + public enum RequestKey { + CLUSTER_NAME("clusterName"), + JOB_QUEUE("jobQueue"), + JOB("job"); + + private final String _key; + + RequestKey(String key) { + _key = key; + } + + public String toString() { + return _key; + } + } + + /** + * Key enums for getting values from context + */ + public enum ContextKey { + ZK_ADDR(RestAdminApplication.ZKSERVERADDRESS), + ZKCLIENT(RestAdminApplication.ZKCLIENT); + + private final String _key; + + ContextKey(String key) { + _key = key; + } + + public String toString() { + return _key; + } + } + + /** + * Key enums for getting yaml format parameters + */ + public enum YamlParamKey { + NEW_JOB("newJob"); + + private final String _key; + YamlParamKey(String key) { + _key = key; + } + + public String toString() { + return _key; + } + } + + public static String getAttributeFromRequest(Request r, RequestKey key) { + return (String) r.getAttributes().get(key.toString()); + } + + + public static ZkClient getAttributeFromCtx(Context ctx, ContextKey key) { + return (ZkClient) ctx.getAttributes().get(key.toString()); + } + + public static String getYamlParameters(Form form, YamlParamKey key) { + return form.getFirstValue(key.toString()); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestBase.java ---------------------------------------------------------------------- diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestBase.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestBase.java index 5b4411b..fdccee9 100644 --- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestBase.java +++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestBase.java @@ -58,9 +58,10 @@ public class AdminTestBase { AssertJUnit.assertTrue(_zkServer != null); ZKClientPool.reset(); - _gZkClient = new ZkClient(ZK_ADDR); - _gZkClient.setZkSerializer(new ZNRecordSerializer()); - _gSetupTool = new ClusterSetup(ZK_ADDR); + _gZkClient = + new ZkClient(ZK_ADDR, ZkClient.DEFAULT_CONNECTION_TIMEOUT, + ZkClient.DEFAULT_SESSION_TIMEOUT, new ZNRecordSerializer()); + _gSetupTool = new ClusterSetup(_gZkClient); // start admin _adminThread = new AdminThread(ZK_ADDR, ADMIN_PORT); http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java ---------------------------------------------------------------------- diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java index 9f6946d..5b371cc 100644 --- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java +++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java @@ -19,9 +19,26 @@ package org.apache.helix.webapp; * under the License. */ +import java.io.IOException; +import java.io.StringReader; +import java.io.StringWriter; +import java.util.Map; import java.util.concurrent.CountDownLatch; +import org.apache.helix.ZNRecord; import org.apache.helix.webapp.HelixAdminWebApp; +import org.apache.helix.webapp.resources.ClusterRepresentationUtil; +import org.apache.helix.webapp.resources.JsonParameters; +import org.codehaus.jackson.map.ObjectMapper; +import org.restlet.Client; +import org.restlet.Request; +import org.restlet.Response; +import org.restlet.data.MediaType; +import org.restlet.data.Method; +import org.restlet.data.Reference; +import org.restlet.data.Status; +import org.restlet.representation.Representation; +import org.testng.Assert; public class AdminTestHelper { @@ -66,4 +83,44 @@ public class AdminTestHelper { } } + public static ZNRecord get(Client client, String url) throws IOException { + Reference resourceRef = new Reference(url); + Request request = new Request(Method.GET, resourceRef); + Response response = client.handle(request); + Assert.assertEquals(response.getStatus(), Status.SUCCESS_OK); + Representation result = response.getEntity(); + StringWriter sw = new StringWriter(); + result.write(sw); + + String responseStr = sw.toString(); + Assert.assertTrue(responseStr.toLowerCase().indexOf("error") == -1); + Assert.assertTrue(responseStr.toLowerCase().indexOf("exception") == -1); + ObjectMapper mapper = new ObjectMapper(); + ZNRecord record = mapper.readValue(new StringReader(responseStr), ZNRecord.class); + return record; + } + + public static ZNRecord post(Client client, String url, String body) + throws IOException { + Reference resourceRef = new Reference(url); + Request request = new Request(Method.POST, resourceRef); + + request.setEntity(body, MediaType.APPLICATION_ALL); + + Response response = client.handle(request); + Assert.assertEquals(response.getStatus(), Status.SUCCESS_OK); + Representation result = response.getEntity(); + StringWriter sw = new StringWriter(); + + if (result != null) { + result.write(sw); + } + String responseStr = sw.toString(); + Assert.assertTrue(responseStr.toLowerCase().indexOf("error") == -1); + Assert.assertTrue(responseStr.toLowerCase().indexOf("exception") == -1); + + ObjectMapper mapper = new ObjectMapper(); + ZNRecord record = mapper.readValue(new StringReader(responseStr), ZNRecord.class); + return record; + } } http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/test/java/resources/TestJobQueuesResource.java ---------------------------------------------------------------------- diff --git a/helix-admin-webapp/src/test/java/resources/TestJobQueuesResource.java b/helix-admin-webapp/src/test/java/resources/TestJobQueuesResource.java new file mode 100644 index 0000000..6c0e0e1 --- /dev/null +++ b/helix-admin-webapp/src/test/java/resources/TestJobQueuesResource.java @@ -0,0 +1,190 @@ +package resources; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +import com.google.common.collect.Lists; + +import org.apache.helix.TestHelper; +import org.apache.helix.ZNRecord; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.integration.task.DummyTask; +import org.apache.helix.integration.task.WorkflowGenerator; +import org.apache.helix.participant.StateMachineEngine; +import org.apache.helix.task.Task; +import org.apache.helix.task.TaskCallbackContext; +import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.TaskFactory; +import org.apache.helix.task.TaskStateModelFactory; +import org.apache.helix.task.beans.JobBean; +import org.apache.helix.task.beans.WorkflowBean; +import org.apache.helix.tools.ClusterStateVerifier; +import org.apache.helix.webapp.AdminTestBase; +import org.apache.helix.webapp.AdminTestHelper; +import org.apache.helix.webapp.resources.ClusterRepresentationUtil; +import org.apache.helix.webapp.resources.JsonParameters; +import org.apache.helix.webapp.resources.ResourceUtil; +import org.apache.log4j.Logger; +import org.testng.Assert; +import org.testng.annotations.Test; + +import org.yaml.snakeyaml.Yaml; + +public class TestJobQueuesResource extends AdminTestBase { + private static final Logger LOG = Logger.getLogger(TestJobQueuesResource.class); + + @Test + public void test() throws Exception { + + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + final int n = 5; + final int p = 20; + final int r = 3; + + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + + _gSetupTool.addCluster(clusterName, true); + for (int i = 0; i < n; i++) { + String instanceName = "localhost_" + (12918 + i); + _gSetupTool.addInstanceToCluster(clusterName, instanceName); + } + + // Set up target db + _gSetupTool.addResourceToCluster(clusterName, WorkflowGenerator.DEFAULT_TGT_DB, p, + "MasterSlave"); + _gSetupTool.rebalanceStorageCluster(clusterName, WorkflowGenerator.DEFAULT_TGT_DB, r); + + Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>(); + taskFactoryReg.put("DummyTask", new TaskFactory() { + @Override + public Task createNewTask(TaskCallbackContext context) { + return new DummyTask(context); + } + }); + + // Start dummy participants + MockParticipantManager[] participants = new MockParticipantManager[n]; + for (int i = 0; i < n; i++) { + String instanceName = "localhost_" + (12918 + i); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); + + // Register a Task state model factory. + StateMachineEngine stateMachine = participants[i].getStateMachineEngine(); + stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(participants[i], + taskFactoryReg)); + participants[i].syncStart(); + } + + // start controller + String controllerName = "controller"; + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, controllerName); + controller.syncStart(); + + boolean result = + ClusterStateVerifier + .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, + clusterName)); + Assert.assertTrue(result); + + // Start a queue + String queueName = "myQueue1"; + LOG.info("Starting job-queue: " + queueName); + String jobQueueYamlConfig = "name: " + queueName; + + String resourceUrl = + "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/jobQueues"; + ZNRecord postRet = AdminTestHelper.post(_gClient, resourceUrl, jobQueueYamlConfig); + LOG.info("Started job-queue: " + queueName + ", ret: " + postRet); + + LOG.info("Getting all job-queues"); + ZNRecord getRet = AdminTestHelper.get(_gClient, resourceUrl); + LOG.info("Got job-queues: " + getRet); + + // Enqueue job + resourceUrl = + "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/jobQueues/" + queueName; + + WorkflowBean wfBean = new WorkflowBean(); + wfBean.name = queueName; + JobBean jBean = new JobBean(); + jBean.name = "myJob1"; + jBean.command = "DummyTask"; + jBean.targetResource = WorkflowGenerator.DEFAULT_TGT_DB; + jBean.targetPartitionStates = Lists.newArrayList("MASTER"); + wfBean.jobs = Lists.newArrayList(jBean); + String jobYamlConfig = new Yaml().dump(wfBean); + LOG.info("Enqueuing a job: " + jobQueueYamlConfig); + + Map<String, String> paraMap = new HashMap<String, String>(); + paraMap.put(JsonParameters.MANAGEMENT_COMMAND, TaskDriver.DriverCommand.start.toString()); + + String postBody = + String.format("%s=%s&%s=%s", JsonParameters.JSON_PARAMETERS, + ClusterRepresentationUtil.ObjectToJson(paraMap), ResourceUtil.YamlParamKey.NEW_JOB.toString(), + jobYamlConfig); + postRet = AdminTestHelper.post(_gClient, resourceUrl, postBody); + LOG.info("Enqueued job, ret: " + postRet); + + // Get job + resourceUrl = + "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/jobQueues/" + queueName + + "/" + jBean.name; + getRet = AdminTestHelper.get(_gClient, resourceUrl); + LOG.info("Got job: " + getRet); + + // Stop job queue + resourceUrl = + "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/jobQueues/" + queueName; + paraMap.put(JsonParameters.MANAGEMENT_COMMAND, TaskDriver.DriverCommand.stop.toString()); + postBody = String.format("%s=%s", JsonParameters.JSON_PARAMETERS, ClusterRepresentationUtil.ObjectToJson(paraMap)); + postRet = AdminTestHelper.post(_gClient, resourceUrl, postBody); + LOG.info("Stopped job-queue, ret: " + postRet); + + // Resume job queue + paraMap.put(JsonParameters.MANAGEMENT_COMMAND, TaskDriver.DriverCommand.resume.toString()); + postBody = String.format("%s=%s", JsonParameters.JSON_PARAMETERS, ClusterRepresentationUtil.ObjectToJson(paraMap)); + postRet = AdminTestHelper.post(_gClient, resourceUrl, postBody); + LOG.info("Resumed job-queue, ret: " + postRet); + + // Flush job queue + paraMap.put(JsonParameters.MANAGEMENT_COMMAND, "flush"); + postBody = + JsonParameters.JSON_PARAMETERS + "=" + ClusterRepresentationUtil.ObjectToJson(paraMap); + postRet = AdminTestHelper.post(_gClient, resourceUrl, postBody); + LOG.info("Flushed job-queue, ret: " + postRet); + + // clean up + controller.syncStop(); + for (int i = 0; i < n; i++) { + if (participants[i] != null && participants[i].isConnected()) { + participants[i].syncStop(); + } + } + + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/test/java/resources/TestJsonParameters.java ---------------------------------------------------------------------- diff --git a/helix-admin-webapp/src/test/java/resources/TestJsonParameters.java b/helix-admin-webapp/src/test/java/resources/TestJsonParameters.java new file mode 100644 index 0000000..383ac21 --- /dev/null +++ b/helix-admin-webapp/src/test/java/resources/TestJsonParameters.java @@ -0,0 +1,44 @@ +package resources; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Map; + +import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.webapp.resources.ClusterRepresentationUtil; +import org.apache.helix.webapp.resources.JsonParameters; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestJsonParameters { + @Test + public void test() throws Exception { + String jsonPayload = + "{\"command\":\"resetPartition\",\"resource\": \"DB-1\",\"partition\":\"DB-1_22 DB-1_23\"}"; + Map<String, String> map = ClusterRepresentationUtil.JsonToMap(jsonPayload); + Assert.assertNotNull(map.get(JsonParameters.MANAGEMENT_COMMAND)); + Assert.assertEquals(ClusterSetup.resetPartition, map.get(JsonParameters.MANAGEMENT_COMMAND)); + Assert.assertNotNull(map.get(JsonParameters.RESOURCE)); + Assert.assertEquals("DB-1", map.get(JsonParameters.RESOURCE)); + Assert.assertNotNull(map.get(JsonParameters.PARTITION)); + Assert.assertEquals("DB-1_22 DB-1_23", map.get(JsonParameters.PARTITION)); + } + +} http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java index 6a1fb72..3328279 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java @@ -792,7 +792,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { */ _disconnectTimeHistory.add(System.currentTimeMillis()); if (isFlapping()) { - LOG.error("instanceName: " + _instanceName + " is flapping. diconnect it. " + LOG.error("instanceName: " + _instanceName + " is flapping. disconnect it. " + " maxDisconnectThreshold: " + _maxDisconnectThreshold + " disconnects in " + _flappingTimeWindowMs + "ms."); disconnect(); http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index 0bd060a..60dc22c 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -90,12 +90,13 @@ public class TaskDriver { private final String _clusterName; /** Commands which may be parsed from the first argument to main */ - private enum DriverCommand { + public enum DriverCommand { start, stop, delete, resume, - list + list, + flush } public TaskDriver(HelixManager manager) { @@ -166,6 +167,10 @@ public class TaskDriver { break; case list: driver.list(resource); + break; + case flush: + driver.flushQueue(resource); + break; default: throw new IllegalArgumentException("Unknown command " + args[0]); } http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-core/src/main/java/org/apache/helix/task/Workflow.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java index 1c0ef40..4ca6e68 100644 --- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java +++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java @@ -132,7 +132,7 @@ public class Workflow { WorkflowBean wf = (WorkflowBean) yaml.load(reader); Builder builder = new Builder(wf.name); - if (wf != null) { + if (wf != null && wf.jobs != null) { for (JobBean job : wf.jobs) { if (job.name == null) { throw new IllegalArgumentException("A job must have a name."); http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index df0c74c..92c1d7d 100644 --- a/pom.xml +++ b/pom.xml @@ -255,7 +255,7 @@ under the License. <dependency> <groupId>org.restlet.jse</groupId> <artifactId>org.restlet</artifactId> - <version>2.2.1</version> + <version>2.2.3</version> </dependency> <dependency> <groupId>org.apache.helix</groupId>
