Repository: oozie Updated Branches: refs/heads/master e00411a6f -> 089470223
OOZIE-1567 Provide a wait tool in Oozie (rkanter) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/08947022 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/08947022 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/08947022 Branch: refs/heads/master Commit: 089470223f0c094740ef5c880c4d244c39c02da4 Parents: e00411a Author: Robert Kanter <[email protected]> Authored: Tue Sep 23 08:59:01 2014 -0700 Committer: Robert Kanter <[email protected]> Committed: Tue Sep 23 08:59:01 2014 -0700 ---------------------------------------------------------------------- .../java/org/apache/oozie/cli/OozieCLI.java | 32 +++++- .../org/apache/oozie/client/OozieClient.java | 104 +++++++++++++++++++ .../org/apache/oozie/client/rest/JsonTags.java | 1 + .../apache/oozie/client/rest/RestConstants.java | 2 + .../main/java/org/apache/oozie/BaseEngine.java | 8 ++ .../java/org/apache/oozie/BundleEngine.java | 21 ++++ .../org/apache/oozie/CoordinatorEngine.java | 39 +++++++ .../org/apache/oozie/CoordinatorJobBean.java | 2 + .../main/java/org/apache/oozie/DagEngine.java | 18 ++++ .../executor/jpa/CoordJobQueryExecutor.java | 7 ++ .../apache/oozie/servlet/BaseJobServlet.java | 20 ++++ .../org/apache/oozie/servlet/V0JobServlet.java | 6 ++ .../org/apache/oozie/servlet/V1JobServlet.java | 6 ++ .../org/apache/oozie/servlet/V2JobServlet.java | 31 ++++++ .../org/apache/oozie/client/TestOozieCLI.java | 31 ++++++ .../servlet/MockCoordinatorEngineService.java | 7 ++ .../oozie/servlet/MockDagEngineService.java | 7 ++ .../apache/oozie/servlet/TestV2JobServlet.java | 33 ++++++ docs/src/site/twiki/DG_CommandLineTool.twiki | 90 +++++++++++----- docs/src/site/twiki/WebServicesAPI.twiki | 24 +++++ release-log.txt | 1 + 21 files changed, 460 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/oozie/blob/08947022/client/src/main/java/org/apache/oozie/cli/OozieCLI.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java index f3ffd1f..4f845e8 100644 --- a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java +++ b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java @@ -119,6 +119,9 @@ public class OozieCLI { public static final String SHOWDIFF_OPTION = "diff"; public static final String UPDATE_OPTION = "update"; public static final String IGNORE_OPTION = "ignore"; + public static final String POLL_OPTION = "poll"; + public static final String TIMEOUT_OPTION = "timeout"; + public static final String INTERVAL_OPTION = "interval"; public static final String DO_AS_OPTION = "doas"; @@ -286,10 +289,11 @@ public class OozieCLI { Option suspend = new Option(SUSPEND_OPTION, true, "suspend a job"); Option resume = new Option(RESUME_OPTION, true, "resume a job"); Option kill = new Option(KILL_OPTION, true, "kill a job (coordinator can mention -action or -date)"); - Option change = new Option(CHANGE_OPTION, true, "change a coordinator job"); + Option change = new Option(CHANGE_OPTION, true, "change a coordinator or bundle job"); Option changeValue = new Option(CHANGE_VALUE_OPTION, true, "new endtime/concurrency/pausetime value for changing a coordinator job"); Option info = new Option(INFO_OPTION, true, "info of a job"); + Option poll = new Option(POLL_OPTION, true, "poll Oozie until a job reaches a terminal state or a timeout occurs"); Option offset = new Option(OFFSET_OPTION, true, "job info offset of actions (default '1', requires -info)"); Option len = new Option(LEN_OPTION, true, "number of actions (default TOTAL ACTIONS, requires -info)"); Option filter = new Option(FILTER_OPTION, true, @@ -312,7 +316,8 @@ public class OozieCLI { Option config_content = new 
Option(CONFIG_CONTENT_OPTION, true, "job configuration"); Option verbose = new Option(VERBOSE_OPTION, false, "verbose mode"); Option action = new Option(ACTION_OPTION, true, - "coordinator rerun on action ids (requires -rerun); coordinator log retrieval on action ids (requires -log)"); + "coordinator rerun/kill on action ids (requires -rerun/-kill); coordinator log retrieval on action ids" + + "(requires -log)"); Option date = new Option(DATE_OPTION, true, "coordinator/bundle rerun on action dates (requires -rerun); coordinator log retrieval on action dates (requires -log)"); Option rerun_coord = new Option(RERUN_COORD_OPTION, true, "bundle rerun on coordinator names (requires -rerun)"); @@ -327,6 +332,11 @@ public class OozieCLI { Option ignore = new Option(IGNORE_OPTION, true, "change status of a coordinator job or action to IGNORED" + " (-action required to ignore coord actions)"); + Option timeout = new Option(TIMEOUT_OPTION, true, "timeout in minutes (default is 30, negative values indicate no " + + "timeout, requires -poll)"); + timeout.setType(Integer.class); + Option interval = new Option(INTERVAL_OPTION, true, "polling interval in minutes (default is 5, requires -poll)"); + interval.setType(Integer.class); Option doAs = new Option(DO_AS_OPTION, true, "doAs user, impersonates as the specified user"); @@ -346,6 +356,7 @@ public class OozieCLI { actions.addOption(definition); actions.addOption(config_content); actions.addOption(ignore); + actions.addOption(poll); actions.setRequired(true); Options jobOptions = new Options(); jobOptions.addOption(oozie); @@ -369,6 +380,8 @@ public class OozieCLI { jobOptions.addOption(getAllWorkflows); jobOptions.addOptionGroup(actions); jobOptions.addOption(logFilter); + jobOptions.addOption(timeout); + jobOptions.addOption(interval); addAuthOptions(jobOptions); jobOptions.addOption(showdiff); @@ -1152,6 +1165,21 @@ public class OozieCLI { System.out.println(wc.updateCoord(coordJobId, conf, dryrun, showdiff)); } } + else if 
(options.contains(POLL_OPTION)) { + String jobId = commandLine.getOptionValue(POLL_OPTION); + int timeout = 30; + int interval = 5; + String timeoutS = commandLine.getOptionValue(TIMEOUT_OPTION); + if (timeoutS != null) { + timeout = Integer.parseInt(timeoutS); + } + String intervalS = commandLine.getOptionValue(INTERVAL_OPTION); + if (intervalS != null) { + interval = Integer.parseInt(intervalS); + } + boolean verbose = commandLine.hasOption(VERBOSE_OPTION); + wc.pollJob(jobId, timeout, interval, verbose); + } } catch (OozieClientException ex) { throw new OozieCLIException(ex.toString(), ex); http://git-wip-us.apache.org/repos/asf/oozie/blob/08947022/client/src/main/java/org/apache/oozie/client/OozieClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/client/OozieClient.java b/client/src/main/java/org/apache/oozie/client/OozieClient.java index d6ff2d0..5bbac4a 100644 --- a/client/src/main/java/org/apache/oozie/client/OozieClient.java +++ b/client/src/main/java/org/apache/oozie/client/OozieClient.java @@ -46,11 +46,13 @@ import java.net.URLEncoder; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.Callable; /** @@ -162,6 +164,26 @@ public class OozieClient { NORMAL, NOWEBSERVICE, SAFEMODE } + private static final Set<String> COMPLETED_WF_STATUSES = new HashSet<String>(); + private static final Set<String> COMPLETED_COORD_AND_BUNDLE_STATUSES = new HashSet<String>(); + private static final Set<String> COMPLETED_COORD_ACTION_STATUSES = new HashSet<String>(); + static { + COMPLETED_WF_STATUSES.add(WorkflowJob.Status.FAILED.toString()); + COMPLETED_WF_STATUSES.add(WorkflowJob.Status.KILLED.toString()); + 
COMPLETED_WF_STATUSES.add(WorkflowJob.Status.SUCCEEDED.toString()); + COMPLETED_COORD_AND_BUNDLE_STATUSES.add(Job.Status.FAILED.toString()); + COMPLETED_COORD_AND_BUNDLE_STATUSES.add(Job.Status.KILLED.toString()); + COMPLETED_COORD_AND_BUNDLE_STATUSES.add(Job.Status.SUCCEEDED.toString()); + COMPLETED_COORD_AND_BUNDLE_STATUSES.add(Job.Status.DONEWITHERROR.toString()); + COMPLETED_COORD_AND_BUNDLE_STATUSES.add(Job.Status.IGNORED.toString()); + COMPLETED_COORD_ACTION_STATUSES.add(CoordinatorAction.Status.FAILED.toString()); + COMPLETED_COORD_ACTION_STATUSES.add(CoordinatorAction.Status.IGNORED.toString()); + COMPLETED_COORD_ACTION_STATUSES.add(CoordinatorAction.Status.KILLED.toString()); + COMPLETED_COORD_ACTION_STATUSES.add(CoordinatorAction.Status.SKIPPED.toString()); + COMPLETED_COORD_ACTION_STATUSES.add(CoordinatorAction.Status.SUCCEEDED.toString()); + COMPLETED_COORD_ACTION_STATUSES.add(CoordinatorAction.Status.TIMEDOUT.toString()); + } + /** * debugMode =0 means no debugging. > 0 means debugging on. */ @@ -1810,6 +1832,88 @@ public class OozieClient { return new BulkResponseStatus(filter, start, len).call(); } + /** + * Poll a job (Workflow Job ID, Coordinator Job ID, Coordinator Action ID, or Bundle Job ID) and return when it has reached a + * terminal state. + * (i.e. 
FAILED, KILLED, SUCCEEDED) + * + * @param id The Job ID + * @param timeout timeout in minutes (negative values indicate no timeout) + * @param interval polling interval in minutes (must be positive) + * @param verbose if true, the current status will be printed out at each poll; if false, no output + * @throws OozieClientException thrown if the job's status could not be retrieved + */ + public void pollJob(String id, int timeout, int interval, boolean verbose) throws OozieClientException { + notEmpty("id", id); + if (interval < 1) { + throw new IllegalArgumentException("interval must be a positive integer"); + } + boolean noTimeout = (timeout < 1); + long endTime = System.currentTimeMillis() + timeout * 60 * 1000; + interval *= 60 * 1000; + + final Set<String> completedStatuses; + if (id.endsWith("-W")) { + completedStatuses = COMPLETED_WF_STATUSES; + } else if (id.endsWith("-C")) { + completedStatuses = COMPLETED_COORD_AND_BUNDLE_STATUSES; + } else if (id.endsWith("-B")) { + completedStatuses = COMPLETED_COORD_AND_BUNDLE_STATUSES; + } else if (id.contains("-C@")) { + completedStatuses = COMPLETED_COORD_ACTION_STATUSES; + } else { + throw new IllegalArgumentException("invalid job type"); + } + + String status = getStatus(id); + if (verbose) { + System.out.println(status); + } + while(!completedStatuses.contains(status) && (noTimeout || System.currentTimeMillis() <= endTime)) { + try { + Thread.sleep(interval); + } catch (InterruptedException ie) { + // ignore + } + status = getStatus(id); + if (verbose) { + System.out.println(status); + } + } + } + + /** + * Gets the status for a particular job (Workflow Job ID, Coordinator Job ID, Coordinator Action ID, or Bundle Job ID). 
+ * + * @param jobId given jobId + * @return the status + * @throws OozieClientException + */ + public String getStatus(String jobId) throws OozieClientException { + return new Status(jobId).call(); + } + + private class Status extends ClientCallable<String> { + + Status(String jobId) { + super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM, + RestConstants.JOB_SHOW_STATUS)); + } + + @Override + protected String call(HttpURLConnection conn) throws IOException, OozieClientException { + if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { + Reader reader = new InputStreamReader(conn.getInputStream()); + JSONObject json = (JSONObject) JSONValue.parse(reader); + return (String) json.get(JsonTags.STATUS); + } + else { + handleError(conn); + } + return null; + } + } + private class GetQueueDump extends ClientCallable<List<String>> { GetQueueDump() { super("GET", RestConstants.ADMIN, RestConstants.ADMIN_QUEUE_DUMP_RESOURCE, prepareParams()); http://git-wip-us.apache.org/repos/asf/oozie/blob/08947022/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java b/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java index 8a86bf1..b7cf0e7 100644 --- a/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java +++ b/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java @@ -232,4 +232,5 @@ public interface JsonTags { public static final String COORD_UPDATE = RestConstants.JOB_COORD_UPDATE; public static final String COORD_UPDATE_DIFF = "diff"; + public static final String STATUS = "status"; } http://git-wip-us.apache.org/repos/asf/oozie/blob/08947022/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java ---------------------------------------------------------------------- diff --git 
a/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java b/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java index 4b393c8..4cc6606 100644 --- a/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java +++ b/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java @@ -93,6 +93,8 @@ public interface RestConstants { public static final String JOB_SHOW_KILL_PARAM = "show-kill"; + public static final String JOB_SHOW_STATUS = "status"; + public static final String JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM = "coord-scope"; public static final String JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM = "date-scope"; http://git-wip-us.apache.org/repos/asf/oozie/blob/08947022/core/src/main/java/org/apache/oozie/BaseEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/BaseEngine.java b/core/src/main/java/org/apache/oozie/BaseEngine.java index a8bf023..a982ad4 100644 --- a/core/src/main/java/org/apache/oozie/BaseEngine.java +++ b/core/src/main/java/org/apache/oozie/BaseEngine.java @@ -218,4 +218,12 @@ public abstract class BaseEngine { } } + /** + * Return the status for a Job ID + * + * @param jobId job Id. 
+ * @return the job's status + * @throws BaseEngineException thrown if the job's status could not be obtained + */ + public abstract String getJobStatus(String jobId) throws BaseEngineException; } http://git-wip-us.apache.org/repos/asf/oozie/blob/08947022/core/src/main/java/org/apache/oozie/BundleEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/BundleEngine.java b/core/src/main/java/org/apache/oozie/BundleEngine.java index c657f16..27afe73 100644 --- a/core/src/main/java/org/apache/oozie/BundleEngine.java +++ b/core/src/main/java/org/apache/oozie/BundleEngine.java @@ -49,6 +49,8 @@ import org.apache.oozie.command.bundle.BundleKillXCommand; import org.apache.oozie.command.bundle.BundleRerunXCommand; import org.apache.oozie.command.bundle.BundleStartXCommand; import org.apache.oozie.command.bundle.BundleSubmitXCommand; +import org.apache.oozie.executor.jpa.BundleJobQueryExecutor; +import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.service.DagXLogInfoService; import org.apache.oozie.service.Services; import org.apache.oozie.service.XLogStreamingService; @@ -470,4 +472,23 @@ public class BundleEngine extends BaseEngine { } return bulkFilter; } + + /** + * Return the status for a Job ID + * + * @param jobId job Id. 
+ * @return the job's status + * @throws BundleEngineException thrown if the job's status could not be obtained + */ + @Override + public String getJobStatus(String jobId) throws BundleEngineException { + try { + BundleJobBean bundleJob = BundleJobQueryExecutor.getInstance().get( + BundleJobQueryExecutor.BundleJobQuery.GET_BUNDLE_JOB_STATUS, jobId); + return bundleJob.getStatusStr(); + } + catch (JPAExecutorException e) { + throw new BundleEngineException(e); + } + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/08947022/core/src/main/java/org/apache/oozie/CoordinatorEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java index c8bbbf7..8591d63 100644 --- a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java +++ b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java @@ -39,6 +39,8 @@ import org.apache.oozie.command.coord.CoordResumeXCommand; import org.apache.oozie.command.coord.CoordSubmitXCommand; import org.apache.oozie.command.coord.CoordSuspendXCommand; import org.apache.oozie.command.coord.CoordUpdateXCommand; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; @@ -802,4 +804,41 @@ public class CoordinatorEngine extends BaseEngine { throw new CoordinatorEngineException(ex); } } + + /** + * Return the status for a Job ID + * + * @param jobId job Id. 
+ * @return the job's status + * @throws CoordinatorEngineException thrown if the job's status could not be obtained + */ + @Override + public String getJobStatus(String jobId) throws CoordinatorEngineException { + try { + CoordinatorJobBean coordJob = CoordJobQueryExecutor.getInstance().get( + CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB_STATUS, jobId); + return coordJob.getStatusStr(); + } + catch (JPAExecutorException e) { + throw new CoordinatorEngineException(e); + } + } + + /** + * Return the status for an Action ID + * + * @param actionId action Id. + * @return the action's status + * @throws CoordinatorEngineException thrown if the action's status could not be obtained + */ + public String getActionStatus(String actionId) throws CoordinatorEngineException { + try { + CoordinatorActionBean coordAction = CoordActionQueryExecutor.getInstance().get( + CoordActionQueryExecutor.CoordActionQuery.GET_COORD_ACTION_STATUS, actionId); + return coordAction.getStatusStr(); + } + catch (JPAExecutorException e) { + throw new CoordinatorEngineException(e); + } + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/08947022/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java index 71a9ab4..03757dd 100644 --- a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java +++ b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java @@ -131,6 +131,8 @@ import org.json.simple.JSONObject; @NamedQuery(name = "GET_COORD_JOB_FOR_USER", query = "select w.user from CoordinatorJobBean w where w.id = :id"), + @NamedQuery(name = "GET_COORD_JOB_STATUS", query = "select w.statusStr from CoordinatorJobBean w where w.id = :id"), + @NamedQuery(name = "GET_COORD_JOB_STATUS_PARENTID", query = "select w.statusStr, w.bundleId from CoordinatorJobBean w where w.id = :id")}) 
@NamedNativeQueries({ http://git-wip-us.apache.org/repos/asf/oozie/blob/08947022/core/src/main/java/org/apache/oozie/DagEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/DagEngine.java b/core/src/main/java/org/apache/oozie/DagEngine.java index 0ccf9fc..edb4d54 100644 --- a/core/src/main/java/org/apache/oozie/DagEngine.java +++ b/core/src/main/java/org/apache/oozie/DagEngine.java @@ -544,4 +544,22 @@ public class DagEngine extends BaseEngine { throw new DagEngineException(ex); } } + + /** + * Return the status for a Job ID + * + * @param jobId job Id. + * @return the job's status + * @throws DagEngineException thrown if the job's status could not be obtained + */ + @Override + public String getJobStatus(String jobId) throws DagEngineException { + try { + WorkflowJobBean wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_STATUS, jobId); + return wfJob.getStatusStr(); + } + catch (JPAExecutorException ex) { + throw new DagEngineException(ex); + } + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/08947022/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java index 2c9e00e..169823b 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java @@ -60,6 +60,7 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo GET_COORD_JOB_ACTION_KILL, GET_COORD_JOB_MATERIALIZE, GET_COORD_JOB_SUSPEND_KILL, + GET_COORD_JOB_STATUS, GET_COORD_JOB_STATUS_PARENTID, GET_COORD_JOBS_CHANGED, GET_COORD_JOBS_OLDER_FOR_MATERILZATION, @@ -194,6 +195,7 @@ public class CoordJobQueryExecutor 
extends QueryExecutor<CoordinatorJobBean, Coo case GET_COORD_JOB_ACTION_KILL: case GET_COORD_JOB_MATERIALIZE: case GET_COORD_JOB_SUSPEND_KILL: + case GET_COORD_JOB_STATUS: case GET_COORD_JOB_STATUS_PARENTID: query.setParameter("id", parameters[0]); break; @@ -313,6 +315,11 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo bean.setAppNamespace((String) arr[6]); bean.setDoneMaterialization((Integer) arr[7]); break; + case GET_COORD_JOB_STATUS: + bean = new CoordinatorJobBean(); + bean.setId((String) parameters[0]); + bean.setStatusStr((String) ret); + break; case GET_COORD_JOB_STATUS_PARENTID: bean = new CoordinatorJobBean(); arr = (Object[]) ret; http://git-wip-us.apache.org/repos/asf/oozie/blob/08947022/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java b/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java index 11835ed..c94d1e2 100644 --- a/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java +++ b/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java @@ -246,6 +246,7 @@ public abstract class BaseJobServlet extends JsonRestServlet { * Return information about jobs. */ @Override + @SuppressWarnings("unchecked") public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { String jobId = getResourceName(request); String show = request.getParameter(RestConstants.JOB_SHOW_PARAM); @@ -306,6 +307,13 @@ public abstract class BaseJobServlet extends JsonRestServlet { stopCron(); streamJobGraph(request, response); startCron(); // -- should happen before you stream anything in response? 
+ } else if (show.equals(RestConstants.JOB_SHOW_STATUS)) { + stopCron(); + String status = getJobStatus(request, response); + JSONObject json = new JSONObject(); + json.put(JsonTags.STATUS, status); + startCron(); + sendJsonResponse(response, HttpServletResponse.SC_OK, json); } else { throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, @@ -462,5 +470,17 @@ public abstract class BaseJobServlet extends JsonRestServlet { */ abstract JSONObject updateJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) throws XServletException, IOException; + + /** + * Abstract method to get status for a job + * + * @param request the request + * @param response the response + * @return the JSON object + * @throws XServletException the x servlet exception + * @throws IOException Signals that an I/O exception has occurred. + */ + abstract String getJobStatus(HttpServletRequest request, HttpServletResponse response) + throws XServletException, IOException; } http://git-wip-us.apache.org/repos/asf/oozie/blob/08947022/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java b/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java index eb699e6..b160b46 100644 --- a/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java +++ b/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java @@ -226,4 +226,10 @@ public class V0JobServlet extends BaseJobServlet { protected JSONObject ignoreJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v0"); } + + @Override + protected String getJobStatus(HttpServletRequest request, HttpServletResponse response) throws XServletException, + IOException { + throw new 
XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/08947022/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java b/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java index 396661a..8dc9608 100644 --- a/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java +++ b/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java @@ -1089,4 +1089,10 @@ public class V1JobServlet extends BaseJobServlet { throws XServletException, IOException { throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); } + + @Override + protected String getJobStatus(HttpServletRequest request, HttpServletResponse response) throws XServletException, + IOException { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/08947022/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java b/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java index de4f865..da81b49 100644 --- a/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java +++ b/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java @@ -27,6 +27,8 @@ import javax.servlet.http.HttpServletResponse; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.oozie.BaseEngineException; +import org.apache.oozie.BundleEngine; import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.CoordinatorActionInfo; import org.apache.oozie.CoordinatorEngine; @@ -39,6 +41,7 @@ import 
org.apache.oozie.client.rest.JsonBean; import org.apache.oozie.client.rest.JsonTags; import org.apache.oozie.client.rest.RestConstants; import org.apache.oozie.command.CommandException; +import org.apache.oozie.service.BundleEngineService; import org.apache.oozie.service.CoordinatorEngineService; import org.apache.oozie.service.DagEngineService; import org.apache.oozie.service.Services; @@ -187,4 +190,32 @@ public class V2JobServlet extends V1JobServlet { throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); } } + + @Override + @SuppressWarnings("unchecked") + protected String getJobStatus(HttpServletRequest request, HttpServletResponse response) throws XServletException, + IOException { + String status; + String jobId = getResourceName(request); + try { + if (jobId.endsWith("-B")) { + BundleEngine engine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); + status = engine.getJobStatus(jobId); + } else if (jobId.endsWith("-W")) { + DagEngine engine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); + status = engine.getJobStatus(jobId); + } else { + CoordinatorEngine engine = + Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(getUser(request)); + if (jobId.contains("-C@")) { + status = engine.getActionStatus(jobId); + } else { + status = engine.getJobStatus(jobId); + } + } + } catch (BaseEngineException ex) { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); + } + return status; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/08947022/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java b/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java index 0268100..74e8bca 100644 --- a/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java +++ 
b/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java @@ -936,6 +936,37 @@ public class TestOozieCLI extends DagServletTestCase { }); } + public void testJobPoll() throws Exception { + runTest(END_POINTS, SERVLET_CLASSES, IS_SECURITY_ENABLED, new Callable<Void>() { + @Override + public Void call() throws Exception { + String oozieUrl = getContextURL(); + MockDagEngineService.reset(); + String[] args = new String[]{"job", "-oozie", oozieUrl, "-poll", MockDagEngineService.JOB_ID + "1" + + MockDagEngineService.JOB_ID_END}; + assertEquals(0, new OozieCLI().run(args)); + assertEquals(RestConstants.JOB_SHOW_STATUS, MockDagEngineService.did); + + args = new String[]{"job", "-oozie", oozieUrl, "-poll", MockDagEngineService.JOB_ID + "1" + + MockDagEngineService.JOB_ID_END, "-interval", "10"}; + assertEquals(0, new OozieCLI().run(args)); + assertEquals(RestConstants.JOB_SHOW_STATUS, MockDagEngineService.did); + + args = new String[]{"job", "-oozie", oozieUrl, "-poll", MockDagEngineService.JOB_ID + "1" + + MockDagEngineService.JOB_ID_END, "-timeout", "60"}; + assertEquals(0, new OozieCLI().run(args)); + assertEquals(RestConstants.JOB_SHOW_STATUS, MockDagEngineService.did); + + args = new String[]{"job", "-oozie", oozieUrl, "-poll", MockDagEngineService.JOB_ID + "1" + + MockDagEngineService.JOB_ID_END, "-interval", "10", "-timeout", "60"}; + assertEquals(0, new OozieCLI().run(args)); + assertEquals(RestConstants.JOB_SHOW_STATUS, MockDagEngineService.did); + + return null; + } + }); + } + public void testJobLog() throws Exception { runTest(END_POINTS, SERVLET_CLASSES, IS_SECURITY_ENABLED, new Callable<Void>() { @Override http://git-wip-us.apache.org/repos/asf/oozie/blob/08947022/core/src/test/java/org/apache/oozie/servlet/MockCoordinatorEngineService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/servlet/MockCoordinatorEngineService.java 
b/core/src/test/java/org/apache/oozie/servlet/MockCoordinatorEngineService.java index 373125b..07c1f19 100644 --- a/core/src/test/java/org/apache/oozie/servlet/MockCoordinatorEngineService.java +++ b/core/src/test/java/org/apache/oozie/servlet/MockCoordinatorEngineService.java @@ -209,6 +209,13 @@ public class MockCoordinatorEngineService extends CoordinatorEngineService { } @Override + public String getJobStatus(String jobId) throws CoordinatorEngineException { + did = RestConstants.JOB_SHOW_STATUS; + int idx = validateCoordinatorIdx(jobId); + return coordJobs.get(idx).getStatus().toString(); + } + + @Override public String getDefinition(String jobId) throws BaseEngineException { did = RestConstants.JOB_SHOW_DEFINITION; validateCoordinatorIdx(jobId); http://git-wip-us.apache.org/repos/asf/oozie/blob/08947022/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java b/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java index a51c93d..89cfcea 100644 --- a/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java +++ b/core/src/test/java/org/apache/oozie/servlet/MockDagEngineService.java @@ -183,6 +183,13 @@ public class MockDagEngineService extends DagEngineService { } @Override + public String getJobStatus(String jobId) throws DagEngineException { + did = RestConstants.JOB_SHOW_STATUS; + int idx = validateWorkflowIdx(jobId); + return workflows.get(idx).getStatus().toString(); + } + + @Override public String getDefinition(String jobId) throws DagEngineException { did = RestConstants.JOB_SHOW_DEFINITION; int idx = validateWorkflowIdx(jobId); http://git-wip-us.apache.org/repos/asf/oozie/blob/08947022/core/src/test/java/org/apache/oozie/servlet/TestV2JobServlet.java ---------------------------------------------------------------------- diff --git 
a/core/src/test/java/org/apache/oozie/servlet/TestV2JobServlet.java b/core/src/test/java/org/apache/oozie/servlet/TestV2JobServlet.java index db9c594..fb203a6 100644 --- a/core/src/test/java/org/apache/oozie/servlet/TestV2JobServlet.java +++ b/core/src/test/java/org/apache/oozie/servlet/TestV2JobServlet.java @@ -223,4 +223,37 @@ public class TestV2JobServlet extends DagServletTestCase { } }); } + + public void testJobStatus() throws Exception { + runTest("/v2/job/*", V2JobServlet.class, IS_SECURITY_ENABLED, new Callable<Void>() { + @Override + public Void call() throws Exception { + MockDagEngineService.reset(); + Map<String, String> params = new HashMap<String, String>(); + params.put(RestConstants.JOB_SHOW_PARAM, RestConstants.JOB_SHOW_STATUS); + URL url = createURL(MockDagEngineService.JOB_ID + "1" + MockDagEngineService.JOB_ID_END, params); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod("GET"); + assertEquals(HttpServletResponse.SC_OK, conn.getResponseCode()); + assertTrue(conn.getHeaderField("content-type").startsWith(RestConstants.JSON_CONTENT_TYPE)); + JSONObject obj = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream())); + assertEquals("SUCCEEDED", obj.get(JsonTags.STATUS)); + assertEquals(RestConstants.JOB_SHOW_STATUS, MockDagEngineService.did); + + MockCoordinatorEngineService.reset(); + params = new HashMap<String, String>(); + params.put(RestConstants.JOB_SHOW_PARAM, RestConstants.JOB_SHOW_STATUS); + url = createURL(MockCoordinatorEngineService.JOB_ID + 1, params); + conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod("GET"); + assertEquals(HttpServletResponse.SC_OK, conn.getResponseCode()); + assertTrue(conn.getHeaderField("content-type").startsWith(RestConstants.JSON_CONTENT_TYPE)); + obj = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream())); + assertEquals("RUNNING", obj.get(JsonTags.STATUS)); + assertEquals(RestConstants.JOB_SHOW_STATUS, 
MockCoordinatorEngineService.did); + + return null; + } + }); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/08947022/docs/src/site/twiki/DG_CommandLineTool.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/DG_CommandLineTool.twiki b/docs/src/site/twiki/DG_CommandLineTool.twiki index c65acbd..72c88ad 100644 --- a/docs/src/site/twiki/DG_CommandLineTool.twiki +++ b/docs/src/site/twiki/DG_CommandLineTool.twiki @@ -27,55 +27,67 @@ usage: oozie version : show client version . oozie job <OPTIONS> : job operations - -action <arg> coordinator rerun/kill on action ids (requires -rerun/-kill); coordinator log retrieval on action ids (requires -log) + -action <arg> coordinator rerun/kill on action ids (requires -rerun/-kill); + coordinator log retrieval on action ids (requires -log) -allruns Get workflow jobs corresponding to a coordinator action including all the reruns -auth <arg> select authentication type [SIMPLE|KERBEROS] - -change <arg> change a coordinator/bundle job + -change <arg> change a coordinator or bundle job -config <arg> job configuration file '.xml' or '.properties' + -configcontent <arg> job configuration + -coordinator <arg> bundle rerun on coordinator names (requires -rerun) -D <property=value> set/override value for given property - -date <arg> coordinator/bundle rerun on action dates (requires -rerun) + -date <arg> coordinator/bundle rerun on action dates (requires -rerun); + coordinator log retrieval on action dates (requires -log) + -debug Use debug mode to see debugging statements on stdout -definition <arg> job definition + -diff <arg> Show diff of the new coord definition and properties with the + existing one (default true) -doas <arg> doAs user, impersonates as the specified user - -dryrun Dryrun a workflow (since 3.3.2) or coordinator (since 2.0) job without actually executing it + -dryrun Dryrun a workflow (since 3.3.2) or coordinator (since 2.0) + job without actually 
executing it + -filter <arg> status=<S1>[;status=<S2>]* or status!=<S1>[;status!=<S2>]* + (All Coordinator actions satisfying the status filters will + be retrieved. Positive filters '=' concatenated with OR and + negative filters '!=' with AND. Currently, only supported for + Coordinator job) + -ignore <arg> change status of a coordinator job or action to IGNORED + (-action required to ignore coord actions) -info <arg> info of a job - -kill <arg> kill a job (coordinator requires -action or -date) - -len <arg> number of actions to be returned, used for pagination(default 1000, requires -info) - -filter <arg> All coordinator actions satisfying the filter will be retrieved. - Filter is of the format <key><comparator><value>[;<key><comparator><value>]* - key: status or nominaltime - comparator: =, !=, <, <=, >, >= - value: valid status like SUCCEEDED, KILLED, RUNNING etc. Only = and != apply for status - nominalTime is valid date of the format yyyy-MM-dd'T'HH:mm'Z' (like 2014-06-01T00:00Z) - Filter with '=' is concatenated with 'OR' and other filters are concatenated with 'AND'. - Currently, supported only for coordinator job. - -order <arg> order to show coordinator actions (default ascending order, 'desc' for descending order, requires -info) - Currently, only supported for coordinator job. + -interval <arg> polling interval in minutes (default is 5, requires -poll) + -kill <arg> kill a job (coordinator can mention -action or -date) + -len <arg> number of actions (default TOTAL ACTIONS, requires -info) -localtime use local time (same as passing your time zone to -timezone). Overrides -timezone option -log <arg> job log - -nocleanup do not clean up output-events of the coordinator rerun actions - (requires -rerun) - -offset <arg> offset of actions returned relative to all actions matching the filter criteria, - used for pagination (default '1', requires -info) + -logfilter <arg> job log search parameter. Can be specified as -logfilter + opt1=val1;opt2=val1;opt3=val1. 
Supported options are recent, + start, end, loglevel, text, limit and debug + -nocleanup do not clean up output-events of the coordinator rerun + actions (requires -rerun) + -offset <arg> job info offset of actions (default '1', requires -info) -oozie <arg> Oozie URL - -refresh re-materialize the coordinator rerun actions (requires -rerun) - -rerun <arg> rerun a job (coordinator requires -action or -date; bundle requires -coordinator or -date) + -order <arg> order to show coord actions (default ascending order, 'desc' + for descending order, requires -info) + -poll <arg> poll Oozie until a job reaches a terminal state or a timeout + occurs + -refresh re-materialize the coordinator rerun actions (requires + -rerun) + -rerun <arg> rerun a job (coordinator requires -action or -date, bundle + requires -coordinator or -date) -resume <arg> resume a job -run run a job -start <arg> start a job -submit submit a job -suspend <arg> suspend a job + -timeout <arg> timeout in minutes (default is 30, negative values indicate + no timeout, requires -poll) -timezone <arg> use time zone with the specified ID (default GMT). See 'oozie info -timezones' for a list + -update <arg> Update coord definition and properties -value <arg> new endtime/concurrency/pausetime value for changing a - coordinator job; new pausetime value for changing a bundle job + coordinator job -verbose verbose mode - -update Update coordinator definition and properties - -logfilter job log search parameter. Can be specified as -logfilter opt1=val1;opt2=val1;opt3=val1. - Supported options are recent, start, end, loglevel, text, limit and debug. - -ignore <arg> ignore a coordinator job or action - (requires '-action' to ignore a coordinator action, if no option given, ignore a coodinator job) . oozie jobs <OPTIONS> : jobs status -auth <arg> select authentication type [SIMPLE|KERBEROS] @@ -818,6 +830,28 @@ A ignored coordinator action can be rerun using -rerun command. 
Refer to the [[DG_CoordinatorRerun][Rerunning Coordinator Actions]] for details. When a workflow job of a ignored coordinator action is rerun, the coordinator action becomes =RUNNING= state. +---+++ Polling an Oozie job + +This command allows polling the Oozie server for an Oozie job until it reaches a completed status (e.g. =SUCCEEDED=, =KILLED=, etc). + +Example: + +<verbatim> +$ oozie job -poll <job_id> -interval 10 -timeout 60 -verbose +. +RUNNING +RUNNING +RUNNING +SUCCEEDED +</verbatim> + +The =-poll= argument takes a valid Workflow Job ID, Coordinator Job ID, Coordinator Action ID, or Bundle Job ID. + +All other arguments are optional: + * =verbose= will cause the job status to be printed at each poll; otherwise, there will be no output + * =interval= allows specifying the polling interval in minutes (default is 5) + * =timeout= allows specifying the timeout in minutes (default is 30 minutes); negative values indicate no timeout + ---++ Jobs Operations ---+++ Checking the Status of multiple Workflow Jobs http://git-wip-us.apache.org/repos/asf/oozie/blob/08947022/docs/src/site/twiki/WebServicesAPI.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/WebServicesAPI.twiki b/docs/src/site/twiki/WebServicesAPI.twiki index 6f31240..fc51934 100644 --- a/docs/src/site/twiki/WebServicesAPI.twiki +++ b/docs/src/site/twiki/WebServicesAPI.twiki @@ -1473,6 +1473,30 @@ The node labels are the node names provided in the workflow XML. This API returns =HTTP 400= when run on a resource other than a workflow, viz. bundle and coordinator. +---++++ Job Status + +An =HTTP GET= request that returns the current status (e.g. =SUCCEEDED=, =KILLED=, etc) of a given job. If you are only interested +in the status, and don't want the rest of the information that the =info= query provides, it is recommended to use this call +as it is more efficient. 
+ +*Request* +<verbatim> +GET /oozie/v2/job/0000000-140908152307821-oozie-rkan-C?show=status +</verbatim> + +*Response* + +<verbatim> +HTTP/1.1 200 OK +Content-Type: application/json;charset=UTF-8 +. +{ + "status" : "SUCCEEDED" +} +</verbatim> + +It accepts any valid Workflow Job ID, Coordinator Job ID, Coordinator Action ID, or Bundle Job ID. + ---++++ Jobs Information A HTTP GET request retrieves workflow and coordinator jobs information. http://git-wip-us.apache.org/repos/asf/oozie/blob/08947022/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index bc182c5..c5ec39a 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.2.0 release (trunk - unreleased) +OOZIE-1567 Provide a wait tool in Oozie (rkanter) OOZIE-2014 TestAuthFilterAuthOozieClient fails after OOZIE-1917 (rkanter) OOZIE-1917 Authentication secret should be random by default and needs to coordinate with HA (rkanter) OOZIE-1853 Improve the Credentials documentation (rkanter)
