OOZIE-2108 bulk kill, suspend, resume jobs using existing filter, offset, len, 
and jobtype params (bzhang)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/a9b3c7bb
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/a9b3c7bb
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/a9b3c7bb

Branch: refs/heads/master
Commit: a9b3c7bb4bb88db71692d5a4f3d698796de30dba
Parents: b4c6006
Author: Bowen Zhang <[email protected]>
Authored: Mon Mar 16 10:53:25 2015 -0700
Committer: Bowen Zhang <[email protected]>
Committed: Mon Mar 16 10:56:16 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/oozie/cli/OozieCLI.java     |  72 +++++-
 .../org/apache/oozie/client/OozieClient.java    |  43 +++
 .../org/apache/oozie/client/rest/JsonTags.java  |   2 +
 .../apache/oozie/client/rest/JsonToBean.java    |  14 +
 .../java/org/apache/oozie/BundleEngine.java     | 128 +++++----
 .../org/apache/oozie/CoordinatorEngine.java     |  71 +++++
 .../main/java/org/apache/oozie/DagEngine.java   |  67 +++++
 .../main/java/org/apache/oozie/ErrorCode.java   |   3 +
 .../org/apache/oozie/command/OperationType.java |  25 ++
 .../command/bundle/BulkBundleXCommand.java      | 142 ++++++++++
 .../command/bundle/BundleKillXCommand.java      |   2 +-
 .../oozie/command/coord/BulkCoordXCommand.java  | 143 ++++++++++
 .../oozie/command/wf/BulkWorkflowXCommand.java  | 142 ++++++++++
 .../oozie/service/AuthorizationService.java     | 102 ++++++++
 .../apache/oozie/servlet/BaseJobsServlet.java   | 102 +++++++-
 .../org/apache/oozie/servlet/V0JobsServlet.java |  43 +++
 .../org/apache/oozie/servlet/V1JobsServlet.java | 175 +++++++++++++
 .../org/apache/oozie/util/JobsFilterUtils.java  |  89 +++++++
 .../org/apache/oozie/client/TestOozieCLI.java   |  48 ++++
 .../command/bundle/TestBulkBundleXCommand.java  | 209 +++++++++++++++
 .../command/coord/TestBulkCoordXCommand.java    | 259 +++++++++++++++++++
 .../command/wf/TestBulkWorkflowXCommand.java    | 204 +++++++++++++++
 .../servlet/MockCoordinatorEngineService.java   |  22 ++
 .../oozie/servlet/MockDagEngineService.java     |  18 ++
 docs/src/site/twiki/DG_CommandLineTool.twiki    |  58 +++++
 docs/src/site/twiki/WebServicesAPI.twiki        | 120 +++++++++
 release-log.txt                                 |   1 +
 27 files changed, 2247 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java 
b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
index 218edf2..5feb360 100644
--- a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
+++ b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
@@ -37,7 +37,11 @@ import org.apache.oozie.client.OozieClientException;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.XOozieClient;
+import org.apache.oozie.client.rest.JsonTags;
+import org.apache.oozie.client.rest.JsonToBean;
 import org.apache.oozie.client.rest.RestConstants;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
 import org.w3c.dom.DOMException;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
