Clean up TaskDriver: move all command-line support code to a new tool class, TaskAdmin.java, and build a command-line shell script from TaskAdmin.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/55b84465 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/55b84465 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/55b84465 Branch: refs/heads/master Commit: 55b844657947dea661f6067dbd32237b0ed6afe2 Parents: 4e48719 Author: Lei Xia <[email protected]> Authored: Thu Jan 26 13:46:00 2017 -0800 Committer: Lei Xia <[email protected]> Committed: Mon Oct 2 19:06:41 2017 -0700 ---------------------------------------------------------------------- helix-core/pom.xml | 4 + .../stages/PersistAssignmentStage.java | 2 +- .../org/apache/helix/manager/zk/ZKUtil.java | 10 +- .../java/org/apache/helix/task/TaskDriver.java | 288 ++----------------- .../apache/helix/task/WorkflowRebalancer.java | 4 +- .../java/org/apache/helix/tools/TaskAdmin.java | 284 ++++++++++++++++++ 6 files changed, 321 insertions(+), 271 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/55b84465/helix-core/pom.xml ---------------------------------------------------------------------- diff --git a/helix-core/pom.xml b/helix-core/pom.xml index 28e115b..789a5e1 100644 --- a/helix-core/pom.xml +++ b/helix-core/pom.xml @@ -232,6 +232,10 @@ under the License. 
<mainClass>org.apache.helix.tools.ZkGrep</mainClass> <name>zkgrep</name> </program> + <program> + <mainClass>org.apache.helix.tools.TaskAdmin</mainClass> + <name>task-admin</name> + </program> </programs> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/helix/blob/55b84465/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java index b55a838..cd320a4 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java @@ -96,7 +96,7 @@ public class PersistAssignmentStage extends AbstractBaseStage { partitionStateMap = intermediateAssignment.getPartitionStateMap(resourceId); } - Map<Partition, Map<String, String>> assignmentToPersist = partitionStateMap.getStateMap()); + Map<Partition, Map<String, String>> assignmentToPersist = partitionStateMap.getStateMap(); if (assignmentToPersist != null && hasInstanceMapChanged(assignmentToPersist, idealState)) { for (Partition partition : assignmentToPersist.keySet()) { http://git-wip-us.apache.org/repos/asf/helix/blob/55b84465/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java index 38b74cb..7300e07 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java @@ -27,7 +27,9 @@ import org.I0Itec.zkclient.DataUpdater; import org.apache.helix.BaseDataAccessor; import 
org.apache.helix.InstanceType; import org.apache.helix.PropertyPathBuilder; +import org.apache.helix.PropertyType; import org.apache.helix.ZNRecord; +import org.apache.helix.model.HelixConfigScope; import org.apache.log4j.Logger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; @@ -52,11 +54,11 @@ public final class ZKUtil { ArrayList<String> requiredPaths = new ArrayList<String>(); requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.IDEALSTATES, clusterName)); requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName, - ConfigScopeProperty.CLUSTER.toString(), clusterName)); + HelixConfigScope.ConfigScopeProperty.CLUSTER.toString(), clusterName)); requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName, - ConfigScopeProperty.PARTICIPANT.toString())); + HelixConfigScope.ConfigScopeProperty.PARTICIPANT.toString())); requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName, - ConfigScopeProperty.RESOURCE.toString())); + HelixConfigScope.ConfigScopeProperty.RESOURCE.toString())); requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.PROPERTYSTORE, clusterName)); requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.LIVEINSTANCES, clusterName)); requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.INSTANCES, clusterName)); @@ -94,7 +96,7 @@ public final class ZKUtil { if (type == InstanceType.PARTICIPANT || type == InstanceType.CONTROLLER_PARTICIPANT) { ArrayList<String> requiredPaths = new ArrayList<String>(); requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName, - ConfigScopeProperty.PARTICIPANT.toString(), instanceName)); + HelixConfigScope.ConfigScopeProperty.PARTICIPANT.toString(), instanceName)); requiredPaths.add(PropertyPathBuilder .getPath(PropertyType.MESSAGES, clusterName, instanceName)); requiredPaths.add(PropertyPathBuilder.getPath(PropertyType.CURRENTSTATES, clusterName, 
http://git-wip-us.apache.org/repos/asf/helix/blob/55b84465/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index a639cd0..97703f7 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -18,11 +18,8 @@ package org.apache.helix.task; * specific language governing permissions and limitations * under the License. */ - -import java.io.File; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -30,26 +27,14 @@ import java.util.Map; import java.util.Set; import org.I0Itec.zkclient.DataUpdater; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.GnuParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.OptionGroup; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; import org.apache.helix.AccessOption; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; import org.apache.helix.HelixManager; -import org.apache.helix.HelixManagerFactory; -import org.apache.helix.InstanceType; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyPathBuilder; -import org.apache.helix.PropertyType; import org.apache.helix.ZNRecord; import org.apache.helix.controller.rebalancer.util.RebalanceScheduler; import org.apache.helix.manager.zk.ZKHelixAdmin; @@ -71,32 +56,6 @@ import com.google.common.collect.Sets; * CLI for 
scheduling/canceling workflows */ public class TaskDriver { - /** For logging */ - private static final Logger LOG = Logger.getLogger(TaskDriver.class); - - /** Required option name for Helix endpoint */ - private static final String ZK_ADDRESS = "zk"; - - /** Required option name for cluster against which to run task */ - private static final String CLUSTER_NAME_OPTION = "cluster"; - - /** Required option name for task resource within target cluster */ - private static final String RESOURCE_OPTION = "resource"; - - /** Field for specifying a workflow file when starting a job */ - private static final String WORKFLOW_FILE_OPTION = "file"; - - /** Default time out for monitoring workflow or job state */ - private final static int _defaultTimeout = 2 * 60 * 1000; /* 2 mins */ - - - private final HelixDataAccessor _accessor; - private final ConfigAccessor _cfgAccessor; - private final HelixPropertyStore<ZNRecord> _propertyStore; - private final HelixAdmin _admin; - private final String _clusterName; - - /** Commands which may be parsed from the first argument to main */ public enum DriverCommand { start, stop, @@ -107,9 +66,20 @@ public class TaskDriver { clean } + /** For logging */ + private static final Logger LOG = Logger.getLogger(TaskDriver.class); + + /** Default time out for monitoring workflow or job state */ + private final static int _defaultTimeout = 3 * 60 * 1000; /* 3 mins */ + + private final HelixDataAccessor _accessor; + private final HelixPropertyStore<ZNRecord> _propertyStore; + private final HelixAdmin _admin; + private final String _clusterName; + public TaskDriver(HelixManager manager) { - this(manager.getClusterManagmentTool(), manager.getHelixDataAccessor(), manager - .getConfigAccessor(), manager.getHelixPropertyStore(), manager.getClusterName()); + this(manager.getClusterManagmentTool(), manager.getHelixDataAccessor(), + manager.getHelixPropertyStore(), manager.getClusterName()); } public TaskDriver(ZkClient client, String clusterName) { @@ 
-118,80 +88,24 @@ public class TaskDriver { public TaskDriver(ZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor, String clusterName) { this(new ZKHelixAdmin(client), new ZKHelixDataAccessor(clusterName, baseAccessor), - new ConfigAccessor(client), new ZkHelixPropertyStore<ZNRecord>(baseAccessor, - PropertyPathBuilder.getPath(PropertyType.PROPERTYSTORE, clusterName), null), clusterName); + new ZkHelixPropertyStore<ZNRecord>(baseAccessor, + PropertyPathBuilder.propertyStore(clusterName), null), clusterName); } + @Deprecated public TaskDriver(HelixAdmin admin, HelixDataAccessor accessor, ConfigAccessor cfgAccessor, HelixPropertyStore<ZNRecord> propertyStore, String clusterName) { + this(admin, accessor, propertyStore, clusterName); + } + + public TaskDriver(HelixAdmin admin, HelixDataAccessor accessor, + HelixPropertyStore<ZNRecord> propertyStore, String clusterName) { _admin = admin; _accessor = accessor; - _cfgAccessor = cfgAccessor; _propertyStore = propertyStore; _clusterName = clusterName; } - /** - * Parses the first argument as a driver command and the rest of the - * arguments are parsed based on that command. 
Constructs a Helix - * message and posts it to the controller - */ - public static void main(String[] args) throws Exception { - String[] cmdArgs = Arrays.copyOfRange(args, 1, args.length); - CommandLine cl = parseOptions(cmdArgs, constructOptions(), args[0]); - String zkAddr = cl.getOptionValue(ZK_ADDRESS); - String clusterName = cl.getOptionValue(CLUSTER_NAME_OPTION); - String resource = cl.getOptionValue(RESOURCE_OPTION); - - if (zkAddr == null || clusterName == null || resource == null) { - printUsage(constructOptions(), "[cmd]"); - throw new IllegalArgumentException( - "zk, cluster, and resource must all be non-null for all commands"); - } - - HelixManager helixMgr = - HelixManagerFactory.getZKHelixManager(clusterName, "Admin", InstanceType.ADMINISTRATOR, - zkAddr); - helixMgr.connect(); - TaskDriver driver = new TaskDriver(helixMgr); - try { - DriverCommand cmd = DriverCommand.valueOf(args[0]); - switch (cmd) { - case start: - if (cl.hasOption(WORKFLOW_FILE_OPTION)) { - driver.start(Workflow.parse(new File(cl.getOptionValue(WORKFLOW_FILE_OPTION)))); - } else { - throw new IllegalArgumentException("Workflow file is required to start flow."); - } - break; - case stop: - driver.setWorkflowTargetState(resource, TargetState.STOP); - break; - case resume: - driver.setWorkflowTargetState(resource, TargetState.START); - break; - case delete: - driver.setWorkflowTargetState(resource, TargetState.DELETE); - break; - case list: - driver.list(resource); - break; - case flush: - driver.flushQueue(resource); - break; - case clean: - driver.cleanupJobQueue(resource); - break; - default: - throw new IllegalArgumentException("Unknown command " + args[0]); - } - } catch (IllegalArgumentException e) { - LOG.error("Unknown driver command " + args[0]); - throw e; - } - - helixMgr.disconnect(); - } /** Schedules a new workflow * @@ -284,7 +198,7 @@ public class TaskDriver { } /** - * Flushes a named job queue + * Remove all jobs in a job queue * * @param queueName * @throws 
Exception @@ -626,8 +540,8 @@ public class TaskDriver { } /** - * Clean up final state jobs (ABORTED, FAILED, COMPLETED), - * which will consume the capacity, in job queue + * Remove all jobs that are in final states (ABORTED, FAILED, COMPLETED) from the job queue. + * The job config, job context will be removed from Zookeeper. * * @param queueName The name of job queue */ @@ -688,7 +602,6 @@ public class TaskDriver { return is; } - /** * Add new job config to cluster */ @@ -881,59 +794,6 @@ public class TaskDriver { return workflowConfigMap; } - public void list(String resource) { - WorkflowConfig wCfg = TaskUtil.getWorkflowCfg(_accessor, resource); - if (wCfg == null) { - LOG.error("Workflow " + resource + " does not exist!"); - return; - } - WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, resource); - - LOG.info("Workflow " + resource + " consists of the following tasks: " - + wCfg.getJobDag().getAllNodes()); - String workflowState = - (wCtx != null) ? wCtx.getWorkflowState().name() : TaskState.NOT_STARTED.name(); - LOG.info("Current state of workflow is " + workflowState); - LOG.info("Job states are: "); - LOG.info("-------"); - for (String job : wCfg.getJobDag().getAllNodes()) { - TaskState jobState = (wCtx != null) ? wCtx.getJobState(job) : TaskState.NOT_STARTED; - LOG.info("Job " + job + " is " + jobState); - - // fetch job information - JobConfig jCfg = TaskUtil.getJobCfg(_accessor, job); - JobContext jCtx = TaskUtil.getJobContext(_propertyStore, job); - if (jCfg == null || jCtx == null) { - LOG.info("-------"); - continue; - } - - // calculate taskPartitions - List<Integer> partitions = Lists.newArrayList(jCtx.getPartitionSet()); - Collections.sort(partitions); - - // report status - for (Integer partition : partitions) { - String taskId = jCtx.getTaskIdForPartition(partition); - taskId = (taskId != null) ? 
taskId : jCtx.getTargetForPartition(partition); - LOG.info("Task: " + taskId); - TaskConfig taskConfig = jCfg.getTaskConfig(taskId); - if (taskConfig != null) { - LOG.info("Configuration: " + taskConfig.getConfigMap()); - } - TaskPartitionState state = jCtx.getPartitionState(partition); - state = (state != null) ? state : TaskPartitionState.INIT; - LOG.info("State: " + state); - String assignedParticipant = jCtx.getAssignedParticipant(partition); - if (assignedParticipant != null) { - LOG.info("Assigned participant: " + assignedParticipant); - } - LOG.info("-------"); - } - LOG.info("-------"); - } - } - /** * This call will be blocked until either workflow reaches to one of the state specified * in the arguments, or timeout happens. If timeout happens, then it will throw a HelixException @@ -1051,104 +911,4 @@ public class TaskDriver { throws InterruptedException { return pollForJobState(workflowName, jobName, _defaultTimeout, states); } - - /** Constructs options set for all basic control messages */ - private static Options constructOptions() { - Options options = new Options(); - options.addOptionGroup(contructGenericRequiredOptionGroup()); - options.addOptionGroup(constructStartOptionGroup()); - return options; - } - - /** Constructs option group containing options required by all drivable jobs */ - @SuppressWarnings("static-access") - private static OptionGroup contructGenericRequiredOptionGroup() { - Option zkAddressOption = - OptionBuilder.isRequired().withLongOpt(ZK_ADDRESS) - .withDescription("ZK address managing cluster").create(); - zkAddressOption.setArgs(1); - zkAddressOption.setArgName("zkAddress"); - - Option clusterNameOption = - OptionBuilder.isRequired().withLongOpt(CLUSTER_NAME_OPTION).withDescription("Cluster name") - .create(); - clusterNameOption.setArgs(1); - clusterNameOption.setArgName("clusterName"); - - Option taskResourceOption = - OptionBuilder.isRequired().withLongOpt(RESOURCE_OPTION) - .withDescription("Workflow or job 
name").create(); - taskResourceOption.setArgs(1); - taskResourceOption.setArgName("resourceName"); - - OptionGroup group = new OptionGroup(); - group.addOption(zkAddressOption); - group.addOption(clusterNameOption); - group.addOption(taskResourceOption); - return group; - } - - /** Constructs option group containing options required by all drivable jobs */ - private static OptionGroup constructStartOptionGroup() { - @SuppressWarnings("static-access") - Option workflowFileOption = - OptionBuilder.withLongOpt(WORKFLOW_FILE_OPTION) - .withDescription("Local file describing workflow").create(); - workflowFileOption.setArgs(1); - workflowFileOption.setArgName("workflowFile"); - - OptionGroup group = new OptionGroup(); - group.addOption(workflowFileOption); - return group; - } - - /** Attempts to parse options for given command, printing usage under failure */ - private static CommandLine parseOptions(String[] args, Options options, String cmdStr) { - CommandLineParser cliParser = new GnuParser(); - CommandLine cmd = null; - - try { - cmd = cliParser.parse(options, args); - } catch (ParseException pe) { - LOG.error("CommandLineClient: failed to parse command-line options: " + pe.toString()); - printUsage(options, cmdStr); - System.exit(1); - } - boolean ret = checkOptionArgsNumber(cmd.getOptions()); - if (!ret) { - printUsage(options, cmdStr); - System.exit(1); - } - - return cmd; - } - - /** Ensures options argument counts are correct */ - private static boolean checkOptionArgsNumber(Option[] options) { - for (Option option : options) { - int argNb = option.getArgs(); - String[] args = option.getValues(); - if (argNb == 0) { - if (args != null && args.length > 0) { - System.err.println(option.getArgName() + " shall have " + argNb + " arguments (was " - + Arrays.toString(args) + ")"); - return false; - } - } else { - if (args == null || args.length != argNb) { - System.err.println(option.getArgName() + " shall have " + argNb + " arguments (was " - + Arrays.toString(args) 
+ ")"); - return false; - } - } - } - return true; - } - - /** Displays CLI usage for given option set and command name */ - private static void printUsage(Options cliOptions, String cmd) { - HelpFormatter helpFormatter = new HelpFormatter(); - helpFormatter.setWidth(1000); - helpFormatter.printHelp("java " + TaskDriver.class.getName() + " " + cmd, cliOptions); - } } http://git-wip-us.apache.org/repos/asf/helix/blob/55b84465/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java index 11c6a61..830f93a 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java @@ -482,8 +482,8 @@ public class WorkflowRebalancer extends TaskRebalancer { /** - * Cleans up workflow configs and workflow contexts associated with this workflow, - * including all job-level configs and context, plus workflow-level information. + * Cleans up job configs and job contexts associated with this job, + * including all job-level configs and context, plus the job info in the workflow context. 
*/ private void cleanupJob(final String job, String workflow) { LOG.info("Cleaning up job: " + job + " in workflow: " + workflow); http://git-wip-us.apache.org/repos/asf/helix/blob/55b84465/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java b/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java new file mode 100644 index 0000000..7688017 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/tools/TaskAdmin.java @@ -0,0 +1,284 @@ +package org.apache.helix.tools; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import com.google.common.collect.Lists; +import java.io.File; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.OptionGroup; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.JobContext; +import org.apache.helix.task.TaskConfig; +import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.TaskPartitionState; +import org.apache.helix.task.TaskState; +import org.apache.helix.task.Workflow; +import org.apache.helix.task.WorkflowConfig; +import org.apache.helix.task.WorkflowContext; +import org.apache.log4j.Logger; + +/** + * CLI for operating workflows and jobs. + * This is a wrapper of TaskDriver instance to allow command line changes of workflows and jobs. + */ +public class TaskAdmin { + /** For logging */ + private static final Logger LOG = Logger.getLogger(TaskAdmin.class); + + /** Required option name for Helix endpoint */ + private static final String ZK_ADDRESS = "zk"; + + /** Required option name for cluster against which to run task */ + private static final String CLUSTER_NAME_OPTION = "cluster"; + + /** Required option name for task resource within target cluster */ + private static final String RESOURCE_OPTION = "resource"; + + /** Field for specifying a workflow file when starting a job */ + private static final String WORKFLOW_FILE_OPTION = "file"; + + /** + * Parses the first argument as a driver command and the rest of the + * arguments are parsed based on that command. 
Constructs a Helix + * message and posts it to the controller + */ + public static void main(String[] args) throws Exception { + String[] cmdArgs = Arrays.copyOfRange(args, 1, args.length); + CommandLine cl = parseOptions(cmdArgs, constructOptions(), args[0]); + String zkAddr = cl.getOptionValue(ZK_ADDRESS); + String clusterName = cl.getOptionValue(CLUSTER_NAME_OPTION); + String workflow = cl.getOptionValue(RESOURCE_OPTION); + + if (zkAddr == null || clusterName == null || workflow == null) { + printUsage(constructOptions(), "[cmd]"); + throw new IllegalArgumentException( + "zk, cluster, and resource must all be non-null for all commands"); + } + + HelixManager helixMgr = + HelixManagerFactory.getZKHelixManager(clusterName, "Admin", InstanceType.ADMINISTRATOR, + zkAddr); + helixMgr.connect(); + TaskDriver driver = new TaskDriver(helixMgr); + try { + TaskDriver.DriverCommand cmd = TaskDriver.DriverCommand.valueOf(args[0]); + switch (cmd) { + case start: + if (cl.hasOption(WORKFLOW_FILE_OPTION)) { + driver.start(Workflow.parse(new File(cl.getOptionValue(WORKFLOW_FILE_OPTION)))); + } else { + throw new IllegalArgumentException("Workflow file is required to start flow."); + } + break; + case stop: + driver.stop(workflow); + break; + case resume: + driver.resume(workflow); + break; + case delete: + driver.delete(workflow); + break; + case list: + list(driver, workflow); + break; + case flush: + driver.flushQueue(workflow); + break; + case clean: + driver.cleanupJobQueue(workflow); + break; + default: + throw new IllegalArgumentException("Unknown command " + args[0]); + } + } catch (IllegalArgumentException e) { + LOG.error("Unknown driver command " + args[0]); + throw e; + } + + helixMgr.disconnect(); + } + + private static void list(TaskDriver taskDriver, String workflow) { + WorkflowConfig wCfg = taskDriver.getWorkflowConfig(workflow); + if (wCfg == null) { + LOG.error("Workflow " + workflow + " does not exist!"); + return; + } + WorkflowContext wCtx = 
taskDriver.getWorkflowContext(workflow); + + LOG.info("Workflow " + workflow + " consists of the following tasks: " + wCfg.getJobDag() + .getAllNodes()); + String workflowState = + (wCtx != null) ? wCtx.getWorkflowState().name() : TaskState.NOT_STARTED.name(); + LOG.info("Current state of workflow is " + workflowState); + LOG.info("Job states are: "); + LOG.info("-------"); + for (String job : wCfg.getJobDag().getAllNodes()) { + TaskState jobState = (wCtx != null) ? wCtx.getJobState(job) : TaskState.NOT_STARTED; + LOG.info("Job " + job + " is " + jobState); + + // fetch job information + JobConfig jCfg = taskDriver.getJobConfig(job); + JobContext jCtx = taskDriver.getJobContext(job); + if (jCfg == null || jCtx == null) { + LOG.info("-------"); + continue; + } + + // calculate taskPartitions + List<Integer> partitions = Lists.newArrayList(jCtx.getPartitionSet()); + Collections.sort(partitions); + + // report status + for (Integer partition : partitions) { + String taskId = jCtx.getTaskIdForPartition(partition); + taskId = (taskId != null) ? taskId : jCtx.getTargetForPartition(partition); + LOG.info("Task: " + taskId); + TaskConfig taskConfig = jCfg.getTaskConfig(taskId); + if (taskConfig != null) { + LOG.info("Configuration: " + taskConfig.getConfigMap()); + } + TaskPartitionState state = jCtx.getPartitionState(partition); + state = (state != null) ? 
state : TaskPartitionState.INIT; + LOG.info("State: " + state); + String assignedParticipant = jCtx.getAssignedParticipant(partition); + if (assignedParticipant != null) { + LOG.info("Assigned participant: " + assignedParticipant); + } + LOG.info("-------"); + } + LOG.info("-------"); + } + } + + /** Constructs option group containing options required by all drivable jobs */ + @SuppressWarnings("static-access") + private static OptionGroup contructGenericRequiredOptionGroup() { + Option zkAddressOption = + OptionBuilder.isRequired().withLongOpt(ZK_ADDRESS) + .withDescription("ZK address managing cluster").create(); + zkAddressOption.setArgs(1); + zkAddressOption.setArgName("zkAddress"); + + Option clusterNameOption = + OptionBuilder.isRequired().withLongOpt(CLUSTER_NAME_OPTION).withDescription("Cluster name") + .create(); + clusterNameOption.setArgs(1); + clusterNameOption.setArgName("clusterName"); + + Option taskResourceOption = + OptionBuilder.isRequired().withLongOpt(RESOURCE_OPTION) + .withDescription("Workflow or job name").create(); + taskResourceOption.setArgs(1); + taskResourceOption.setArgName("resourceName"); + + OptionGroup group = new OptionGroup(); + group.addOption(zkAddressOption); + group.addOption(clusterNameOption); + group.addOption(taskResourceOption); + return group; + } + + /** Constructs options set for all basic control messages */ + private static Options constructOptions() { + Options options = new Options(); + options.addOptionGroup(contructGenericRequiredOptionGroup()); + options.addOptionGroup(constructStartOptionGroup()); + return options; + } + + /** Constructs option group containing options required by all drivable jobs */ + private static OptionGroup constructStartOptionGroup() { + @SuppressWarnings("static-access") + Option workflowFileOption = + OptionBuilder.withLongOpt(WORKFLOW_FILE_OPTION) + .withDescription("Local file describing workflow").create(); + workflowFileOption.setArgs(1); + 
workflowFileOption.setArgName("workflowFile"); + + OptionGroup group = new OptionGroup(); + group.addOption(workflowFileOption); + return group; + } + + /** Attempts to parse options for given command, printing usage under failure */ + private static CommandLine parseOptions(String[] args, Options options, String cmdStr) { + CommandLineParser cliParser = new GnuParser(); + CommandLine cmd = null; + + try { + cmd = cliParser.parse(options, args); + } catch (ParseException pe) { + LOG.error("CommandLineClient: failed to parse command-line options: " + pe.toString()); + printUsage(options, cmdStr); + System.exit(1); + } + boolean ret = checkOptionArgsNumber(cmd.getOptions()); + if (!ret) { + printUsage(options, cmdStr); + System.exit(1); + } + + return cmd; + } + + /** Ensures options argument counts are correct */ + private static boolean checkOptionArgsNumber(Option[] options) { + for (Option option : options) { + int argNb = option.getArgs(); + String[] args = option.getValues(); + if (argNb == 0) { + if (args != null && args.length > 0) { + System.err.println(option.getArgName() + " shall have " + argNb + " arguments (was " + + Arrays.toString(args) + ")"); + return false; + } + } else { + if (args == null || args.length != argNb) { + System.err.println(option.getArgName() + " shall have " + argNb + " arguments (was " + + Arrays.toString(args) + ")"); + return false; + } + } + } + return true; + } + + /** Displays CLI usage for given option set and command name */ + private static void printUsage(Options cliOptions, String cmd) { + HelpFormatter helpFormatter = new HelpFormatter(); + helpFormatter.setWidth(1000); + helpFormatter.printHelp("java " + TaskAdmin.class.getName() + " " + cmd, cliOptions); + } +}
