OOZIE-2108 bulk kill, suspend, resume jobs using existing filter, offset, len, and jobtype params (bzhang)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/a9b3c7bb Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/a9b3c7bb Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/a9b3c7bb Branch: refs/heads/master Commit: a9b3c7bb4bb88db71692d5a4f3d698796de30dba Parents: b4c6006 Author: Bowen Zhang <[email protected]> Authored: Mon Mar 16 10:53:25 2015 -0700 Committer: Bowen Zhang <[email protected]> Committed: Mon Mar 16 10:56:16 2015 -0700 ---------------------------------------------------------------------- .../java/org/apache/oozie/cli/OozieCLI.java | 72 +++++- .../org/apache/oozie/client/OozieClient.java | 43 +++ .../org/apache/oozie/client/rest/JsonTags.java | 2 + .../apache/oozie/client/rest/JsonToBean.java | 14 + .../java/org/apache/oozie/BundleEngine.java | 128 +++++---- .../org/apache/oozie/CoordinatorEngine.java | 71 +++++ .../main/java/org/apache/oozie/DagEngine.java | 67 +++++ .../main/java/org/apache/oozie/ErrorCode.java | 3 + .../org/apache/oozie/command/OperationType.java | 25 ++ .../command/bundle/BulkBundleXCommand.java | 142 ++++++++++ .../command/bundle/BundleKillXCommand.java | 2 +- .../oozie/command/coord/BulkCoordXCommand.java | 143 ++++++++++ .../oozie/command/wf/BulkWorkflowXCommand.java | 142 ++++++++++ .../oozie/service/AuthorizationService.java | 102 ++++++++ .../apache/oozie/servlet/BaseJobsServlet.java | 102 +++++++- .../org/apache/oozie/servlet/V0JobsServlet.java | 43 +++ .../org/apache/oozie/servlet/V1JobsServlet.java | 175 +++++++++++++ .../org/apache/oozie/util/JobsFilterUtils.java | 89 +++++++ .../org/apache/oozie/client/TestOozieCLI.java | 48 ++++ .../command/bundle/TestBulkBundleXCommand.java | 209 +++++++++++++++ .../command/coord/TestBulkCoordXCommand.java | 259 +++++++++++++++++++ .../command/wf/TestBulkWorkflowXCommand.java | 204 +++++++++++++++ .../servlet/MockCoordinatorEngineService.java | 22 ++ .../oozie/servlet/MockDagEngineService.java | 18 ++ 
docs/src/site/twiki/DG_CommandLineTool.twiki | 58 +++++ docs/src/site/twiki/WebServicesAPI.twiki | 120 +++++++++ release-log.txt | 1 + 27 files changed, 2247 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/client/src/main/java/org/apache/oozie/cli/OozieCLI.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java index 218edf2..5feb360 100644 --- a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java +++ b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java @@ -37,7 +37,11 @@ import org.apache.oozie.client.OozieClientException; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.client.WorkflowJob; import org.apache.oozie.client.XOozieClient; +import org.apache.oozie.client.rest.JsonTags; +import org.apache.oozie.client.rest.JsonToBean; import org.apache.oozie.client.rest.RestConstants; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; import org.w3c.dom.DOMException; import org.w3c.dom.Document; import org.w3c.dom.Element; @@ -438,6 +442,9 @@ public class OozieCLI { "startcreatedtime, endcreatedtime: time of format yyyy-MM-dd'T'HH:mm'Z')"); Option localtime = new Option(LOCAL_TIME_OPTION, false, "use local time (same as passing your time zone to -" + TIME_ZONE_OPTION + "). 
Overrides -" + TIME_ZONE_OPTION + " option"); + Option kill = new Option(KILL_OPTION, false, "bulk kill operation"); + Option suspend = new Option(SUSPEND_OPTION, false, "bulk suspend operation"); + Option resume = new Option(RESUME_OPTION, false, "bulk resume operation"); Option timezone = new Option(TIME_ZONE_OPTION, true, "use time zone with the specified ID (default GMT).\nSee 'oozie info -timezones' for a list"); Option verbose = new Option(VERBOSE_OPTION, false, "verbose mode"); @@ -452,6 +459,9 @@ public class OozieCLI { jobsOptions.addOption(oozie); jobsOptions.addOption(doAs); jobsOptions.addOption(localtime); + jobsOptions.addOption(kill); + jobsOptions.addOption(suspend); + jobsOptions.addOption(resume); jobsOptions.addOption(timezone); jobsOptions.addOption(start); jobsOptions.addOption(len); @@ -1507,6 +1517,11 @@ public class OozieCLI { private void jobsCommand(CommandLine commandLine) throws IOException, OozieCLIException { XOozieClient wc = createXOozieClient(commandLine); + List<String> options = new ArrayList<String>(); + for (Option option : commandLine.getOptions()) { + options.add(option.getOpt()); + } + String filter = commandLine.getOptionValue(FILTER_OPTION); String s = commandLine.getOptionValue(OFFSET_OPTION); int start = Integer.parseInt((s != null) ? 
s : "0"); @@ -1518,7 +1533,16 @@ public class OozieCLI { String bulkFilterString = commandLine.getOptionValue(BULK_OPTION); try { - if (bulkFilterString != null) { + if (options.contains(KILL_OPTION)) { + printBulkModifiedJobs(wc.killJobs(filter, jobtype, start, len), timeZoneId, "killed"); + } + else if (options.contains(SUSPEND_OPTION)) { + printBulkModifiedJobs(wc.suspendJobs(filter, jobtype, start, len), timeZoneId, "suspended"); + } + else if (options.contains(RESUME_OPTION)) { + printBulkModifiedJobs(wc.resumeJobs(filter, jobtype, start, len), timeZoneId, "resumed"); + } + else if (bulkFilterString != null) { printBulkJobs(wc.getBulkInfo(bulkFilterString, start, len), timeZoneId, commandLine.hasOption(VERBOSE_OPTION)); } else if (jobtype.toLowerCase().contains("wf")) { @@ -1538,6 +1562,52 @@ public class OozieCLI { } @VisibleForTesting + void printBulkModifiedJobs(JSONObject json, String timeZoneId, String action) throws IOException { + if (json.containsKey(JsonTags.WORKFLOWS_JOBS)) { + JSONArray workflows = (JSONArray) json.get(JsonTags.WORKFLOWS_JOBS); + if (workflows == null) { + workflows = new JSONArray(); + } + List<WorkflowJob> wfs = JsonToBean.createWorkflowJobList(workflows); + if (wfs.isEmpty()) { + System.out.println("bulk modify command did not modify any jobs"); + } + else { + System.out.println("the following jobs have been " + action); + printJobs(wfs, timeZoneId, false); + } + } + else if (json.containsKey(JsonTags.COORDINATOR_JOBS)) { + JSONArray coordinators = (JSONArray) json.get(JsonTags.COORDINATOR_JOBS); + if (coordinators == null) { + coordinators = new JSONArray(); + } + List<CoordinatorJob> coords = JsonToBean.createCoordinatorJobList(coordinators); + if (coords.isEmpty()) { + System.out.println("bulk modify command did not modify any jobs"); + } + else { + System.out.println("the following jobs have been " + action); + printCoordJobs(coords, timeZoneId, false); + } + } + else { + JSONArray bundles = (JSONArray) 
json.get(JsonTags.BUNDLE_JOBS); + if (bundles == null) { + bundles = new JSONArray(); + } + List<BundleJob> bundleJobs = JsonToBean.createBundleJobList(bundles); + if (bundleJobs.isEmpty()) { + System.out.println("bulk modify command did not modify any jobs"); + } + else { + System.out.println("the following jobs have been " + action); + printBundleJobs(bundleJobs, timeZoneId, false); + } + } + } + + @VisibleForTesting void printCoordJobs(List<CoordinatorJob> jobs, String timeZoneId, boolean verbose) throws IOException { if (jobs != null && jobs.size() > 0) { if (verbose) { http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/client/src/main/java/org/apache/oozie/client/OozieClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/client/OozieClient.java b/client/src/main/java/org/apache/oozie/client/OozieClient.java index 5de25cc..416d066 100644 --- a/client/src/main/java/org/apache/oozie/client/OozieClient.java +++ b/client/src/main/java/org/apache/oozie/client/OozieClient.java @@ -699,6 +699,30 @@ public class OozieClient { } } + private class JobsAction extends ClientCallable<JSONObject> { + + JobsAction(String action, String filter, String jobType, int start, int len) { + super("PUT", RestConstants.JOBS, "", + prepareParams(RestConstants.ACTION_PARAM, action, + RestConstants.JOB_FILTER_PARAM, filter, RestConstants.JOBTYPE_PARAM, jobType, + RestConstants.OFFSET_PARAM, Integer.toString(start), + RestConstants.LEN_PARAM, Integer.toString(len))); + } + + @Override + protected JSONObject call(HttpURLConnection conn) throws IOException, OozieClientException { + conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); + if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { + Reader reader = new InputStreamReader(conn.getInputStream()); + JSONObject json = (JSONObject) JSONValue.parse(reader); + return json; + } + else { + handleError(conn); + } + return null; + } 
+ } /** * Update coord definition. * @@ -849,6 +873,25 @@ public class OozieClient { return new CoordActionsKill(jobId, rangeType, scope).call(); } + public JSONObject bulkModifyJobs(String actionType, String filter, String jobType, int start, int len) + throws OozieClientException { + return new JobsAction(actionType, filter, jobType, start, len).call(); + } + + public JSONObject killJobs(String filter, String jobType, int start, int len) + throws OozieClientException { + return bulkModifyJobs("kill", filter, jobType, start, len); + } + + public JSONObject suspendJobs(String filter, String jobType, int start, int len) + throws OozieClientException { + return bulkModifyJobs("suspend", filter, jobType, start, len); + } + + public JSONObject resumeJobs(String filter, String jobType, int start, int len) + throws OozieClientException { + return bulkModifyJobs("resume", filter, jobType, start, len); + } /** * Change a coordinator job. * http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java b/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java index 1022dd7..3c2409f 100644 --- a/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java +++ b/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java @@ -40,6 +40,8 @@ public interface JsonTags { public static final String JOB_ID = "id"; + public static final String JOB_IDS = "ids"; + public static final String WORKFLOW_APP_PATH = "appPath"; public static final String WORKFLOW_APP_NAME = "appName"; public static final String WORKFLOW_ID = "id"; http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java ---------------------------------------------------------------------- diff --git 
a/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java b/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java index 90dbda9..4e01a3e 100644 --- a/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java +++ b/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java @@ -465,4 +465,18 @@ public class JsonToBean { return list; } + /** + * Creates a list of bulk write job ids from a JSON array. + * + * @param json json array. + * @return a list of jobs ids from a JSON array. + */ + public static List<String> createBulkWriteJobIdList(JSONArray json) { + List<String> list = new ArrayList<String>(); + for (Object obj : json) { + list.add(obj.toString()); + } + return list; + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/BundleEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/BundleEngine.java b/core/src/main/java/org/apache/oozie/BundleEngine.java index 659c8e6..e191184 100644 --- a/core/src/main/java/org/apache/oozie/BundleEngine.java +++ b/core/src/main/java/org/apache/oozie/BundleEngine.java @@ -42,6 +42,7 @@ import org.apache.oozie.command.CommandException; import org.apache.oozie.command.bundle.BundleSLAAlertsDisableXCommand; import org.apache.oozie.command.bundle.BundleSLAAlertsEnableXCommand; import org.apache.oozie.command.bundle.BundleSLAChangeXCommand; +import org.apache.oozie.command.bundle.BulkBundleXCommand; import org.apache.oozie.command.bundle.BundleJobChangeXCommand; import org.apache.oozie.command.bundle.BundleJobResumeXCommand; import org.apache.oozie.command.bundle.BundleJobSuspendXCommand; @@ -51,12 +52,14 @@ import org.apache.oozie.command.bundle.BundleKillXCommand; import org.apache.oozie.command.bundle.BundleRerunXCommand; import org.apache.oozie.command.bundle.BundleStartXCommand; import org.apache.oozie.command.bundle.BundleSubmitXCommand; +import 
org.apache.oozie.command.OperationType; import org.apache.oozie.executor.jpa.BundleJobQueryExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.service.DagXLogInfoService; import org.apache.oozie.service.Services; import org.apache.oozie.service.XLogStreamingService; import org.apache.oozie.util.DateUtils; +import org.apache.oozie.util.JobsFilterUtils; import org.apache.oozie.util.JobUtils; import org.apache.oozie.util.XLogFilter; import org.apache.oozie.util.XLogUserFilterParam; @@ -64,6 +67,9 @@ import org.apache.oozie.util.ParamChecker; import org.apache.oozie.util.XLog; import com.google.common.annotations.VisibleForTesting; +import sun.reflect.generics.tree.ReturnType; + +import javax.servlet.ServletException; public class BundleEngine extends BaseEngine { /** @@ -318,16 +324,6 @@ public class BundleEngine extends BaseEngine { } } - private static final Set<String> FILTER_NAMES = new HashSet<String>(); - - static { - FILTER_NAMES.add(OozieClient.FILTER_USER); - FILTER_NAMES.add(OozieClient.FILTER_NAME); - FILTER_NAMES.add(OozieClient.FILTER_GROUP); - FILTER_NAMES.add(OozieClient.FILTER_STATUS); - FILTER_NAMES.add(OozieClient.FILTER_ID); - } - /** * Get bundle jobs * @@ -357,48 +353,18 @@ public class BundleEngine extends BaseEngine { */ @VisibleForTesting Map<String, List<String>> parseFilter(String filter) throws BundleEngineException { - Map<String, List<String>> map = new HashMap<String, List<String>>(); - if (filter != null) { - StringTokenizer st = new StringTokenizer(filter, ";"); - while (st.hasMoreTokens()) { - String token = st.nextToken(); - if (token.contains("=")) { - String[] pair = token.split("="); - if (pair.length != 2) { - throw new BundleEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs"); - } - if (!FILTER_NAMES.contains(pair[0])) { - throw new BundleEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]", - pair[0])); - } - if (pair[0].equals("status")) { - 
try { - Job.Status.valueOf(pair[1]); - } - catch (IllegalArgumentException ex) { - throw new BundleEngineException(ErrorCode.E0420, filter, XLog.format( - "invalid status [{0}]", pair[1])); - } - } - List<String> list = map.get(pair[0]); - if (list == null) { - list = new ArrayList<String>(); - map.put(pair[0], list); - } - list.add(pair[1]); - } - else { - throw new BundleEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs"); - } - } + try { + return JobsFilterUtils.parseFilter(filter); + } + catch (ServletException ex) { + throw new BundleEngineException(ErrorCode.E0420, filter, ex.getMessage()); } - return map; } /** * Get bulk job response * - * @param filter the filter string + * @param bulkFilter the filter string * @param start start location for paging * @param len total length to get * @return bulk job info @@ -418,7 +384,7 @@ public class BundleEngine extends BaseEngine { * Parse filter string to a map with key = filter name and values = filter values * Allowed keys are defined as constants on top * - * @param filter the filter string + * @param bulkParams the filter string * @return filter key-value pair map * @throws BundleEngineException thrown if failed to parse filter string */ @@ -546,4 +512,72 @@ public class BundleEngine extends BaseEngine { } } + /** + * return a list of killed Bundle job + * + * @param filter, the filter string for which the bundle jobs are killed + * @param start, the starting index for bundle jobs + * @param len, maximum number of jobs to be killed + * @return the list of jobs being killed + * @throws BundleEngineException thrown if one or more of the jobs cannot be killed + */ + public BundleJobInfo killJobs(String filter, int start, int len) throws BundleEngineException { + try { + Map<String, List<String>> filterList = parseFilter(filter); + BundleJobInfo bundleJobInfo = new BulkBundleXCommand(filterList, start, len, OperationType.Kill).call(); + if (bundleJobInfo == null) { + return new 
BundleJobInfo(new ArrayList<BundleJobBean>(), 0, 0, 0); + } + return bundleJobInfo; + } + catch (CommandException ex) { + throw new BundleEngineException(ex); + } + } + + /** + * return a list of suspended Bundle job + * + * @param filter, the filter string for which the bundle jobs are suspended + * @param start, the starting index for bundle jobs + * @param len, maximum number of jobs to be suspended + * @return the list of jobs being suspended + * @throws BundleEngineException thrown if one or more of the jobs cannot be suspended + */ + public BundleJobInfo suspendJobs(String filter, int start, int len) throws BundleEngineException { + try { + Map<String, List<String>> filterList = parseFilter(filter); + BundleJobInfo bundleJobInfo = new BulkBundleXCommand(filterList, start, len, OperationType.Suspend).call(); + if (bundleJobInfo == null) { + return new BundleJobInfo(new ArrayList<BundleJobBean>(), 0, 0, 0); + } + return bundleJobInfo; + } + catch (CommandException ex) { + throw new BundleEngineException(ex); + } + } + + /** + * return a list of resumed Bundle job + * + * @param filter, the filter string for which the bundle jobs are resumed + * @param start, the starting index for bundle jobs + * @param len, maximum number of jobs to be resumed + * @return the list of jobs being resumed + * @throws BundleEngineException thrown if one or more of the jobs cannot be resumed + */ + public BundleJobInfo resumeJobs(String filter, int start, int len) throws BundleEngineException { + try { + Map<String, List<String>> filterList = parseFilter(filter); + BundleJobInfo bundleJobInfo = new BulkBundleXCommand(filterList, start, len, OperationType.Resume).call(); + if (bundleJobInfo == null) { + return new BundleJobInfo(new ArrayList<BundleJobBean>(), 0, 0, 0); + } + return bundleJobInfo; + } + catch (CommandException ex) { + throw new BundleEngineException(ex); + } + } } 
http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/CoordinatorEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java index 642a82a..1b21d87 100644 --- a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java +++ b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java @@ -28,6 +28,7 @@ import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.WorkflowJob; import org.apache.oozie.client.rest.RestConstants; import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.coord.BulkCoordXCommand; import org.apache.oozie.command.coord.CoordActionInfoXCommand; import org.apache.oozie.command.coord.CoordActionsIgnoreXCommand; import org.apache.oozie.command.coord.CoordActionsKillXCommand; @@ -43,6 +44,7 @@ import org.apache.oozie.command.coord.CoordSLAChangeXCommand; import org.apache.oozie.command.coord.CoordSubmitXCommand; import org.apache.oozie.command.coord.CoordSuspendXCommand; import org.apache.oozie.command.coord.CoordUpdateXCommand; +import org.apache.oozie.command.OperationType; import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; @@ -894,4 +896,73 @@ public class CoordinatorEngine extends BaseEngine { } } + /** + * return a list of killed Coordinator job + * + * @param filter, the filter string for which the coordinator jobs are killed + * @param start, the starting index for coordinator jobs + * @param length, maximum number of jobs to be killed + * @return the list of jobs being killed + * @throws CoordinatorEngineException thrown if one or more of the jobs cannot be killed + */ + public CoordinatorJobInfo killJobs(String filter, int start, int length) throws CoordinatorEngineException { 
+ try { + Map<String, List<String>> filterMap = parseJobsFilter(filter); + CoordinatorJobInfo coordinatorJobInfo = + new BulkCoordXCommand(filterMap, start, length, OperationType.Kill).call(); + if (coordinatorJobInfo == null) { + return new CoordinatorJobInfo(new ArrayList<CoordinatorJobBean>(), 0, 0, 0); + } + return coordinatorJobInfo; + } + catch (CommandException ex) { + throw new CoordinatorEngineException(ex); + } + } + + /** + * return the jobs that've been suspended + * @param filter Filter for jobs that will be suspended, can be name, user, group, status, id or combination of any + * @param start Offset for the jobs that will be suspended + * @param length maximum number of jobs that will be suspended + * @return + * @throws CoordinatorEngineException + */ + public CoordinatorJobInfo suspendJobs(String filter, int start, int length) throws CoordinatorEngineException { + try { + Map<String, List<String>> filterMap = parseJobsFilter(filter); + CoordinatorJobInfo coordinatorJobInfo = + new BulkCoordXCommand(filterMap, start, length, OperationType.Suspend).call(); + if (coordinatorJobInfo == null) { + return new CoordinatorJobInfo(new ArrayList<CoordinatorJobBean>(), 0, 0, 0); + } + return coordinatorJobInfo; + } + catch (CommandException ex) { + throw new CoordinatorEngineException(ex); + } + } + + /** + * return the jobs that've been resumed + * @param filter Filter for jobs that will be resumed, can be name, user, group, status, id or combination of any + * @param start Offset for the jobs that will be resumed + * @param length maximum number of jobs that will be resumed + * @return + * @throws CoordinatorEngineException + */ + public CoordinatorJobInfo resumeJobs(String filter, int start, int length) throws CoordinatorEngineException { + try { + Map<String, List<String>> filterMap = parseJobsFilter(filter); + CoordinatorJobInfo coordinatorJobInfo = + new BulkCoordXCommand(filterMap, start, length, OperationType.Resume).call(); + if (coordinatorJobInfo == 
null) { + return new CoordinatorJobInfo(new ArrayList<CoordinatorJobBean>(), 0, 0, 0); + } + return coordinatorJobInfo; + } + catch (CommandException ex) { + throw new CoordinatorEngineException(ex); + } + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/DagEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/DagEngine.java b/core/src/main/java/org/apache/oozie/DagEngine.java index ac2e7b1..faf2d64 100644 --- a/core/src/main/java/org/apache/oozie/DagEngine.java +++ b/core/src/main/java/org/apache/oozie/DagEngine.java @@ -42,6 +42,8 @@ import org.apache.oozie.command.wf.SubmitSqoopXCommand; import org.apache.oozie.command.wf.SubmitXCommand; import org.apache.oozie.command.wf.SuspendXCommand; import org.apache.oozie.command.wf.WorkflowActionInfoXCommand; +import org.apache.oozie.command.OperationType; +import org.apache.oozie.command.wf.BulkWorkflowXCommand; import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; @@ -601,4 +603,69 @@ public class DagEngine extends BaseEngine { throw new BaseEngineException(new XException(ErrorCode.E0301, "Not supported for workflow")); } + /** + * return the jobs that've been killed + * @param filter Jobs that satisfy the filter will be killed + * @param start start index in the database of jobs + * @param len maximum number of jobs that will be killed + * @return + * @throws DagEngineException + */ + public WorkflowsInfo killJobs(String filter, int start, int len) throws DagEngineException { + try { + Map<String, List<String>> filterList = parseFilter(filter); + WorkflowsInfo workflowsInfo = new BulkWorkflowXCommand(filterList, start, len, OperationType.Kill).call(); + if (workflowsInfo == null) { + return new WorkflowsInfo(new ArrayList<WorkflowJobBean>(), 0, 0, 
0); + } + return workflowsInfo; + } + catch (CommandException ex) { + throw new DagEngineException(ex); + } + } + + /** + * return the jobs that've been suspended + * @param filter Filter for jobs that will be suspended, can be name, user, group, status, id or combination of any + * @param start Offset for the jobs that will be suspended + * @param len maximum number of jobs that will be suspended + * @return + * @throws DagEngineException + */ + public WorkflowsInfo suspendJobs(String filter, int start, int len) throws DagEngineException { + try { + Map<String, List<String>> filterList = parseFilter(filter); + WorkflowsInfo workflowsInfo = new BulkWorkflowXCommand(filterList, start, len, OperationType.Suspend).call(); + if (workflowsInfo == null) { + return new WorkflowsInfo(new ArrayList<WorkflowJobBean>(), 0, 0, 0); + } + return workflowsInfo; + } + catch (CommandException ex) { + throw new DagEngineException(ex); + } + } + + /** + * return the jobs that've been resumed + * @param filter Filter for jobs that will be resumed, can be name, user, group, status, id or combination of any + * @param start Offset for the jobs that will be resumed + * @param len maximum number of jobs that will be resumed + * @return + * @throws DagEngineException + */ + public WorkflowsInfo resumeJobs(String filter, int start, int len) throws DagEngineException { + try { + Map<String, List<String>> filterList = parseFilter(filter); + WorkflowsInfo workflowsInfo = new BulkWorkflowXCommand(filterList, start, len, OperationType.Resume).call(); + if (workflowsInfo == null) { + return new WorkflowsInfo(new ArrayList<WorkflowJobBean>(), 0, 0, 0); + } + return workflowsInfo; + } + catch (CommandException ex) { + throw new DagEngineException(ex); + } + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/ErrorCode.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/oozie/ErrorCode.java b/core/src/main/java/org/apache/oozie/ErrorCode.java index 2fd2e99..8ee550a 100644 --- a/core/src/main/java/org/apache/oozie/ErrorCode.java +++ b/core/src/main/java/org/apache/oozie/ErrorCode.java @@ -219,6 +219,8 @@ public enum ErrorCode { E1101(XLog.STD, "SLA <{0}> cannot be empty."), + E1102(XLog.STD, "Invalid operation [{0}] for bulk command"), + E1201(XLog.STD, "State [{0}] is invalid for job [{1}]."), E1301(XLog.STD, "Could not read the bundle job definition, [{0}]"), @@ -243,6 +245,7 @@ public enum ErrorCode { E1320(XLog.STD, "Bundle Job change error, [{0}]"), E1321(XLog.STD, "Error evaluating coord name, [{0}]"), E1322(XLog.STD, "Bundle status transit error: [{0}]"), + E1323(XLog.STD, "Could not kill bundle job, this job either finished successfully or does not exist , [{0}]"), E1400(XLog.STD, "doAs (proxyuser) failure"), http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/command/OperationType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/OperationType.java b/core/src/main/java/org/apache/oozie/command/OperationType.java new file mode 100644 index 0000000..ec3d8f5 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/OperationType.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.command; + + +public enum OperationType { + Kill, + Suspend, + Resume +}; http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/command/bundle/BulkBundleXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BulkBundleXCommand.java b/core/src/main/java/org/apache/oozie/command/bundle/BulkBundleXCommand.java new file mode 100644 index 0000000..d405ccb --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/bundle/BulkBundleXCommand.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.oozie.command.bundle; + +import org.apache.oozie.BundleJobBean; +import org.apache.oozie.BundleJobInfo; +import org.apache.oozie.ErrorCode; +import org.apache.oozie.client.Job; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.OperationType; +import org.apache.oozie.command.PreconditionException; +import org.apache.oozie.command.XCommand; +import org.apache.oozie.executor.jpa.BundleJobInfoGetJPAExecutor; +import org.apache.oozie.service.JPAService; +import org.apache.oozie.service.Services; + +import java.util.List; +import java.util.Map; + +public class BulkBundleXCommand extends XCommand<BundleJobInfo> { + private Map<String, List<String>> filter; + private final int start; + private final int len; + private BundleJobInfo bundleJobInfo; + private OperationType operation; + + /** + * The constructor for BulkBundleXCommand + * + * @param filter the filter string + * @param start start location for paging + * @param length total length to get + * @param operation the type of operation to perform, it can be kill, suspend or resume + */ + public BulkBundleXCommand(Map<String, List<String>> filter, int start, int length, OperationType operation) { + super("bulkbundle" + operation, "bulkbundle" + operation, 1); + this.filter = filter; + this.start = start; + this.len = length; + this.operation = operation; + } + + /* (non-Javadoc) + * @see org.apache.oozie.command.XCommand#isLockRequired() + */ + @Override + protected boolean isLockRequired() { + return false; + } + + /* (non-Javadoc) + * @see org.apache.oozie.command.XCommand#getEntityKey() + */ + @Override + public String getEntityKey() { + return null; + } + + /* (non-Javadoc) + * @see org.apache.oozie.command.XCommand#loadState() + */ + @Override + protected void loadState() throws CommandException { + loadBundleJobs(); + } + + /* (non-Javadoc) + * @see org.apache.oozie.command.XCommand#verifyPrecondition() + */ + @Override + protected void 
verifyPrecondition() throws CommandException, PreconditionException { + } + + /* (non-Javadoc) + * @see org.apache.oozie.command.XCommand#execute() + */ + @Override + protected BundleJobInfo execute() throws CommandException { + List<BundleJobBean> jobs = this.bundleJobInfo.getBundleJobs(); + for (BundleJobBean job : jobs) { + switch (operation) { + case Kill: + if (job.getStatus() != Job.Status.SUCCEEDED + && job.getStatus() != Job.Status.FAILED + && job.getStatus() != Job.Status.DONEWITHERROR + && job.getStatus() != Job.Status.KILLED) { + new BundleKillXCommand(job.getId()).call(); + } + break; + case Suspend: + if (job.getStatus() != Job.Status.SUCCEEDED + && job.getStatus() != Job.Status.FAILED + && job.getStatus() != Job.Status.KILLED + && job.getStatus() != Job.Status.DONEWITHERROR) { + new BundleJobSuspendXCommand(job.getId()).call(); + } + break; + case Resume: + if (job.getStatus() == Job.Status.SUSPENDED + || job.getStatus() == Job.Status.SUSPENDEDWITHERROR + || job.getStatus() == Job.Status.PREPSUSPENDED) { + new BundleJobResumeXCommand(job.getId()).call(); + } + break; + default: + throw new CommandException(ErrorCode.E1102, operation); + } + } + loadBundleJobs(); + return this.bundleJobInfo; + } + + private void loadBundleJobs() throws CommandException { + try { + JPAService jpaService = Services.get().get(JPAService.class); + if (jpaService != null) { + this.bundleJobInfo = jpaService.execute(new BundleJobInfoGetJPAExecutor(filter, start, len)); + } + else { + throw new CommandException(ErrorCode.E0610); + } + } + catch (Exception ex) { + throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex); + } + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java 
b/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java index 3f22a18..1516ec3 100644 --- a/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java @@ -91,7 +91,7 @@ public class BundleKillXCommand extends KillTransitionXCommand { || bundleJob.getStatus() == Job.Status.KILLED) { LOG.info("Bundle job cannot be killed - job already SUCCEEDED, FAILED, KILLED or DONEWITHERROR, job id = " + jobId + ", status = " + bundleJob.getStatus()); - throw new PreconditionException(ErrorCode.E1020, jobId); + throw new PreconditionException(ErrorCode.E1323, jobId); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/command/coord/BulkCoordXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/BulkCoordXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/BulkCoordXCommand.java new file mode 100644 index 0000000..e0ccd0f --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/coord/BulkCoordXCommand.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.command.coord; + +import org.apache.oozie.CoordinatorJobBean; +import org.apache.oozie.CoordinatorJobInfo; +import org.apache.oozie.ErrorCode; +import org.apache.oozie.client.CoordinatorJob; +import org.apache.oozie.client.Job; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.OperationType; +import org.apache.oozie.command.PreconditionException; +import org.apache.oozie.command.XCommand; +import org.apache.oozie.executor.jpa.CoordJobInfoGetJPAExecutor; +import org.apache.oozie.service.JPAService; +import org.apache.oozie.service.Services; + +import java.util.List; +import java.util.Map; + +public class BulkCoordXCommand extends XCommand<CoordinatorJobInfo> { + private Map<String, List<String>> filter; + private final int start; + private final int len; + private CoordinatorJobInfo coordinatorJobInfo; + private OperationType operation; + + /** + * The constructor for BulkCoordXCommand + * + * @param filter the filter string + * @param start start location for paging + * @param length total length to get + */ + public BulkCoordXCommand(Map<String, List<String>> filter, int start, int length, OperationType operation) { + super("bulkcoord" + operation, "bulkcoord" + operation, 1); + this.filter = filter; + this.start = start; + this.len = length; + this.operation = operation; + } + + /* (non-Javadoc) + * @see org.apache.oozie.command.XCommand#isLockRequired() + */ + @Override + protected boolean isLockRequired() { + return false; + } + + /* (non-Javadoc) + * @see org.apache.oozie.command.XCommand#getEntityKey() + */ + @Override + public String getEntityKey() { + return null; + } + + /* (non-Javadoc) + * @see org.apache.oozie.command.XCommand#loadState() + */ + @Override + protected void loadState() throws CommandException { + loadJobs(); + } + + /* (non-Javadoc) + * @see 
org.apache.oozie.command.XCommand#verifyPrecondition() + */ + @Override + protected void verifyPrecondition() throws CommandException, PreconditionException { + } + + /* (non-Javadoc) + * @see org.apache.oozie.command.XCommand#execute() + */ + @Override + protected CoordinatorJobInfo execute() throws CommandException { + List<CoordinatorJobBean> jobs = this.coordinatorJobInfo.getCoordJobs(); + for (CoordinatorJobBean job : jobs) { + switch (operation) { + case Kill: + if (job.getStatus() != CoordinatorJob.Status.SUCCEEDED + && job.getStatus() != CoordinatorJob.Status.FAILED + && job.getStatus() != CoordinatorJob.Status.DONEWITHERROR + && job.getStatus() != CoordinatorJob.Status.KILLED + && job.getStatus() != CoordinatorJob.Status.IGNORED) { + new CoordKillXCommand(job.getId()).call(); + } + break; + case Suspend: + if (job.getStatus() != CoordinatorJob.Status.SUCCEEDED + && job.getStatus() != CoordinatorJob.Status.FAILED + && job.getStatus() != CoordinatorJob.Status.KILLED + && job.getStatus() != CoordinatorJob.Status.IGNORED) { + new CoordSuspendXCommand(job.getId()).call(); + } + break; + case Resume: + if (job.getStatus() == CoordinatorJob.Status.SUSPENDED || + job.getStatus() == CoordinatorJob.Status.SUSPENDEDWITHERROR || + job.getStatus() == Job.Status.PREPSUSPENDED) { + new CoordResumeXCommand(job.getId()).call(); + } + break; + default: + throw new CommandException(ErrorCode.E1102, operation); + } + } + loadJobs(); + return this.coordinatorJobInfo; + } + + private void loadJobs() throws CommandException { + try { + JPAService jpaService = Services.get().get(JPAService.class); + if (jpaService != null) { + this.coordinatorJobInfo = jpaService.execute(new CoordJobInfoGetJPAExecutor(filter, start, len)); + } + else { + throw new CommandException(ErrorCode.E0610); + } + } + catch (Exception ex) { + throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex); + } + } +} 
http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/command/wf/BulkWorkflowXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/BulkWorkflowXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/BulkWorkflowXCommand.java new file mode 100644 index 0000000..4e0b606 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/wf/BulkWorkflowXCommand.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.oozie.command.wf; + +import org.apache.oozie.ErrorCode; +import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.WorkflowsInfo; +import org.apache.oozie.client.WorkflowJob; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.OperationType; +import org.apache.oozie.command.PreconditionException; +import org.apache.oozie.executor.jpa.WorkflowsJobGetJPAExecutor; +import org.apache.oozie.service.JPAService; +import org.apache.oozie.service.Services; + +import java.util.List; +import java.util.Map; + +public class BulkWorkflowXCommand extends WorkflowXCommand<WorkflowsInfo> { + private final Map<String, List<String>> filter; + private final int start; + private final int len; + private WorkflowsInfo workflowsInfo; + private OperationType operation; + + /** + * constructor taking the filter information. + * + * @param filter Can be name, status, user, group and combination of these + * @param start index in the list of workflows matching the filter from which the operation (kill, suspend or resume) is applied + * @param length number of workflows to operate on from the list of workflows matching the filter and starting from + * index "start". 
+ */ + public BulkWorkflowXCommand(Map<String, List<String>> filter, int start, int length, OperationType operation) { + super("bulkkill", "bulkkill", 1, true); + this.filter = filter; + this.start = start; + this.len = length; + this.operation = operation; + } + + /* (non-Javadoc) + * @see org.apache.oozie.command.XCommand#execute() + */ + @Override + protected WorkflowsInfo execute() throws CommandException { + try { + List<WorkflowJobBean> workflows = this.workflowsInfo.getWorkflows(); + for (WorkflowJobBean job : workflows) { + switch (operation) { + case Kill: + if (job.getStatus() == WorkflowJob.Status.PREP + || job.getStatus() == WorkflowJob.Status.RUNNING + || job.getStatus() == WorkflowJob.Status.SUSPENDED + || job.getStatus() == WorkflowJob.Status.FAILED) { + new KillXCommand(job.getId()).call(); + } + break; + case Suspend: + if (job.getStatus() == WorkflowJob.Status.RUNNING) { + new SuspendXCommand(job.getId()).call(); + } + break; + case Resume: + if (job.getStatus() == WorkflowJob.Status.SUSPENDED) { + new ResumeXCommand(job.getId()).call(); + } + break; + default: + throw new CommandException(ErrorCode.E1102, operation); + } + } + loadJobs(); + return this.workflowsInfo; + } + catch (Exception ex) { + throw new CommandException(ErrorCode.E0725, ex.getMessage(), ex); + } + } + + /* (non-Javadoc) + * @see org.apache.oozie.command.XCommand#getEntityKey() + */ + @Override + public String getEntityKey() { + return null; + } + + /* (non-Javadoc) + * @see org.apache.oozie.command.XCommand#isLockRequired() + */ + @Override + protected boolean isLockRequired() { + return false; + } + + /* (non-Javadoc) + * @see org.apache.oozie.command.XCommand#loadState() + */ + @Override + protected void loadState() throws CommandException { + loadJobs(); + } + + /* (non-Javadoc) + * @see org.apache.oozie.command.XCommand#verifyPrecondition() + */ + @Override + protected void verifyPrecondition() throws CommandException, PreconditionException { + } + + private void 
loadJobs() throws CommandException { + try { + JPAService jpaService = Services.get().get(JPAService.class); + if (jpaService != null) { + this.workflowsInfo = jpaService.execute( + new WorkflowsJobGetJPAExecutor(this.filter, this.start, this.len)); + } + else { + throw new CommandException(ErrorCode.E0610); + } + } + catch (Exception ex) { + throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex); + } + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/service/AuthorizationService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/AuthorizationService.java b/core/src/main/java/org/apache/oozie/service/AuthorizationService.java index 9ce0640..cd2372e 100644 --- a/core/src/main/java/org/apache/oozie/service/AuthorizationService.java +++ b/core/src/main/java/org/apache/oozie/service/AuthorizationService.java @@ -26,6 +26,9 @@ import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.ArrayList; import java.util.Set; import org.apache.hadoop.conf.Configuration; @@ -37,6 +40,9 @@ import org.apache.oozie.ErrorCode; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.client.XOozieClient; import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor; +import org.apache.oozie.executor.jpa.CoordJobInfoGetJPAExecutor; +import org.apache.oozie.executor.jpa.BundleJobInfoGetJPAExecutor; +import org.apache.oozie.executor.jpa.WorkflowsJobGetJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; @@ -503,6 +509,102 @@ public class AuthorizationService implements Service { } /** + * Check if the user+group is authorized to operate on the specified jobs. 
<p/> Checks if the user is a super-user or + * the one who started the jobs. <p/> Read operations are allowed to all users. + * + * @param user user name. + * @param filter filter used to select jobs + * @param start starting index of the jobs in DB + * @param len maximum amount of jobs to select + * @param write indicates if the check is for read or write job tasks. + * @throws AuthorizationException thrown if the user is not authorized for the job. + */ + public void authorizeForJobs(String user, Map<String, List<String>> filter, String jobType, + int start, int len, boolean write) throws AuthorizationException { + if (authorizationEnabled && write && !isAdmin(user)) { + try { + // handle workflow jobs + if (jobType.equals("wf")) { + List<WorkflowJobBean> jobBeans = new ArrayList<WorkflowJobBean>(); + JPAService jpaService = Services.get().get(JPAService.class); + if (jpaService != null) { + try { + jobBeans = jpaService.execute(new WorkflowsJobGetJPAExecutor( + filter, start, len)).getWorkflows(); + } + catch (JPAExecutorException je) { + throw new AuthorizationException(je); + } + } + else { + throw new AuthorizationException(ErrorCode.E0610); + } + for (WorkflowJobBean jobBean : jobBeans) { + if (jobBean != null && !jobBean.getUser().equals(user)) { + if (!isUserInAcl(user, jobBean.getGroup())) { + incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); + throw new AuthorizationException(ErrorCode.E0508, user, jobBean.getId()); + } + } + } + } + // handle bundle jobs + else if (jobType.equals("bundle")) { + List<BundleJobBean> jobBeans = new ArrayList<BundleJobBean>(); + JPAService jpaService = Services.get().get(JPAService.class); + if (jpaService != null) { + try { + jobBeans = jpaService.execute(new BundleJobInfoGetJPAExecutor( + filter, start, len)).getBundleJobs(); + } + catch (JPAExecutorException je) { + throw new AuthorizationException(je); + } + } + else { + throw new AuthorizationException(ErrorCode.E0610); + } + for (BundleJobBean jobBean : jobBeans){ + if 
(jobBean != null && !jobBean.getUser().equals(user)) { + if (!isUserInAcl(user, jobBean.getGroup())) { + incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); + throw new AuthorizationException(ErrorCode.E0509, user, jobBean.getId()); + } + } + } + } + // handle coordinator jobs + else { + List<CoordinatorJobBean> jobBeans = new ArrayList<CoordinatorJobBean>(); + JPAService jpaService = Services.get().get(JPAService.class); + if (jpaService != null) { + try { + jobBeans = jpaService.execute(new CoordJobInfoGetJPAExecutor( + filter, start, len)).getCoordJobs(); + } + catch (JPAExecutorException je) { + throw new AuthorizationException(je); + } + } + else { + throw new AuthorizationException(ErrorCode.E0610); + } + for (CoordinatorJobBean jobBean : jobBeans) { + if (jobBean != null && !jobBean.getUser().equals(user)) { + if (!isUserInAcl(user, jobBean.getGroup())) { + incrCounter(INSTR_FAILED_AUTH_COUNTER, 1); + throw new AuthorizationException(ErrorCode.E0509, user, jobBean.getId()); + } + } + } + } + } + catch (IOException ex) { + throw new AuthorizationException(ErrorCode.E0501, ex.getMessage(), ex); + } + } + } + /** * Convenience method for instrumentation counters. * * @param name counter name. 
http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/servlet/BaseJobsServlet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/servlet/BaseJobsServlet.java b/core/src/main/java/org/apache/oozie/servlet/BaseJobsServlet.java index 01de277..54b173b 100644 --- a/core/src/main/java/org/apache/oozie/servlet/BaseJobsServlet.java +++ b/core/src/main/java/org/apache/oozie/servlet/BaseJobsServlet.java @@ -30,8 +30,10 @@ import org.apache.oozie.ErrorCode; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.rest.RestConstants; import org.apache.oozie.service.Services; -import org.apache.oozie.service.WorkflowAppService; +import org.apache.oozie.service.AuthorizationException; +import org.apache.oozie.service.AuthorizationService; import org.apache.oozie.util.JobUtils; +import org.apache.oozie.util.JobsFilterUtils; import org.apache.oozie.util.XConfiguration; import org.json.simple.JSONObject; @@ -41,20 +43,20 @@ public abstract class BaseJobsServlet extends JsonRestServlet { static { RESOURCES_INFO[0] = new JsonRestServlet.ResourceInfo("", Arrays.asList( - "POST", "GET"), Arrays.asList( + "POST", "GET", "PUT"), Arrays.asList( new JsonRestServlet.ParameterInfo(RestConstants.ACTION_PARAM, - String.class, false, Arrays.asList("POST")), + String.class, false, Arrays.asList("POST", "PUT")), new JsonRestServlet.ParameterInfo( RestConstants.JOBS_FILTER_PARAM, String.class, false, - Arrays.asList("GET")), + Arrays.asList("GET", "PUT")), new JsonRestServlet.ParameterInfo(RestConstants.JOBTYPE_PARAM, - String.class, false, Arrays.asList("GET", "POST")), + String.class, false, Arrays.asList("GET", "POST", "PUT")), new JsonRestServlet.ParameterInfo(RestConstants.OFFSET_PARAM, - String.class, false, Arrays.asList("GET")), + String.class, false, Arrays.asList("GET", "PUT")), new JsonRestServlet.ParameterInfo(RestConstants.LEN_PARAM, - String.class, 
false, Arrays.asList("GET")), + String.class, false, Arrays.asList("GET", "PUT")), new JsonRestServlet.ParameterInfo(RestConstants.JOBS_BULK_PARAM, - String.class, false, Arrays.asList("GET")), + String.class, false, Arrays.asList("GET", "PUT")), new JsonRestServlet.ParameterInfo( RestConstants.JOBS_EXTERNAL_ID_PARAM, String.class, false, Arrays.asList("GET")))); @@ -125,6 +127,90 @@ public abstract class BaseJobsServlet extends JsonRestServlet { } /** + * Perform various job related actions - suspend, resume, kill, etc. + */ + @Override + protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { + request.setAttribute(AUDIT_PARAM, request.getParameter(RestConstants.JOBS_FILTER_PARAM)); + request.setAttribute(AUDIT_OPERATION, request.getParameter(RestConstants.ACTION_PARAM)); + try { + AuthorizationService auth = Services.get().get(AuthorizationService.class); + String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM); + String startStr = request.getParameter(RestConstants.OFFSET_PARAM); + String lenStr = request.getParameter(RestConstants.LEN_PARAM); + String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM); + + int start = (startStr != null) ? Integer.parseInt(startStr) : 1; + start = (start < 1) ? 1 : start; + int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50; + len = (len < 1) ? 
50 : len; + auth.authorizeForJobs(getUser(request), JobsFilterUtils.parseFilter(filter), jobType, start, len, true); + } + catch (AuthorizationException ex) { + throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, ex); + } + + String action = request.getParameter(RestConstants.ACTION_PARAM); + JSONObject json = null; + if (action.equals(RestConstants.JOB_ACTION_KILL)) { + stopCron(); + json = killJobs(request, response); + startCron(); + } + else if (action.equals(RestConstants.JOB_ACTION_RESUME)) { + stopCron(); + json = resumeJobs(request, response); + startCron(); + } + else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) { + stopCron(); + json = suspendJobs(request, response); + startCron(); + } + else { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, + RestConstants.ACTION_PARAM, action); + } + response.setStatus(HttpServletResponse.SC_OK); + sendJsonResponse(response, HttpServletResponse.SC_OK, json); + } + + /** + * abstract method to kill jobs based on a filter param. The jobs could be workflow, coordinator or bundle jobs + * + * @param request + * @param response + * @return JSONObject of all jobs being killed + * @throws XServletException + * @throws IOException + */ + abstract JSONObject killJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException, + IOException; + + /** + * abstract method to suspend jobs based on a filter param. The jobs could be workflow, coordinator or bundle jobs + * + * @param request + * @param response + * @return JSONObject of all jobs being suspended + * @throws XServletException + * @throws IOException + */ + abstract JSONObject suspendJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException, + IOException; + + /** + * abstract method to resume jobs based on a filter param. 
The jobs could be workflow, coordinator or bundle jobs + * + * @param request + * @param response + * @return JSONObject of all jobs being resumed + * @throws XServletException + * @throws IOException + */ + abstract JSONObject resumeJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException, + IOException; + /** * abstract method to submit a job, either workflow or coordinator in the case of workflow job, there is an optional * flag in request to indicate if want this job to be started immediately or not * http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/servlet/V0JobsServlet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/servlet/V0JobsServlet.java b/core/src/main/java/org/apache/oozie/servlet/V0JobsServlet.java index da30bce..2c79ef0 100644 --- a/core/src/main/java/org/apache/oozie/servlet/V0JobsServlet.java +++ b/core/src/main/java/org/apache/oozie/servlet/V0JobsServlet.java @@ -121,4 +121,47 @@ public class V0JobsServlet extends BaseJobsServlet { return json; } + + + /** + * service implementation to bulk kill jobs + * @param request + * @param response + * @return + * @throws XServletException + * @throws IOException + */ + @Override + protected JSONObject killJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException, + IOException { + throw new UnsupportedOperationException("method not implemented in V0 API"); + } + + /** + * service implementation to bulk suspend jobs + * @param request + * @param response + * @return + * @throws XServletException + * @throws IOException + */ + @Override + protected JSONObject suspendJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException, + IOException { + throw new UnsupportedOperationException("method not implemented in V0 API"); + } + + /** + * service implementation to bulk resume jobs + * @param request + * 
@param response + * @return + * @throws XServletException + * @throws IOException + */ + @Override + protected JSONObject resumeJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException, + IOException { + throw new UnsupportedOperationException("method not implemented in V0 API"); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/servlet/V1JobsServlet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/servlet/V1JobsServlet.java b/core/src/main/java/org/apache/oozie/servlet/V1JobsServlet.java index 6c20ad7..80c8ec4 100644 --- a/core/src/main/java/org/apache/oozie/servlet/V1JobsServlet.java +++ b/core/src/main/java/org/apache/oozie/servlet/V1JobsServlet.java @@ -19,6 +19,7 @@ package org.apache.oozie.servlet; import java.io.IOException; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -53,6 +54,7 @@ import org.apache.oozie.service.BundleEngineService; import org.apache.oozie.service.Services; import org.apache.oozie.util.XLog; import org.apache.oozie.util.XmlUtils; +import org.json.simple.JSONArray; import org.json.simple.JSONObject; public class V1JobsServlet extends BaseJobsServlet { @@ -445,4 +447,177 @@ public class V1JobsServlet extends BaseJobsServlet { return json; } + + /** + * service implementation to bulk kill jobs + * @param request + * @param response + * @return + * @throws XServletException + * @throws IOException + */ + @Override + protected JSONObject killJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException, + IOException { + return bulkModifyJobs(request, response); + } + + /** + * service implementation to bulk suspend jobs + * @param request + * @param response + * @return + * @throws XServletException + * @throws IOException + */ + @Override + protected JSONObject suspendJobs(HttpServletRequest request, 
HttpServletResponse response) throws XServletException, + IOException { + return bulkModifyJobs(request, response); + } + + /** + * service implementation to bulk resume jobs + * @param request + * @param response + * @return + * @throws XServletException + * @throws IOException + */ + @Override + protected JSONObject resumeJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException, + IOException { + return bulkModifyJobs(request, response); + } + + private JSONObject bulkModifyJobs(HttpServletRequest request, HttpServletResponse response) throws XServletException, + IOException { + String action = request.getParameter(RestConstants.ACTION_PARAM); + String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM); + String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM); + String startStr = request.getParameter(RestConstants.OFFSET_PARAM); + String lenStr = request.getParameter(RestConstants.LEN_PARAM); + String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null + ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM); + + int start = (startStr != null) ? Integer.parseInt(startStr) : 1; + start = (start < 1) ? 1 : start; + int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50; + len = (len < 1) ? 
50 : len; + + JSONObject json = new JSONObject(); + List<String> ids = new ArrayList<String>(); + + if (jobType.equals("wf")) { + WorkflowsInfo jobs = null; + DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); + if (action.equals(RestConstants.JOB_ACTION_KILL)) { + try { + jobs = dagEngine.killJobs(filter, start, len); + } + catch (DagEngineException ex) { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); + } + } else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) { + try { + jobs = dagEngine.suspendJobs(filter, start, len); + } + catch (DagEngineException ex) { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); + } + } else if (action.equals(RestConstants.JOB_ACTION_RESUME)) { + try { + jobs = dagEngine.resumeJobs(filter, start, len); + } + catch (DagEngineException ex) { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); + } + } + + json.put(JsonTags.WORKFLOWS_JOBS, WorkflowJobBean.toJSONArray(jobs.getWorkflows(), timeZoneId)); + json.put(JsonTags.WORKFLOWS_TOTAL, jobs.getTotal()); + json.put(JsonTags.WORKFLOWS_OFFSET, jobs.getStart()); + json.put(JsonTags.WORKFLOWS_LEN, jobs.getLen()); + + } + else if (jobType.equals("bundle")) { + BundleJobInfo jobs = null; + BundleEngine bundleEngine = Services.get().get(BundleEngineService.class). 
+ getBundleEngine(getUser(request)); + if (action.equals(RestConstants.JOB_ACTION_KILL)) { + try { + jobs = bundleEngine.killJobs(filter, start, len); + } + catch (BundleEngineException ex) { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); + } + } + else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) { + try { + jobs = bundleEngine.suspendJobs(filter, start, len); + } + catch (BundleEngineException ex) { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); + } + } + else if (action.equals(RestConstants.JOB_ACTION_RESUME)) { + try { + jobs = bundleEngine.resumeJobs(filter, start, len); + } + catch (BundleEngineException ex) { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); + } + } + + json.put(JsonTags.BUNDLE_JOBS, BundleJobBean.toJSONArray(jobs.getBundleJobs(), timeZoneId)); + json.put(JsonTags.BUNDLE_JOB_TOTAL, jobs.getTotal()); + json.put(JsonTags.BUNDLE_JOB_OFFSET, jobs.getStart()); + json.put(JsonTags.BUNDLE_JOB_LEN, jobs.getLen()); + } + else { + CoordinatorJobInfo jobs = null; + CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class). 
+ getCoordinatorEngine(getUser(request)); + if (action.equals(RestConstants.JOB_ACTION_KILL)) { + try { + jobs = coordEngine.killJobs(filter, start, len); + } + catch (CoordinatorEngineException ex) { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); + } + } + else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) { + try { + jobs = coordEngine.suspendJobs(filter, start, len); + } + catch (CoordinatorEngineException ex) { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); + } + } + else if (action.equals(RestConstants.JOB_ACTION_RESUME)) { + try { + jobs = coordEngine.resumeJobs(filter, start, len); + } + catch (CoordinatorEngineException ex) { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); + } + } + + json.put(JsonTags.COORDINATOR_JOBS, CoordinatorJobBean.toJSONArray(jobs.getCoordJobs(), timeZoneId)); + json.put(JsonTags.COORD_JOB_TOTAL, jobs.getTotal()); + json.put(JsonTags.COORD_JOB_OFFSET, jobs.getStart()); + json.put(JsonTags.COORD_JOB_LEN, jobs.getLen()); + } + + json.put(JsonTags.JOB_IDS, toJSONArray(ids)); + return json; + } + + private static JSONArray toJSONArray(List<String> ids) { + JSONArray array = new JSONArray(); + for (String id : ids) { + array.add(id); + } + return array; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/util/JobsFilterUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/JobsFilterUtils.java b/core/src/main/java/org/apache/oozie/util/JobsFilterUtils.java new file mode 100644 index 0000000..98ac7b3 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/util/JobsFilterUtils.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.util; + +import org.apache.oozie.ErrorCode; +import org.apache.oozie.client.OozieClient; +import org.apache.oozie.client.Job; +import org.apache.oozie.servlet.XServletException; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.StringTokenizer; + +public class JobsFilterUtils { + + private static final Set<String> FILTER_NAMES = new HashSet<String>(); + + static { + FILTER_NAMES.add(OozieClient.FILTER_USER); + FILTER_NAMES.add(OozieClient.FILTER_NAME); + FILTER_NAMES.add(OozieClient.FILTER_GROUP); + FILTER_NAMES.add(OozieClient.FILTER_STATUS); + FILTER_NAMES.add(OozieClient.FILTER_ID); + } + + public static Map<String, List<String>> parseFilter(String filter) throws ServletException{ + Map<String, List<String>> map = new HashMap<String, List<String>>(); + if (filter != null) { + StringTokenizer st = new StringTokenizer(filter, ";"); + while (st.hasMoreTokens()) { + String token = st.nextToken(); + if (token.contains("=")) { + String[] pair = token.split("="); + if (pair.length != 2) { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0420, + "filter elements must be name=value pairs"); + } + if 
(!FILTER_NAMES.contains(pair[0])) { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0420, + "filter name: " + pair[0] + " is invalid"); + } + + if (pair[0].equals("status")) { + try { + Job.Status.valueOf(pair[1]); + } + catch (IllegalArgumentException ex) { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0420, + XLog.format("invalid status [{0}]", pair[1])); + } + } + List<String> list = map.get(pair[0]); + if (list == null) { + list = new ArrayList<String>(); + map.put(pair[0], list); + } + list.add(pair[1]); + } + else { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0420, + "filter elements must be name=value pairs"); + } + } + } + return map; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java b/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java index f81c1ad..e55d5f6 100644 --- a/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java +++ b/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java @@ -445,6 +445,54 @@ public class TestOozieCLI extends DagServletTestCase { }); } + public void testBulkSuspendResumeKill1() throws Exception { + runTest(END_POINTS, SERVLET_CLASSES, IS_SECURITY_ENABLED, new Callable<Void>() { + @Override + public Void call() throws Exception { + String oozieUrl = getContextURL(); + String[] args = new String[]{"jobs", "-oozie", oozieUrl, "-suspend", "-filter", + "name=workflow-1"}; + assertEquals(0, new OozieCLI().run(args)); + assertEquals(RestConstants.JOBS, MockDagEngineService.did); + + args = new String[]{"jobs", "-oozie", oozieUrl, "-resume", "-filter", + "name=workflow-1"}; + assertEquals(0, new OozieCLI().run(args)); + assertEquals(RestConstants.JOBS, MockDagEngineService.did); + + args = new 
String[]{"jobs", "-oozie", oozieUrl, "-kill", "-filter", + "name=workflow-1"}; + assertEquals(0, new OozieCLI().run(args)); + assertEquals(RestConstants.JOBS, MockDagEngineService.did); + return null; + } + }); + } + + public void testBulkSuspendResumeKill2() throws Exception { + runTest(END_POINTS, SERVLET_CLASSES, IS_SECURITY_ENABLED, new Callable<Void>() { + @Override + public Void call() throws Exception { + String oozieUrl = getContextURL(); + String[] args = new String[]{"jobs", "-oozie", oozieUrl, "-suspend", "-filter", + "name=coordinator", "-jobtype", "coordinator"}; + assertEquals(0, new OozieCLI().run(args)); + assertEquals(RestConstants.JOBS, MockCoordinatorEngineService.did); + + args = new String[]{"jobs", "-oozie", oozieUrl, "-resume", "-filter", + "name=coordinator", "-jobtype", "coordinator"}; + assertEquals(0, new OozieCLI().run(args)); + assertEquals(RestConstants.JOBS, MockCoordinatorEngineService.did); + + args = new String[]{"jobs", "-oozie", oozieUrl, "-kill", "-filter", + "name=coordinator", "-jobtype", "coordinator"}; + assertEquals(0, new OozieCLI().run(args)); + assertEquals(RestConstants.JOBS, MockCoordinatorEngineService.did); + return null; + } + }); + } + /** * Test the working of coord action kill from Client with action numbers *