@@ -438,6 +442,9 @@ public class OozieCLI {
                         "startcreatedtime, endcreatedtime: time of format 
yyyy-MM-dd'T'HH:mm'Z')");
         Option localtime = new Option(LOCAL_TIME_OPTION, false, "use local 
time (same as passing your time zone to -" +
                 TIME_ZONE_OPTION + "). Overrides -" + TIME_ZONE_OPTION + " 
option");
+        Option kill = new Option(KILL_OPTION, false, "bulk kill operation");
+        Option suspend = new Option(SUSPEND_OPTION, false, "bulk suspend 
operation");
+        Option resume = new Option(RESUME_OPTION, false, "bulk resume 
operation");
         Option timezone = new Option(TIME_ZONE_OPTION, true,
                 "use time zone with the specified ID (default GMT).\nSee 
'oozie info -timezones' for a list");
         Option verbose = new Option(VERBOSE_OPTION, false, "verbose mode");
@@ -452,6 +459,9 @@ public class OozieCLI {
         jobsOptions.addOption(oozie);
         jobsOptions.addOption(doAs);
         jobsOptions.addOption(localtime);
+        jobsOptions.addOption(kill);
+        jobsOptions.addOption(suspend);
+        jobsOptions.addOption(resume);
         jobsOptions.addOption(timezone);
         jobsOptions.addOption(start);
         jobsOptions.addOption(len);
@@ -1507,6 +1517,11 @@ public class OozieCLI {
     private void jobsCommand(CommandLine commandLine) throws IOException, 
OozieCLIException {
         XOozieClient wc = createXOozieClient(commandLine);
 
+        List<String> options = new ArrayList<String>();
+        for (Option option : commandLine.getOptions()) {
+            options.add(option.getOpt());
+        }
+
         String filter = commandLine.getOptionValue(FILTER_OPTION);
         String s = commandLine.getOptionValue(OFFSET_OPTION);
         int start = Integer.parseInt((s != null) ? s : "0");
@@ -1518,7 +1533,16 @@ public class OozieCLI {
         String bulkFilterString = commandLine.getOptionValue(BULK_OPTION);
 
         try {
-            if (bulkFilterString != null) {
+            if (options.contains(KILL_OPTION)) {
+                printBulkModifiedJobs(wc.killJobs(filter, jobtype, start, 
len), timeZoneId, "killed");
+            }
+            else if (options.contains(SUSPEND_OPTION)) {
+                printBulkModifiedJobs(wc.suspendJobs(filter, jobtype, start, 
len), timeZoneId, "suspended");
+            }
+            else if (options.contains(RESUME_OPTION)) {
+                printBulkModifiedJobs(wc.resumeJobs(filter, jobtype, start, 
len), timeZoneId, "resumed");
+            }
+            else if (bulkFilterString != null) {
                 printBulkJobs(wc.getBulkInfo(bulkFilterString, start, len), 
timeZoneId, commandLine.hasOption(VERBOSE_OPTION));
             }
             else if (jobtype.toLowerCase().contains("wf")) {
@@ -1538,6 +1562,52 @@ public class OozieCLI {
     }
 
     @VisibleForTesting
+    void printBulkModifiedJobs(JSONObject json, String timeZoneId, String 
action) throws IOException {
+        if (json.containsKey(JsonTags.WORKFLOWS_JOBS)) {
+            JSONArray workflows = (JSONArray) 
json.get(JsonTags.WORKFLOWS_JOBS);
+            if (workflows == null) {
+                workflows = new JSONArray();
+            }
+            List<WorkflowJob> wfs = 
JsonToBean.createWorkflowJobList(workflows);
+            if (wfs.isEmpty()) {
+                System.out.println("bulk modify command did not modify any 
jobs");
+            }
+            else {
+                System.out.println("the following jobs have been " + action);
+                printJobs(wfs, timeZoneId, false);
+            }
+        }
+        else if (json.containsKey(JsonTags.COORDINATOR_JOBS)) {
+            JSONArray coordinators = (JSONArray) 
json.get(JsonTags.COORDINATOR_JOBS);
+            if (coordinators == null) {
+                coordinators = new JSONArray();
+            }
+            List<CoordinatorJob> coords = 
JsonToBean.createCoordinatorJobList(coordinators);
+            if (coords.isEmpty()) {
+                System.out.println("bulk modify command did not modify any 
jobs");
+            }
+            else {
+                System.out.println("the following jobs have been " + action);
+                printCoordJobs(coords, timeZoneId, false);
+            }
+        }
+        else {
+            JSONArray bundles = (JSONArray) json.get(JsonTags.BUNDLE_JOBS);
+            if (bundles == null) {
+                bundles = new JSONArray();
+            }
+            List<BundleJob> bundleJobs = 
JsonToBean.createBundleJobList(bundles);
+            if (bundleJobs.isEmpty()) {
+                System.out.println("bulk modify command did not modify any 
jobs");
+            }
+            else {
+                System.out.println("the following jobs have been " + action);
+                printBundleJobs(bundleJobs, timeZoneId, false);
+            }
+        }
+    }
+
+    @VisibleForTesting
     void printCoordJobs(List<CoordinatorJob> jobs, String timeZoneId, boolean 
verbose) throws IOException {
         if (jobs != null && jobs.size() > 0) {
             if (verbose) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/client/src/main/java/org/apache/oozie/client/OozieClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/client/OozieClient.java 
b/client/src/main/java/org/apache/oozie/client/OozieClient.java
index 5de25cc..416d066 100644
--- a/client/src/main/java/org/apache/oozie/client/OozieClient.java
+++ b/client/src/main/java/org/apache/oozie/client/OozieClient.java
@@ -699,6 +699,30 @@ public class OozieClient {
         }
     }
 
+    private class JobsAction extends ClientCallable<JSONObject> {
+
+        JobsAction(String action, String filter, String jobType, int start, 
int len) {
+            super("PUT", RestConstants.JOBS, "",
+                    prepareParams(RestConstants.ACTION_PARAM, action,
+                            RestConstants.JOB_FILTER_PARAM, filter, 
RestConstants.JOBTYPE_PARAM, jobType,
+                            RestConstants.OFFSET_PARAM, 
Integer.toString(start),
+                            RestConstants.LEN_PARAM, Integer.toString(len)));
+        }
+
+        @Override
+        protected JSONObject call(HttpURLConnection conn) throws IOException, 
OozieClientException {
+            conn.setRequestProperty("content-type", 
RestConstants.XML_CONTENT_TYPE);
+            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
+                Reader reader = new InputStreamReader(conn.getInputStream());
+                JSONObject json = (JSONObject) JSONValue.parse(reader);
+                return json;
+            }
+            else {
+                handleError(conn);
+            }
+            return null;
+        }
+    }
     /**
      * Update coord definition.
      *
@@ -849,6 +873,25 @@ public class OozieClient {
         return new CoordActionsKill(jobId, rangeType, scope).call();
     }
 
+    public JSONObject bulkModifyJobs(String actionType, String filter, String 
jobType, int start, int len)
+            throws OozieClientException {
+        return new JobsAction(actionType, filter, jobType, start, len).call();
+    }
+
+    public JSONObject killJobs(String filter, String jobType, int start, int 
len)
+            throws OozieClientException {
+        return bulkModifyJobs("kill", filter, jobType, start, len);
+    }
+
+    public JSONObject suspendJobs(String filter, String jobType, int start, 
int len)
+            throws OozieClientException {
+        return bulkModifyJobs("suspend", filter, jobType, start, len);
+    }
+
+    public JSONObject resumeJobs(String filter, String jobType, int start, int 
len)
+            throws OozieClientException {
+        return bulkModifyJobs("resume", filter, jobType, start, len);
+    }
     /**
      * Change a coordinator job.
      *

http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java 
b/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
index 1022dd7..3c2409f 100644
--- a/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
+++ b/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
@@ -40,6 +40,8 @@ public interface JsonTags {
 
     public static final String JOB_ID = "id";
 
+    public static final String JOB_IDS = "ids";
+
     public static final String WORKFLOW_APP_PATH = "appPath";
     public static final String WORKFLOW_APP_NAME = "appName";
     public static final String WORKFLOW_ID = "id";

http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java 
b/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java
index 90dbda9..4e01a3e 100644
--- a/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java
+++ b/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java
@@ -465,4 +465,18 @@ public class JsonToBean {
         return list;
     }
 
+    /**
+     * Creates a list of bulk write job ids from a JSON array.
+     *
+     * @param json json array.
+     * @return a list of jobs ids from a JSON array.
+     */
+    public static List<String> createBulkWriteJobIdList(JSONArray json) {
+        List<String> list = new ArrayList<String>();
+        for (Object obj : json) {
+            list.add(obj.toString());
+        }
+        return list;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/BundleEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/BundleEngine.java 
b/core/src/main/java/org/apache/oozie/BundleEngine.java
index 659c8e6..e191184 100644
--- a/core/src/main/java/org/apache/oozie/BundleEngine.java
+++ b/core/src/main/java/org/apache/oozie/BundleEngine.java
@@ -42,6 +42,7 @@ import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.bundle.BundleSLAAlertsDisableXCommand;
 import org.apache.oozie.command.bundle.BundleSLAAlertsEnableXCommand;
 import org.apache.oozie.command.bundle.BundleSLAChangeXCommand;
+import org.apache.oozie.command.bundle.BulkBundleXCommand;
 import org.apache.oozie.command.bundle.BundleJobChangeXCommand;
 import org.apache.oozie.command.bundle.BundleJobResumeXCommand;
 import org.apache.oozie.command.bundle.BundleJobSuspendXCommand;
@@ -51,12 +52,14 @@ import org.apache.oozie.command.bundle.BundleKillXCommand;
 import org.apache.oozie.command.bundle.BundleRerunXCommand;
 import org.apache.oozie.command.bundle.BundleStartXCommand;
 import org.apache.oozie.command.bundle.BundleSubmitXCommand;
+import org.apache.oozie.command.OperationType;
 import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.DagXLogInfoService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.XLogStreamingService;
 import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.JobsFilterUtils;
 import org.apache.oozie.util.JobUtils;
 import org.apache.oozie.util.XLogFilter;
 import org.apache.oozie.util.XLogUserFilterParam;
@@ -64,6 +67,9 @@ import org.apache.oozie.util.ParamChecker;
 import org.apache.oozie.util.XLog;
 
 import com.google.common.annotations.VisibleForTesting;
+import sun.reflect.generics.tree.ReturnType;
+
+import javax.servlet.ServletException;
 
 public class BundleEngine extends BaseEngine {
     /**
@@ -318,16 +324,6 @@ public class BundleEngine extends BaseEngine {
         }
     }
 
-    private static final Set<String> FILTER_NAMES = new HashSet<String>();
-
-    static {
-        FILTER_NAMES.add(OozieClient.FILTER_USER);
-        FILTER_NAMES.add(OozieClient.FILTER_NAME);
-        FILTER_NAMES.add(OozieClient.FILTER_GROUP);
-        FILTER_NAMES.add(OozieClient.FILTER_STATUS);
-        FILTER_NAMES.add(OozieClient.FILTER_ID);
-    }
-
     /**
      * Get bundle jobs
      *
@@ -357,48 +353,18 @@ public class BundleEngine extends BaseEngine {
      */
     @VisibleForTesting
     Map<String, List<String>> parseFilter(String filter) throws 
BundleEngineException {
-        Map<String, List<String>> map = new HashMap<String, List<String>>();
-        if (filter != null) {
-            StringTokenizer st = new StringTokenizer(filter, ";");
-            while (st.hasMoreTokens()) {
-                String token = st.nextToken();
-                if (token.contains("=")) {
-                    String[] pair = token.split("=");
-                    if (pair.length != 2) {
-                        throw new BundleEngineException(ErrorCode.E0420, 
filter, "elements must be name=value pairs");
-                    }
-                    if (!FILTER_NAMES.contains(pair[0])) {
-                        throw new BundleEngineException(ErrorCode.E0420, 
filter, XLog.format("invalid name [{0}]",
-                                pair[0]));
-                    }
-                    if (pair[0].equals("status")) {
-                        try {
-                            Job.Status.valueOf(pair[1]);
-                        }
-                        catch (IllegalArgumentException ex) {
-                            throw new BundleEngineException(ErrorCode.E0420, 
filter, XLog.format(
-                                    "invalid status [{0}]", pair[1]));
-                        }
-                    }
-                    List<String> list = map.get(pair[0]);
-                    if (list == null) {
-                        list = new ArrayList<String>();
-                        map.put(pair[0], list);
-                    }
-                    list.add(pair[1]);
-                }
-                else {
-                    throw new BundleEngineException(ErrorCode.E0420, filter, 
"elements must be name=value pairs");
-                }
-            }
+        try {
+            return JobsFilterUtils.parseFilter(filter);
+        }
+        catch (ServletException ex) {
+            throw new BundleEngineException(ErrorCode.E0420, filter, 
ex.getMessage());
         }
-        return map;
     }
 
     /**
      * Get bulk job response
      *
-     * @param filter the filter string
+     * @param bulkFilter the filter string
      * @param start start location for paging
      * @param len total length to get
      * @return bulk job info
@@ -418,7 +384,7 @@ public class BundleEngine extends BaseEngine {
      * Parse filter string to a map with key = filter name and values = filter 
values
      * Allowed keys are defined as constants on top
      *
-     * @param filter the filter string
+     * @param bulkParams the filter string
      * @return filter key-value pair map
      * @throws BundleEngineException thrown if failed to parse filter string
      */
@@ -546,4 +512,72 @@ public class BundleEngine extends BaseEngine {
         }
     }
 
+    /**
+     * return a list of killed Bundle job
+     *
+     * @param filter, the filter string for which the bundle jobs are killed
+     * @param start, the starting index for bundle jobs
+     * @param len, maximum number of jobs to be killed
+     * @return the list of jobs being killed
+     * @throws BundleEngineException thrown if one or more of the jobs cannot 
be killed
+     */
+    public BundleJobInfo killJobs(String filter, int start, int len) throws 
BundleEngineException {
+        try {
+            Map<String, List<String>> filterList = parseFilter(filter);
+            BundleJobInfo bundleJobInfo = new BulkBundleXCommand(filterList, 
start, len, OperationType.Kill).call();
+            if (bundleJobInfo == null) {
+                return new BundleJobInfo(new ArrayList<BundleJobBean>(), 0, 0, 
0);
+            }
+            return bundleJobInfo;
+        }
+        catch (CommandException ex) {
+            throw new BundleEngineException(ex);
+        }
+    }
+
+    /**
+     * return a list of suspended Bundle job
+     *
+     * @param filter, the filter string for which the bundle jobs are suspended
+     * @param start, the starting index for bundle jobs
+     * @param len, maximum number of jobs to be suspended
+     * @return the list of jobs being suspended
+     * @throws BundleEngineException thrown if one or more of the jobs cannot 
be suspended
+     */
+    public BundleJobInfo suspendJobs(String filter, int start, int len) throws 
BundleEngineException {
+        try {
+            Map<String, List<String>> filterList = parseFilter(filter);
+            BundleJobInfo bundleJobInfo = new BulkBundleXCommand(filterList, 
start, len, OperationType.Suspend).call();
+            if (bundleJobInfo == null) {
+                return new BundleJobInfo(new ArrayList<BundleJobBean>(), 0, 0, 
0);
+            }
+            return bundleJobInfo;
+        }
+        catch (CommandException ex) {
+            throw new BundleEngineException(ex);
+        }
+    }
+
+    /**
+     * return a list of resumed Bundle job
+     *
+     * @param filter, the filter string for which the bundle jobs are resumed
+     * @param start, the starting index for bundle jobs
+     * @param len, maximum number of jobs to be resumed
+     * @return the list of jobs being resumed
+     * @throws BundleEngineException thrown if one or more of the jobs cannot 
be resumed
+     */
+    public BundleJobInfo resumeJobs(String filter, int start, int len) throws 
BundleEngineException {
+        try {
+            Map<String, List<String>> filterList = parseFilter(filter);
+            BundleJobInfo bundleJobInfo = new BulkBundleXCommand(filterList, 
start, len, OperationType.Resume).call();
+            if (bundleJobInfo == null) {
+                return new BundleJobInfo(new ArrayList<BundleJobBean>(), 0, 0, 
0);
+            }
+            return bundleJobInfo;
+        }
+        catch (CommandException ex) {
+            throw new BundleEngineException(ex);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java 
b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
index 642a82a..1b21d87 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
@@ -28,6 +28,7 @@ import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.rest.RestConstants;
 import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.coord.BulkCoordXCommand;
 import org.apache.oozie.command.coord.CoordActionInfoXCommand;
 import org.apache.oozie.command.coord.CoordActionsIgnoreXCommand;
 import org.apache.oozie.command.coord.CoordActionsKillXCommand;
@@ -43,6 +44,7 @@ import org.apache.oozie.command.coord.CoordSLAChangeXCommand;
 import org.apache.oozie.command.coord.CoordSubmitXCommand;
 import org.apache.oozie.command.coord.CoordSuspendXCommand;
 import org.apache.oozie.command.coord.CoordUpdateXCommand;
+import org.apache.oozie.command.OperationType;
 import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
@@ -894,4 +896,73 @@ public class CoordinatorEngine extends BaseEngine {
         }
     }
 
+    /**
+     * return a list of killed Coordinator job
+     *
+     * @param filter, the filter string for which the coordinator jobs are 
killed
+     * @param start, the starting index for coordinator jobs
+     * @param length, maximum number of jobs to be killed
+     * @return the list of jobs being killed
+     * @throws CoordinatorEngineException thrown if one or more of the jobs 
cannot be killed
+     */
+    public CoordinatorJobInfo killJobs(String filter, int start, int length) 
throws CoordinatorEngineException {
+        try {
+            Map<String, List<String>> filterMap = parseJobsFilter(filter);
+            CoordinatorJobInfo coordinatorJobInfo =
+                    new BulkCoordXCommand(filterMap, start, length, 
OperationType.Kill).call();
+            if (coordinatorJobInfo == null) {
+                return new CoordinatorJobInfo(new 
ArrayList<CoordinatorJobBean>(), 0, 0, 0);
+            }
+            return coordinatorJobInfo;
+        }
+        catch (CommandException ex) {
+            throw new CoordinatorEngineException(ex);
+        }
+    }
+
+    /**
+     * return the jobs that've been suspended
+     * @param filter Filter for jobs that will be suspended, can be name, 
user, group, status, id or combination of any
+     * @param start Offset for the jobs that will be suspended
+     * @param length maximum number of jobs that will be suspended
+     * @return
+     * @throws CoordinatorEngineException
+     */
+    public CoordinatorJobInfo suspendJobs(String filter, int start, int 
length) throws CoordinatorEngineException {
+        try {
+            Map<String, List<String>> filterMap = parseJobsFilter(filter);
+            CoordinatorJobInfo coordinatorJobInfo =
+                    new BulkCoordXCommand(filterMap, start, length, 
OperationType.Suspend).call();
+            if (coordinatorJobInfo == null) {
+                return new CoordinatorJobInfo(new 
ArrayList<CoordinatorJobBean>(), 0, 0, 0);
+            }
+            return coordinatorJobInfo;
+        }
+        catch (CommandException ex) {
+            throw new CoordinatorEngineException(ex);
+        }
+    }
+
+    /**
+     * return the jobs that've been resumed
+     * @param filter Filter for jobs that will be resumed, can be name, user, 
group, status, id or combination of any
+     * @param start Offset for the jobs that will be resumed
+     * @param length maximum number of jobs that will be resumed
+     * @return
+     * @throws CoordinatorEngineException
+     */
+    public CoordinatorJobInfo resumeJobs(String filter, int start, int length) 
throws CoordinatorEngineException {
+        try {
+            Map<String, List<String>> filterMap = parseJobsFilter(filter);
+            CoordinatorJobInfo coordinatorJobInfo =
+                    new BulkCoordXCommand(filterMap, start, length, 
OperationType.Resume).call();
+            if (coordinatorJobInfo == null) {
+                return new CoordinatorJobInfo(new 
ArrayList<CoordinatorJobBean>(), 0, 0, 0);
+            }
+            return coordinatorJobInfo;
+        }
+        catch (CommandException ex) {
+            throw new CoordinatorEngineException(ex);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/DagEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/DagEngine.java 
b/core/src/main/java/org/apache/oozie/DagEngine.java
index ac2e7b1..faf2d64 100644
--- a/core/src/main/java/org/apache/oozie/DagEngine.java
+++ b/core/src/main/java/org/apache/oozie/DagEngine.java
@@ -42,6 +42,8 @@ import org.apache.oozie.command.wf.SubmitSqoopXCommand;
 import org.apache.oozie.command.wf.SubmitXCommand;
 import org.apache.oozie.command.wf.SuspendXCommand;
 import org.apache.oozie.command.wf.WorkflowActionInfoXCommand;
+import org.apache.oozie.command.OperationType;
+import org.apache.oozie.command.wf.BulkWorkflowXCommand;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
@@ -601,4 +603,69 @@ public class DagEngine extends BaseEngine {
         throw new BaseEngineException(new XException(ErrorCode.E0301, "Not 
supported for workflow"));
     }
 
+    /**
+     * return the jobs that've been killed
+     * @param filter Jobs that satisfy the filter will be killed
+     * @param start start index in the database of jobs
+     * @param len maximum number of jobs that will be killed
+     * @return
+     * @throws DagEngineException
+     */
+    public WorkflowsInfo killJobs(String filter, int start, int len) throws 
DagEngineException {
+        try {
+            Map<String, List<String>> filterList = parseFilter(filter);
+            WorkflowsInfo workflowsInfo = new BulkWorkflowXCommand(filterList, 
start, len, OperationType.Kill).call();
+            if (workflowsInfo == null) {
+                return new WorkflowsInfo(new ArrayList<WorkflowJobBean>(), 0, 
0, 0);
+            }
+            return workflowsInfo;
+        }
+        catch (CommandException ex) {
+            throw new DagEngineException(ex);
+        }
+    }
+
+    /**
+     * return the jobs that've been suspended
+     * @param filter Filter for jobs that will be suspended, can be name, 
user, group, status, id or combination of any
+     * @param start Offset for the jobs that will be suspended
+     * @param len maximum number of jobs that will be suspended
+     * @return
+     * @throws DagEngineException
+     */
+    public WorkflowsInfo suspendJobs(String filter, int start, int len) throws 
DagEngineException {
+        try {
+            Map<String, List<String>> filterList = parseFilter(filter);
+            WorkflowsInfo workflowsInfo = new BulkWorkflowXCommand(filterList, 
start, len, OperationType.Suspend).call();
+            if (workflowsInfo == null) {
+                return new WorkflowsInfo(new ArrayList<WorkflowJobBean>(), 0, 
0, 0);
+            }
+            return workflowsInfo;
+        }
+        catch (CommandException ex) {
+            throw new DagEngineException(ex);
+        }
+    }
+
+    /**
+     * return the jobs that've been resumed
+     * @param filter Filter for jobs that will be resumed, can be name, user, 
group, status, id or combination of any
+     * @param start Offset for the jobs that will be resumed
+     * @param len maximum number of jobs that will be resumed
+     * @return
+     * @throws DagEngineException
+     */
+    public WorkflowsInfo resumeJobs(String filter, int start, int len) throws 
DagEngineException {
+        try {
+            Map<String, List<String>> filterList = parseFilter(filter);
+            WorkflowsInfo workflowsInfo = new BulkWorkflowXCommand(filterList, 
start, len, OperationType.Resume).call();
+            if (workflowsInfo == null) {
+                return new WorkflowsInfo(new ArrayList<WorkflowJobBean>(), 0, 
0, 0);
+            }
+            return workflowsInfo;
+        }
+        catch (CommandException ex) {
+            throw new DagEngineException(ex);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/ErrorCode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java 
b/core/src/main/java/org/apache/oozie/ErrorCode.java
index 2fd2e99..8ee550a 100644
--- a/core/src/main/java/org/apache/oozie/ErrorCode.java
+++ b/core/src/main/java/org/apache/oozie/ErrorCode.java
@@ -219,6 +219,8 @@ public enum ErrorCode {
 
     E1101(XLog.STD, "SLA <{0}> cannot be empty."),
 
+    E1102(XLog.STD, "Invalid operation [{0}] for bulk command"),
+
     E1201(XLog.STD, "State [{0}] is invalid for job [{1}]."),
 
     E1301(XLog.STD, "Could not read the bundle job definition, [{0}]"),
@@ -243,6 +245,7 @@ public enum ErrorCode {
     E1320(XLog.STD, "Bundle Job change error, [{0}]"),
     E1321(XLog.STD, "Error evaluating coord name, [{0}]"),
     E1322(XLog.STD, "Bundle status transit error: [{0}]"),
+    E1323(XLog.STD, "Could not kill bundle job, this job either finished 
successfully or does not exist , [{0}]"),
 
 
     E1400(XLog.STD, "doAs (proxyuser) failure"),

http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/command/OperationType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/OperationType.java 
b/core/src/main/java/org/apache/oozie/command/OperationType.java
new file mode 100644
index 0000000..ec3d8f5
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/OperationType.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command;
+
+
+public enum OperationType {
+    Kill,
+    Suspend,
+    Resume
+};

http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/command/bundle/BulkBundleXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/bundle/BulkBundleXCommand.java 
b/core/src/main/java/org/apache/oozie/command/bundle/BulkBundleXCommand.java
new file mode 100644
index 0000000..d405ccb
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/bundle/BulkBundleXCommand.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command.bundle;
+
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.BundleJobInfo;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.OperationType;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.command.XCommand;
+import org.apache.oozie.executor.jpa.BundleJobInfoGetJPAExecutor;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+
+import java.util.List;
+import java.util.Map;
+
+public class BulkBundleXCommand extends XCommand<BundleJobInfo> {
+    private Map<String, List<String>> filter;
+    private final int start;
+    private final int len;
+    private BundleJobInfo bundleJobInfo;
+    private OperationType operation;
+
+    /**
+     * The constructor for BulkBundleXCommand
+     *
+     * @param filter the filter string
+     * @param start start location for paging
+     * @param length total length to get
+     * @param operation the type of operation to perform, it can be kill, 
suspend or resume
+     */
+    public BulkBundleXCommand(Map<String, List<String>> filter, int start, int 
length, OperationType operation) {
+        super("bulkbundle" + operation, "bulkbundle" + operation, 1);
+        this.filter = filter;
+        this.start = start;
+        this.len = length;
+        this.operation = operation;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.XCommand#isLockRequired()
+     */
+    @Override
+    protected boolean isLockRequired() {
+        return false;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.XCommand#getEntityKey()
+     */
+    @Override
+    public String getEntityKey() {
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.XCommand#loadState()
+     */
+    @Override
+    protected void loadState() throws CommandException {
+        loadBundleJobs();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
+     */
+    @Override
+    protected void verifyPrecondition() throws CommandException, 
PreconditionException {
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.XCommand#execute()
+     */
+    @Override
+    protected BundleJobInfo execute() throws CommandException {
+        List<BundleJobBean> jobs = this.bundleJobInfo.getBundleJobs();
+        for (BundleJobBean job : jobs) {
+            switch (operation) {
+                case Kill:
+                    if (job.getStatus() != Job.Status.SUCCEEDED
+                            && job.getStatus() != Job.Status.FAILED
+                            && job.getStatus() != Job.Status.DONEWITHERROR
+                            && job.getStatus() != Job.Status.KILLED) {
+                        new BundleKillXCommand(job.getId()).call();
+                    }
+                    break;
+                case Suspend:
+                    if (job.getStatus() != Job.Status.SUCCEEDED
+                            && job.getStatus() != Job.Status.FAILED
+                            && job.getStatus() != Job.Status.KILLED
+                            && job.getStatus() != Job.Status.DONEWITHERROR) {
+                        new BundleJobSuspendXCommand(job.getId()).call();
+                    }
+                    break;
+                case Resume:
+                    if (job.getStatus() == Job.Status.SUSPENDED
+                            || job.getStatus() == Job.Status.SUSPENDEDWITHERROR
+                            || job.getStatus() == Job.Status.PREPSUSPENDED) {
+                        new BundleJobResumeXCommand(job.getId()).call();
+                    }
+                    break;
+                default:
+                    throw new CommandException(ErrorCode.E1102, operation);
+            }
+        }
+        loadBundleJobs();
+        return this.bundleJobInfo;
+    }
+
+    private void loadBundleJobs() throws CommandException {
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            if (jpaService != null) {
+                this.bundleJobInfo = jpaService.execute(new 
BundleJobInfoGetJPAExecutor(filter, start, len));
+            }
+            else {
+                throw new CommandException(ErrorCode.E0610);
+            }
+        }
+        catch (Exception ex) {
+            throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java 
b/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java
index 3f22a18..1516ec3 100644
--- a/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java
@@ -91,7 +91,7 @@ public class BundleKillXCommand extends 
KillTransitionXCommand {
                 || bundleJob.getStatus() == Job.Status.KILLED) {
             LOG.info("Bundle job cannot be killed - job already SUCCEEDED, 
FAILED, KILLED or DONEWITHERROR, job id = "
                     + jobId + ", status = " + bundleJob.getStatus());
-            throw new PreconditionException(ErrorCode.E1020, jobId);
+            throw new PreconditionException(ErrorCode.E1323, jobId);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/command/coord/BulkCoordXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/coord/BulkCoordXCommand.java 
b/core/src/main/java/org/apache/oozie/command/coord/BulkCoordXCommand.java
new file mode 100644
index 0000000..e0ccd0f
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/coord/BulkCoordXCommand.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command.coord;
+
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.CoordinatorJobInfo;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.OperationType;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.command.XCommand;
+import org.apache.oozie.executor.jpa.CoordJobInfoGetJPAExecutor;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+
+import java.util.List;
+import java.util.Map;
+
+public class BulkCoordXCommand extends XCommand<CoordinatorJobInfo> {
+    private Map<String, List<String>> filter;
+    private final int start;
+    private final int len;
+    private CoordinatorJobInfo coordinatorJobInfo;
+    private OperationType operation;
+
+    /**
+     * The constructor for BulkCoordXCommand
+     *
+     * @param filter the filter string
+     * @param start start location for paging
+     * @param length total length to get
+     */
+    public BulkCoordXCommand(Map<String, List<String>> filter, int start, int 
length, OperationType operation) {
+        super("bulkcoord" + operation, "bulkcoord" + operation, 1);
+        this.filter = filter;
+        this.start = start;
+        this.len = length;
+        this.operation = operation;
+    }
+
+    /* (non-Javadoc)
+    * @see org.apache.oozie.command.XCommand#isLockRequired()
+    */
+    @Override
+    protected boolean isLockRequired() {
+        return false;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.XCommand#getEntityKey()
+     */
+    @Override
+    public String getEntityKey() {
+        return null;
+    }
+
+    /* (non-Javadoc)
+    * @see org.apache.oozie.command.XCommand#loadState()
+    */
+    @Override
+    protected void loadState() throws CommandException {
+        loadJobs();
+    }
+
+    /* (non-Javadoc)
+    * @see org.apache.oozie.command.XCommand#verifyPrecondition()
+    */
+    @Override
+    protected void verifyPrecondition() throws CommandException, 
PreconditionException {
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.XCommand#execute()
+     */
+    @Override
+    protected CoordinatorJobInfo execute() throws CommandException {
+        List<CoordinatorJobBean> jobs = this.coordinatorJobInfo.getCoordJobs();
+        for (CoordinatorJobBean job : jobs) {
+            switch (operation) {
+                case Kill:
+                    if (job.getStatus() != CoordinatorJob.Status.SUCCEEDED
+                            && job.getStatus() != CoordinatorJob.Status.FAILED
+                            && job.getStatus() != 
CoordinatorJob.Status.DONEWITHERROR
+                            && job.getStatus() != CoordinatorJob.Status.KILLED
+                            && job.getStatus() != 
CoordinatorJob.Status.IGNORED) {
+                        new CoordKillXCommand(job.getId()).call();
+                    }
+                    break;
+                case Suspend:
+                    if (job.getStatus() != CoordinatorJob.Status.SUCCEEDED
+                            && job.getStatus() != CoordinatorJob.Status.FAILED
+                            && job.getStatus() != CoordinatorJob.Status.KILLED
+                            && job.getStatus() != 
CoordinatorJob.Status.IGNORED) {
+                        new CoordSuspendXCommand(job.getId()).call();
+                    }
+                    break;
+                case Resume:
+                    if (job.getStatus() == CoordinatorJob.Status.SUSPENDED ||
+                            job.getStatus() == 
CoordinatorJob.Status.SUSPENDEDWITHERROR ||
+                            job.getStatus() == Job.Status.PREPSUSPENDED) {
+                        new CoordResumeXCommand(job.getId()).call();
+                    }
+                    break;
+                default:
+                    throw new CommandException(ErrorCode.E1102, operation);
+            }
+        }
+        loadJobs();
+        return this.coordinatorJobInfo;
+    }
+
+    private void loadJobs() throws CommandException {
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            if (jpaService != null) {
+                this.coordinatorJobInfo = jpaService.execute(new 
CoordJobInfoGetJPAExecutor(filter, start, len));
+            }
+            else {
+                throw new CommandException(ErrorCode.E0610);
+            }
+        }
+        catch (Exception ex) {
+            throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/command/wf/BulkWorkflowXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/wf/BulkWorkflowXCommand.java 
b/core/src/main/java/org/apache/oozie/command/wf/BulkWorkflowXCommand.java
new file mode 100644
index 0000000..4e0b606
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/wf/BulkWorkflowXCommand.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command.wf;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.WorkflowsInfo;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.OperationType;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.executor.jpa.WorkflowsJobGetJPAExecutor;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+
+import java.util.List;
+import java.util.Map;
+
+public class BulkWorkflowXCommand extends WorkflowXCommand<WorkflowsInfo> {
+    private final Map<String, List<String>> filter;
+    private final int start;
+    private final int len;
+    private WorkflowsInfo workflowsInfo;
+    private OperationType operation;
+
+    /**
+     * constructor taking the filter information.
+     *
+     * @param filter Can be name, status, user, group and combination of these
+     * @param start starting from this index in the list of workflows matching 
the filter are killed
+     * @param length number of workflows to be killed from the list of 
workflows matching the filter and starting from
+     *        index "start".
+     */
+    public BulkWorkflowXCommand(Map<String, List<String>> filter, int start, 
int length, OperationType operation) {
+        super("bulkkill", "bulkkill", 1, true);
+        this.filter = filter;
+        this.start = start;
+        this.len = length;
+        this.operation = operation;
+    }
+
+    /* (non-Javadoc)
+    * @see org.apache.oozie.command.XCommand#execute()
+    */
+    @Override
+    protected WorkflowsInfo execute() throws CommandException {
+        try {
+            List<WorkflowJobBean> workflows = 
this.workflowsInfo.getWorkflows();
+            for (WorkflowJobBean job : workflows) {
+                switch (operation) {
+                    case Kill:
+                        if (job.getStatus() == WorkflowJob.Status.PREP
+                                || job.getStatus() == 
WorkflowJob.Status.RUNNING
+                                || job.getStatus() == 
WorkflowJob.Status.SUSPENDED
+                                || job.getStatus() == 
WorkflowJob.Status.FAILED) {
+                            new KillXCommand(job.getId()).call();
+                        }
+                        break;
+                    case Suspend:
+                        if (job.getStatus() == WorkflowJob.Status.RUNNING) {
+                            new SuspendXCommand(job.getId()).call();
+                        }
+                        break;
+                    case Resume:
+                        if (job.getStatus() == WorkflowJob.Status.SUSPENDED) {
+                            new ResumeXCommand(job.getId()).call();
+                        }
+                        break;
+                    default:
+                        throw new CommandException(ErrorCode.E1102, operation);
+                }
+            }
+            loadJobs();
+            return this.workflowsInfo;
+        }
+        catch (Exception ex) {
+            throw new CommandException(ErrorCode.E0725, ex.getMessage(), ex);
+        }
+    }
+
+    /* (non-Javadoc)
+    * @see org.apache.oozie.command.XCommand#getEntityKey()
+    */
+    @Override
+    public String getEntityKey() {
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.XCommand#isLockRequired()
+     */
+    @Override
+    protected boolean isLockRequired() {
+        return false;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.XCommand#loadState()
+     */
+    @Override
+    protected void loadState() throws CommandException {
+        loadJobs();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
+     */
+    @Override
+    protected void verifyPrecondition() throws CommandException, 
PreconditionException {
+    }
+
+    private void loadJobs() throws CommandException {
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            if (jpaService != null) {
+                this.workflowsInfo = jpaService.execute(
+                        new WorkflowsJobGetJPAExecutor(this.filter, 
this.start, this.len));
+            }
+            else {
+                throw new CommandException(ErrorCode.E0610);
+            }
+        }
+        catch (Exception ex) {
+            throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/service/AuthorizationService.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/service/AuthorizationService.java 
b/core/src/main/java/org/apache/oozie/service/AuthorizationService.java
index 9ce0640..cd2372e 100644
--- a/core/src/main/java/org/apache/oozie/service/AuthorizationService.java
+++ b/core/src/main/java/org/apache/oozie/service/AuthorizationService.java
@@ -26,6 +26,9 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.net.URI;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
@@ -37,6 +40,9 @@ import org.apache.oozie.ErrorCode;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.client.XOozieClient;
 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobInfoGetJPAExecutor;
+import org.apache.oozie.executor.jpa.BundleJobInfoGetJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowsJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
@@ -503,6 +509,102 @@ public class AuthorizationService implements Service {
     }
 
     /**
+     * Check if the user+group is authorized to operate on the specified jobs. 
<p/> Checks if the user is a super-user or
+     * the one who started the jobs. <p/> Read operations are allowed to all 
users.
+     *
+     * @param user user name.
+     * @param filter filter used to select jobs
+     * @param start starting index of the jobs in DB
+     * @param len maximum amount of jbos to select
+     * @param write indicates if the check is for read or write job tasks.
+     * @throws AuthorizationException thrown if the user is not authorized for 
the job.
+     */
+    public void authorizeForJobs(String user, Map<String, List<String>> 
filter, String jobType,
+                                 int start, int len, boolean write) throws 
AuthorizationException {
+        if (authorizationEnabled && write && !isAdmin(user)) {
+            try {
+                // handle workflow jobs
+                if (jobType.equals("wf")) {
+                    List<WorkflowJobBean> jobBeans = new 
ArrayList<WorkflowJobBean>();
+                    JPAService jpaService = 
Services.get().get(JPAService.class);
+                    if (jpaService != null) {
+                        try {
+                            jobBeans = jpaService.execute(new 
WorkflowsJobGetJPAExecutor(
+                                    filter, start, len)).getWorkflows();
+                        }
+                        catch (JPAExecutorException je) {
+                            throw new AuthorizationException(je);
+                        }
+                    }
+                    else {
+                        throw new AuthorizationException(ErrorCode.E0610);
+                    }
+                    for (WorkflowJobBean jobBean : jobBeans) {
+                        if (jobBean != null && 
!jobBean.getUser().equals(user)) {
+                            if (!isUserInAcl(user, jobBean.getGroup())) {
+                                    incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
+                                throw new 
AuthorizationException(ErrorCode.E0508, user, jobBean.getId());
+                            }
+                        }
+                    }
+                }
+                // handle bundle jobs
+                else if (jobType.equals("bundle")) {
+                    List<BundleJobBean> jobBeans = new 
ArrayList<BundleJobBean>();
+                    JPAService jpaService = 
Services.get().get(JPAService.class);
+                    if (jpaService != null) {
+                        try {
+                            jobBeans = jpaService.execute(new 
BundleJobInfoGetJPAExecutor(
+                                    filter, start, len)).getBundleJobs();
+                        }
+                        catch (JPAExecutorException je) {
+                            throw new AuthorizationException(je);
+                        }
+                    }
+                    else {
+                        throw new AuthorizationException(ErrorCode.E0610);
+                    }
+                    for (BundleJobBean jobBean : jobBeans){
+                        if (jobBean != null && 
!jobBean.getUser().equals(user)) {
+                            if (!isUserInAcl(user, jobBean.getGroup())) {
+                                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
+                                throw new 
AuthorizationException(ErrorCode.E0509, user, jobBean.getId());
+                            }
+                        }
+                    }
+                }
+                // handle coordinator jobs
+                else {
+                    List<CoordinatorJobBean> jobBeans = new 
ArrayList<CoordinatorJobBean>();
+                    JPAService jpaService = 
Services.get().get(JPAService.class);
+                    if (jpaService != null) {
+                        try {
+                            jobBeans = jpaService.execute(new 
CoordJobInfoGetJPAExecutor(
+                                    filter, start, len)).getCoordJobs();
+                        }
+                        catch (JPAExecutorException je) {
+                            throw new AuthorizationException(je);
+                        }
+                    }
+                    else {
+                        throw new AuthorizationException(ErrorCode.E0610);
+                    }
+                    for (CoordinatorJobBean jobBean : jobBeans) {
+                        if (jobBean != null && 
!jobBean.getUser().equals(user)) {
+                            if (!isUserInAcl(user, jobBean.getGroup())) {
+                                incrCounter(INSTR_FAILED_AUTH_COUNTER, 1);
+                                throw new 
AuthorizationException(ErrorCode.E0509, user, jobBean.getId());
+                            }
+                        }
+                    }
+                }
+            }
+            catch (IOException ex) {
+                throw new AuthorizationException(ErrorCode.E0501, 
ex.getMessage(), ex);
+            }
+        }
+    }
+    /**
      * Convenience method for instrumentation counters.
      *
      * @param name counter name.

http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/servlet/BaseJobsServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/BaseJobsServlet.java 
b/core/src/main/java/org/apache/oozie/servlet/BaseJobsServlet.java
index 01de277..54b173b 100644
--- a/core/src/main/java/org/apache/oozie/servlet/BaseJobsServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/BaseJobsServlet.java
@@ -30,8 +30,10 @@ import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.rest.RestConstants;
 import org.apache.oozie.service.Services;
-import org.apache.oozie.service.WorkflowAppService;
+import org.apache.oozie.service.AuthorizationException;
+import org.apache.oozie.service.AuthorizationService;
 import org.apache.oozie.util.JobUtils;
+import org.apache.oozie.util.JobsFilterUtils;
 import org.apache.oozie.util.XConfiguration;
 import org.json.simple.JSONObject;
 
@@ -41,20 +43,20 @@ public abstract class BaseJobsServlet extends 
JsonRestServlet {
 
     static {
         RESOURCES_INFO[0] = new JsonRestServlet.ResourceInfo("", Arrays.asList(
-                "POST", "GET"), Arrays.asList(
+                "POST", "GET", "PUT"), Arrays.asList(
                 new JsonRestServlet.ParameterInfo(RestConstants.ACTION_PARAM,
-                                                  String.class, false, 
Arrays.asList("POST")),
+                                                  String.class, false, 
Arrays.asList("POST", "PUT")),
                 new JsonRestServlet.ParameterInfo(
                         RestConstants.JOBS_FILTER_PARAM, String.class, false,
-                        Arrays.asList("GET")),
+                        Arrays.asList("GET", "PUT")),
                 new JsonRestServlet.ParameterInfo(RestConstants.JOBTYPE_PARAM,
-                                                  String.class, false, 
Arrays.asList("GET", "POST")),
+                                                  String.class, false, 
Arrays.asList("GET", "POST", "PUT")),
                 new JsonRestServlet.ParameterInfo(RestConstants.OFFSET_PARAM,
-                                                  String.class, false, 
Arrays.asList("GET")),
+                                                  String.class, false, 
Arrays.asList("GET", "PUT")),
                 new JsonRestServlet.ParameterInfo(RestConstants.LEN_PARAM,
-                                                  String.class, false, 
Arrays.asList("GET")),
+                                                  String.class, false, 
Arrays.asList("GET", "PUT")),
                 new 
JsonRestServlet.ParameterInfo(RestConstants.JOBS_BULK_PARAM,
-                                                  String.class, false, 
Arrays.asList("GET")),
+                                                  String.class, false, 
Arrays.asList("GET", "PUT")),
                 new JsonRestServlet.ParameterInfo(
                         RestConstants.JOBS_EXTERNAL_ID_PARAM, String.class,
                         false, Arrays.asList("GET"))));
@@ -125,6 +127,90 @@ public abstract class BaseJobsServlet extends 
JsonRestServlet {
     }
 
     /**
+     * Perform various job related actions - suspend, resume, kill, etc.
+     */
+    @Override
+    protected void doPut(HttpServletRequest request, HttpServletResponse 
response) throws ServletException, IOException {
+        request.setAttribute(AUDIT_PARAM, 
request.getParameter(RestConstants.JOBS_FILTER_PARAM));
+        request.setAttribute(AUDIT_OPERATION, 
request.getParameter(RestConstants.ACTION_PARAM));
+        try {
+            AuthorizationService auth = 
Services.get().get(AuthorizationService.class);
+            String filter = 
request.getParameter(RestConstants.JOBS_FILTER_PARAM);
+            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
+            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
+            String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM);
+
+            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
+            start = (start < 1) ? 1 : start;
+            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
+            len = (len < 1) ? 50 : len;
+            auth.authorizeForJobs(getUser(request), 
JobsFilterUtils.parseFilter(filter), jobType, start, len, true);
+        }
+        catch (AuthorizationException ex) {
+            throw new XServletException(HttpServletResponse.SC_UNAUTHORIZED, 
ex);
+        }
+
+        String action = request.getParameter(RestConstants.ACTION_PARAM);
+        JSONObject json = null;
+        if (action.equals(RestConstants.JOB_ACTION_KILL)) {
+            stopCron();
+            json = killJobs(request, response);
+            startCron();
+        }
+        else if (action.equals(RestConstants.JOB_ACTION_RESUME)) {
+            stopCron();
+            json = resumeJobs(request, response);
+            startCron();
+        }
+        else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) {
+            stopCron();
+            json = suspendJobs(request, response);
+            startCron();
+        }
+        else {
+            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, 
ErrorCode.E0303,
+                    RestConstants.ACTION_PARAM, action);
+        }
+        response.setStatus(HttpServletResponse.SC_OK);
+        sendJsonResponse(response, HttpServletResponse.SC_OK, json);
+    }
+
+    /**
+     * abstract method to kill jobs based ona filter param. The jobs could be 
workflow, coordinator or bundle jobs
+     *
+     * @param request
+     * @param response
+     * @return JSONObject of all jobs being killed
+     * @throws XServletException
+     * @throws IOException
+     */
+    abstract JSONObject killJobs(HttpServletRequest request, 
HttpServletResponse response) throws XServletException,
+            IOException;
+
+    /**
+     * abstract method to suspend jobs based ona filter param. The jobs could 
be workflow, coordinator or bundle jobs
+     *
+     * @param request
+     * @param response
+     * @return JSONObject of all jobs being suspended
+     * @throws XServletException
+     * @throws IOException
+     */
+    abstract JSONObject suspendJobs(HttpServletRequest request, 
HttpServletResponse response) throws XServletException,
+            IOException;
+
+    /**
+     * abstract method to resume jobs based ona filter param. The jobs could 
be workflow, coordinator or bundle jobs
+     *
+     * @param request
+     * @param response
+     * @return JSONObject of all jobs being resumed
+     * @throws XServletException
+     * @throws IOException
+     */
+    abstract JSONObject resumeJobs(HttpServletRequest request, 
HttpServletResponse response) throws XServletException,
+            IOException;
+    /**
      * abstract method to submit a job, either workflow or coordinator in the 
case of workflow job, there is an optional
      * flag in request to indicate if want this job to be started immediately 
or not
      *

http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/servlet/V0JobsServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/V0JobsServlet.java 
b/core/src/main/java/org/apache/oozie/servlet/V0JobsServlet.java
index da30bce..2c79ef0 100644
--- a/core/src/main/java/org/apache/oozie/servlet/V0JobsServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/V0JobsServlet.java
@@ -121,4 +121,47 @@ public class V0JobsServlet extends BaseJobsServlet {
 
         return json;
     }
+
+
+    /**
+     * service implementation to bulk kill jobs
+     * @param request
+     * @param response
+     * @return
+     * @throws XServletException
+     * @throws IOException
+     */
+    @Override
+    protected JSONObject killJobs(HttpServletRequest request, 
HttpServletResponse response) throws XServletException,
+            IOException {
+        throw new UnsupportedOperationException("method not implemented in V0 
API");
+    }
+
+    /**
+     * service implementation to bulk suspend jobs
+     * @param request
+     * @param response
+     * @return
+     * @throws XServletException
+     * @throws IOException
+     */
+    @Override
+    protected JSONObject suspendJobs(HttpServletRequest request, 
HttpServletResponse response) throws XServletException,
+            IOException {
+        throw new UnsupportedOperationException("method not implemented in V0 
API");
+    }
+
+    /**
+     * service implementation to bulk resume jobs
+     * @param request
+     * @param response
+     * @return
+     * @throws XServletException
+     * @throws IOException
+     */
+    @Override
+    protected JSONObject resumeJobs(HttpServletRequest request, 
HttpServletResponse response) throws XServletException,
+            IOException {
+        throw new UnsupportedOperationException("method not implemented in V0 
API");
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/servlet/V1JobsServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/V1JobsServlet.java 
b/core/src/main/java/org/apache/oozie/servlet/V1JobsServlet.java
index 6c20ad7..80c8ec4 100644
--- a/core/src/main/java/org/apache/oozie/servlet/V1JobsServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/V1JobsServlet.java
@@ -19,6 +19,7 @@
 package org.apache.oozie.servlet;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -53,6 +54,7 @@ import org.apache.oozie.service.BundleEngineService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.XLog;
 import org.apache.oozie.util.XmlUtils;
+import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 
 public class V1JobsServlet extends BaseJobsServlet {
@@ -445,4 +447,177 @@ public class V1JobsServlet extends BaseJobsServlet {
 
         return json;
     }
+
+    /**
+     * service implementation to bulk kill jobs
+     * @param request
+     * @param response
+     * @return
+     * @throws XServletException
+     * @throws IOException
+     */
+    @Override
+    protected JSONObject killJobs(HttpServletRequest request, 
HttpServletResponse response) throws XServletException,
+            IOException {
+        return bulkModifyJobs(request, response);
+    }
+
+    /**
+     * service implementation to bulk suspend jobs
+     * @param request
+     * @param response
+     * @return
+     * @throws XServletException
+     * @throws IOException
+     */
+    @Override
+    protected JSONObject suspendJobs(HttpServletRequest request, 
HttpServletResponse response) throws XServletException,
+            IOException {
+        return bulkModifyJobs(request, response);
+    }
+
+    /**
+     * service implementation to bulk resume jobs
+     * @param request
+     * @param response
+     * @return
+     * @throws XServletException
+     * @throws IOException
+     */
+    @Override
+    protected JSONObject resumeJobs(HttpServletRequest request, 
HttpServletResponse response) throws XServletException,
+            IOException {
+        return bulkModifyJobs(request, response);
+    }
+
+    private JSONObject bulkModifyJobs(HttpServletRequest request, 
HttpServletResponse response) throws XServletException,
+            IOException {
+        String action = request.getParameter(RestConstants.ACTION_PARAM);
+        String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM);
+        String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM);
+        String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
+        String lenStr = request.getParameter(RestConstants.LEN_PARAM);
+        String timeZoneId = 
request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
+                ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
+
+        int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
+        start = (start < 1) ? 1 : start;
+        int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
+        len = (len < 1) ? 50 : len;
+
+        JSONObject json = new JSONObject();
+        List<String> ids = new ArrayList<String>();
+
+        if (jobType.equals("wf")) {
+            WorkflowsInfo jobs = null;
+            DagEngine dagEngine = 
Services.get().get(DagEngineService.class).getDagEngine(getUser(request));
+            if (action.equals(RestConstants.JOB_ACTION_KILL)) {
+                try {
+                    jobs = dagEngine.killJobs(filter, start, len);
+                }
+                catch (DagEngineException ex) {
+                    throw new 
XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
+                }
+            } else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) {
+                try {
+                    jobs = dagEngine.suspendJobs(filter, start, len);
+                }
+                catch (DagEngineException ex) {
+                    throw new 
XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
+                }
+            } else if (action.equals(RestConstants.JOB_ACTION_RESUME)) {
+                try {
+                    jobs = dagEngine.resumeJobs(filter, start, len);
+                }
+                catch (DagEngineException ex) {
+                    throw new 
XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
+                }
+            }
+
+            json.put(JsonTags.WORKFLOWS_JOBS, 
WorkflowJobBean.toJSONArray(jobs.getWorkflows(), timeZoneId));
+            json.put(JsonTags.WORKFLOWS_TOTAL, jobs.getTotal());
+            json.put(JsonTags.WORKFLOWS_OFFSET, jobs.getStart());
+            json.put(JsonTags.WORKFLOWS_LEN, jobs.getLen());
+
+        }
+        else if (jobType.equals("bundle")) {
+            BundleJobInfo jobs = null;
+            BundleEngine bundleEngine = 
Services.get().get(BundleEngineService.class).
+                    getBundleEngine(getUser(request));
+            if (action.equals(RestConstants.JOB_ACTION_KILL)) {
+                try {
+                    jobs = bundleEngine.killJobs(filter, start, len);
+                }
+                catch (BundleEngineException ex) {
+                    throw new 
XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
+                }
+            }
+            else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) {
+                try {
+                    jobs = bundleEngine.suspendJobs(filter, start, len);
+                }
+                catch (BundleEngineException ex) {
+                    throw new 
XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
+                }
+            }
+            else if (action.equals(RestConstants.JOB_ACTION_RESUME)) {
+                try {
+                    jobs = bundleEngine.resumeJobs(filter, start, len);
+                }
+                catch (BundleEngineException ex) {
+                    throw new 
XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
+                }
+            }
+
+            json.put(JsonTags.BUNDLE_JOBS, 
BundleJobBean.toJSONArray(jobs.getBundleJobs(), timeZoneId));
+            json.put(JsonTags.BUNDLE_JOB_TOTAL, jobs.getTotal());
+            json.put(JsonTags.BUNDLE_JOB_OFFSET, jobs.getStart());
+            json.put(JsonTags.BUNDLE_JOB_LEN, jobs.getLen());
+        }
+        else {
+            CoordinatorJobInfo jobs = null;
+            CoordinatorEngine coordEngine = 
Services.get().get(CoordinatorEngineService.class).
+                    getCoordinatorEngine(getUser(request));
+            if (action.equals(RestConstants.JOB_ACTION_KILL)) {
+                try {
+                    jobs = coordEngine.killJobs(filter, start, len);
+                }
+                catch (CoordinatorEngineException ex) {
+                    throw new 
XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
+                }
+            }
+            else if (action.equals(RestConstants.JOB_ACTION_SUSPEND)) {
+                try {
+                    jobs = coordEngine.suspendJobs(filter, start, len);
+                }
+                catch (CoordinatorEngineException ex) {
+                    throw new 
XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
+                }
+            }
+            else if (action.equals(RestConstants.JOB_ACTION_RESUME)) {
+                try {
+                    jobs = coordEngine.resumeJobs(filter, start, len);
+                }
+                catch (CoordinatorEngineException ex) {
+                    throw new 
XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
+                }
+            }
+
+            json.put(JsonTags.COORDINATOR_JOBS, 
CoordinatorJobBean.toJSONArray(jobs.getCoordJobs(), timeZoneId));
+            json.put(JsonTags.COORD_JOB_TOTAL, jobs.getTotal());
+            json.put(JsonTags.COORD_JOB_OFFSET, jobs.getStart());
+            json.put(JsonTags.COORD_JOB_LEN, jobs.getLen());
+        }
+
+        json.put(JsonTags.JOB_IDS, toJSONArray(ids));
+        return json;
+    }
+
+    private static JSONArray toJSONArray(List<String> ids) {
+        JSONArray array = new JSONArray();
+        for (String id : ids) {
+            array.add(id);
+        }
+        return array;
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/main/java/org/apache/oozie/util/JobsFilterUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/JobsFilterUtils.java 
b/core/src/main/java/org/apache/oozie/util/JobsFilterUtils.java
new file mode 100644
index 0000000..98ac7b3
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/util/JobsFilterUtils.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.util;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.servlet.XServletException;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+public class JobsFilterUtils {
+
+    private static final Set<String> FILTER_NAMES = new HashSet<String>();
+
+    static {
+        FILTER_NAMES.add(OozieClient.FILTER_USER);
+        FILTER_NAMES.add(OozieClient.FILTER_NAME);
+        FILTER_NAMES.add(OozieClient.FILTER_GROUP);
+        FILTER_NAMES.add(OozieClient.FILTER_STATUS);
+        FILTER_NAMES.add(OozieClient.FILTER_ID);
+    }
+
+    public static Map<String, List<String>> parseFilter(String filter) throws 
ServletException{
+        Map<String, List<String>> map = new HashMap<String, List<String>>();
+        if (filter != null) {
+            StringTokenizer st = new StringTokenizer(filter, ";");
+            while (st.hasMoreTokens()) {
+                String token = st.nextToken();
+                if (token.contains("=")) {
+                    String[] pair = token.split("=");
+                    if (pair.length != 2) {
+                        throw new 
XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0420,
+                                "filter elements must be name=value pairs");
+                    }
+                    if (!FILTER_NAMES.contains(pair[0])) {
+                        throw new 
XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0420,
+                                "filter name: " + pair[0] + " is invalid");
+                    }
+
+                    if (pair[0].equals("status")) {
+                        try {
+                            Job.Status.valueOf(pair[1]);
+                        }
+                        catch (IllegalArgumentException ex) {
+                            throw new 
XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0420,
+                                    XLog.format("invalid status [{0}]", 
pair[1]));
+                        }
+                    }
+                    List<String> list = map.get(pair[0]);
+                    if (list == null) {
+                        list = new ArrayList<String>();
+                        map.put(pair[0], list);
+                    }
+                    list.add(pair[1]);
+                }
+                else {
+                    throw new 
XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0420,
+                            "filter elements must be name=value pairs");
+                }
+            }
+        }
+        return map;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/a9b3c7bb/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java 
b/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java
index f81c1ad..e55d5f6 100644
--- a/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java
+++ b/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java
@@ -445,6 +445,54 @@ public class TestOozieCLI extends DagServletTestCase {
         });
     }
 
+    public void testBulkSuspendResumeKill1() throws Exception {
+        runTest(END_POINTS, SERVLET_CLASSES, IS_SECURITY_ENABLED, new 
Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                String oozieUrl = getContextURL();
+                String[] args = new String[]{"jobs", "-oozie", oozieUrl, 
"-suspend", "-filter",
+                        "name=workflow-1"};
+                assertEquals(0, new OozieCLI().run(args));
+                assertEquals(RestConstants.JOBS, MockDagEngineService.did);
+
+                args = new String[]{"jobs", "-oozie", oozieUrl, "-resume", 
"-filter",
+                        "name=workflow-1"};
+                assertEquals(0, new OozieCLI().run(args));
+                assertEquals(RestConstants.JOBS, MockDagEngineService.did);
+
+                args = new String[]{"jobs", "-oozie", oozieUrl, "-kill", 
"-filter",
+                        "name=workflow-1"};
+                assertEquals(0, new OozieCLI().run(args));
+                assertEquals(RestConstants.JOBS, MockDagEngineService.did);
+                return null;
+            }
+        });
+    }
+
+    public void testBulkSuspendResumeKill2() throws Exception {
+        runTest(END_POINTS, SERVLET_CLASSES, IS_SECURITY_ENABLED, new 
Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                String oozieUrl = getContextURL();
+                String[] args = new String[]{"jobs", "-oozie", oozieUrl, 
"-suspend", "-filter",
+                        "name=coordinator", "-jobtype", "coordinator"};
+                assertEquals(0, new OozieCLI().run(args));
+                assertEquals(RestConstants.JOBS, 
MockCoordinatorEngineService.did);
+
+                args = new String[]{"jobs", "-oozie", oozieUrl, "-resume", 
"-filter",
+                        "name=coordinator", "-jobtype", "coordinator"};
+                assertEquals(0, new OozieCLI().run(args));
+                assertEquals(RestConstants.JOBS, 
MockCoordinatorEngineService.did);
+
+                args = new String[]{"jobs", "-oozie", oozieUrl, "-kill", 
"-filter",
+                        "name=coordinator", "-jobtype", "coordinator"};
+                assertEquals(0, new OozieCLI().run(args));
+                assertEquals(RestConstants.JOBS, 
MockCoordinatorEngineService.did);
+                return null;
+            }
+        });
+    }
+
     /**
      * Test the working of coord action kill from Client with action numbers
      *

Reply via email to