OOZIE-2751 LocalOozieClient is missing methods from OozieClient (abhishekbafna via rkanter)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/f5554dd3 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/f5554dd3 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/f5554dd3 Branch: refs/heads/oya Commit: f5554dd315c730286edbb333b4008e48620e2acc Parents: ffd9951 Author: Robert Kanter <[email protected]> Authored: Mon May 8 15:44:53 2017 -0700 Committer: Robert Kanter <[email protected]> Committed: Mon May 8 15:44:53 2017 -0700 ---------------------------------------------------------------------- .../org/apache/oozie/client/OozieClient.java | 4 +- .../apache/oozie/client/rest/RestConstants.java | 2 +- .../main/java/org/apache/oozie/BaseEngine.java | 18 +- .../org/apache/oozie/BaseLocalOozieClient.java | 601 +++++++++++++++++++ .../java/org/apache/oozie/LocalOozieClient.java | 245 +------- .../apache/oozie/LocalOozieClientBundle.java | 93 +++ .../org/apache/oozie/LocalOozieClientCoord.java | 328 ++-------- .../oozie/OozieClientOperationHandler.java | 173 ++++++ .../java/org/apache/oozie/OozieJsonFactory.java | 55 ++ .../java/org/apache/oozie/local/LocalOozie.java | 93 ++- .../org/apache/oozie/servlet/V0JobsServlet.java | 12 +- .../org/apache/oozie/servlet/V1JobsServlet.java | 184 ++---- .../apache/oozie/TestLocalOozieClientCoord.java | 72 ++- release-log.txt | 1 + 14 files changed, 1233 insertions(+), 648 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/f5554dd3/client/src/main/java/org/apache/oozie/client/OozieClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/client/OozieClient.java b/client/src/main/java/org/apache/oozie/client/OozieClient.java index 7370808..dbb7cbd 100644 --- a/client/src/main/java/org/apache/oozie/client/OozieClient.java +++ b/client/src/main/java/org/apache/oozie/client/OozieClient.java @@ -66,7 +66,7 @@ import org.codehaus.jackson.type.TypeReference; /** - * Client API to submit and manage Oozie workflow jobs against an Oozie intance. + * Client API to submit and manage Oozie workflow jobs against an Oozie instance. * <p> * This class is thread safe. * <p> @@ -1978,7 +1978,7 @@ public class OozieClient { * @param map the map * @return the string */ - private String mapToString(Map<String, String> map) { + protected String mapToString(Map<String, String> map) { StringBuilder sb = new StringBuilder(); Iterator<Entry<String, String>> it = map.entrySet().iterator(); while (it.hasNext()) { http://git-wip-us.apache.org/repos/asf/oozie/blob/f5554dd3/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java b/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java index f477531..74843dc 100644 --- a/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java +++ b/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java @@ -221,7 +221,7 @@ public interface RestConstants { String USER_PARAM = "user"; - public static final String COORD_ACTION_MISSING_DEPENDENCIES = "missing-dependencies"; + String COORD_ACTION_MISSING_DEPENDENCIES = "missing-dependencies"; String ADMIN_PURGE = "purge"; String PURGE_WF_AGE = "wf"; http://git-wip-us.apache.org/repos/asf/oozie/blob/f5554dd3/core/src/main/java/org/apache/oozie/BaseEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/BaseEngine.java b/core/src/main/java/org/apache/oozie/BaseEngine.java index 2780ec2..6a8ebff 100644 --- a/core/src/main/java/org/apache/oozie/BaseEngine.java +++ b/core/src/main/java/org/apache/oozie/BaseEngine.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.client.CoordinatorJob; +import org.apache.oozie.client.OozieClientException; import org.apache.oozie.client.WorkflowJob; import org.apache.oozie.command.CommandException; import org.apache.oozie.executor.jpa.JPAExecutorException; @@ -320,7 +321,7 @@ public abstract class BaseEngine { /** * Stream job log. * - * @param xLogStreamer the log streamer + * @param logStreamer the log streamer * @param jobId the job id * @param writer the writer * @throws IOException Signals that an I/O exception has occurred. @@ -329,4 +330,19 @@ public abstract class BaseEngine { protected abstract void streamJobLog(XLogStreamer logStreamer, String jobId, Writer writer) throws IOException, BaseEngineException; + interface BaseEngineCallable<V> { + V callOrThrow() throws BaseEngineException; + } + + static <V> V callOrRethrow(final BaseEngineCallable<V> callable) throws OozieClientException { + try { + return callable.callOrThrow(); + } catch (final BaseEngineException e) { + throw new OozieClientException(e.getErrorCode().toString(), e); + } + } + + static <V> V throwNoOp() throws OozieClientException { + throw new OozieClientException(ErrorCode.E0301.toString(), "no-op"); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/f5554dd3/core/src/main/java/org/apache/oozie/BaseLocalOozieClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/BaseLocalOozieClient.java b/core/src/main/java/org/apache/oozie/BaseLocalOozieClient.java new file mode 100644 index 0000000..38fb006 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/BaseLocalOozieClient.java @@ -0,0 +1,601 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie; + +import com.google.common.base.Preconditions; +import org.apache.oozie.client.BulkResponse; +import org.apache.oozie.client.BundleJob; +import org.apache.oozie.client.CoordinatorAction; +import org.apache.oozie.client.CoordinatorJob; +import org.apache.oozie.client.JMSConnectionInfo; +import org.apache.oozie.client.OozieClient; +import org.apache.oozie.client.OozieClientException; +import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.client.WorkflowJob; +import org.apache.oozie.client.rest.RestConstants; +import org.apache.oozie.util.XConfiguration; +import org.json.simple.JSONObject; + +import java.io.IOException; +import java.io.PrintStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.oozie.BaseEngine.BaseEngineCallable; +import static org.apache.oozie.BaseEngine.callOrRethrow; +import static org.apache.oozie.BaseEngine.throwNoOp; +import static org.apache.oozie.OozieClientOperationHandler.OozieOperationJob; +import static org.apache.oozie.OozieClientOperationHandler.OozieJobOperationCaller; + +abstract class BaseLocalOozieClient extends OozieClient { + + private final BaseEngine engine; + + BaseLocalOozieClient(final BaseEngine engine) { + Preconditions.checkNotNull(engine); + this.engine = engine; + } + + @Override + public String getOozieUrl() { + return "localoozie"; + } + + @Override + public String getProtocolUrl() throws OozieClientException { + return "localoozie"; + } + + @Override + public synchronized void validateWSVersion() throws OozieClientException { + } + + @Override + public Properties createConfiguration() { + final Properties conf = new Properties(); + if (engine != null) { + conf.setProperty(USER_NAME, engine.getUser()); + } + conf.setProperty(GROUP_NAME, "users"); + return conf; + } + + // no-operation + @Override + public void setHeader(final String name, final String value) { + } + + // no-operation + @Override + public String getHeader(final String name) { + return ""; + } + + // no-operation + @Override + public void removeHeader(final String name) { + } + + // no-operation + @Override + public Iterator<String> getHeaderNames() { + return Collections.<String> emptySet().iterator(); + } + + @Override + public Map<String, String> getHeaders() { + return Collections.emptyMap(); + } + + @Override + public String submit(final Properties conf) throws OozieClientException { + return callOrRethrow(new BaseEngineCallable<String>() { + @Override + public String callOrThrow() throws BaseEngineException { + return engine.submitJob(new XConfiguration(conf), false); + } + }); + } + + @Override + public void start(final String jobId) throws OozieClientException { + callOrRethrow(new BaseEngineCallable<Void>() { + @Override + public Void callOrThrow() throws BaseEngineException { + engine.start(jobId); + return null; + } + }); + } + + @Override + public String run(final Properties conf) throws OozieClientException { + return callOrRethrow(new BaseEngineCallable<String>() { + @Override + public String callOrThrow() throws BaseEngineException { + return engine.submitJob(new XConfiguration(conf), true); + } + }); + } + + @Override + public void reRun(final String jobId, final Properties conf) throws OozieClientException { + callOrRethrow(new BaseEngineCallable<Void>() { + @Override + public Void callOrThrow() throws BaseEngineException { + engine.reRun(jobId, new XConfiguration(conf)); + return null; + } + }); + } + + @Override + public void suspend(final String jobId) throws OozieClientException { + callOrRethrow(new BaseEngineCallable<Void>() { + @Override + public Void callOrThrow() throws BaseEngineException { + engine.suspend(jobId); + return null; + } + }); + } + + @Override + public void resume(final String jobId) throws OozieClientException { + callOrRethrow(new BaseEngineCallable<Void>() { + @Override + public Void callOrThrow() throws BaseEngineException { + engine.resume(jobId); + return null; + } + }); + + } + + @Override + public void kill(final String jobId) throws OozieClientException { + callOrRethrow(new BaseEngineCallable<Void>() { + @Override + public Void callOrThrow() throws BaseEngineException { + engine.kill(jobId); + return null; + } + }); + } + + @Override + public String dryrun(final Properties conf) throws OozieClientException { + return callOrRethrow(new BaseEngineCallable<String>() { + @Override + public String callOrThrow() throws BaseEngineException { + return engine.dryRunSubmit(new XConfiguration(conf)); + } + }); + } + + @Override + public String getStatus(final String jobId) throws OozieClientException { + return callOrRethrow(new BaseEngineCallable<String>() { + @Override + public String callOrThrow() throws BaseEngineException { + return engine.getJobStatus(jobId); + } + }); + } + + @Override + public String getJobDefinition(final String jobId) throws OozieClientException { + return callOrRethrow(new BaseEngineCallable<String>() { + @Override + public String callOrThrow() throws BaseEngineException { + return engine.getDefinition(jobId); + } + }); + } + + @Override + public String getJobId(final String externalId) throws OozieClientException { + return callOrRethrow(new BaseEngineCallable<String>() { + @Override + public String callOrThrow() throws BaseEngineException { + return engine.getJobIdForExternalId(externalId); + } + }); + } + + @Override + public void slaEnableAlert(final String bundleId, final String actions, final String dates, final String coords) + throws OozieClientException { + callOrRethrow(new BaseEngineCallable<Void>() { + @Override + public Void callOrThrow() throws BaseEngineException { + engine.enableSLAAlert(bundleId, actions, dates, coords); + return null; + } + }); + } + + @Override + public void slaEnableAlert(String jobIds, String actions, String dates) throws OozieClientException { + slaEnableAlert(jobIds, actions, dates, null); + } + + @Override + public void slaDisableAlert(final String bundleId, final String actions, final String dates, final String coords) + throws OozieClientException { + callOrRethrow(new BaseEngineCallable<Void>() { + @Override + public Void callOrThrow() throws BaseEngineException { + engine.disableSLAAlert(bundleId, actions, dates, coords); + return null; + } + }); + } + + @Override + public void slaDisableAlert(String jobIds, String actions, String dates) throws OozieClientException { + slaDisableAlert(jobIds, actions, dates, null); + } + + @Override + public void slaChange(final String bundleId, final String actions, final String dates, final String coords, + final String newSlaParams) + throws OozieClientException { + callOrRethrow(new BaseEngineCallable<Void>() { + @Override + public Void callOrThrow() throws BaseEngineException { + engine.changeSLA(bundleId, actions, dates, coords, newSlaParams); + return null; + } + }); + } + + @Override + public void slaChange(String jobIds, String actions, String dates, String newSlaParams) throws OozieClientException { + slaChange(jobIds, actions, dates, null, newSlaParams); + } + + @Override + public void slaChange(String bundleId, String actions, String dates, String coords, Map<String, String> newSlaParams) + throws OozieClientException { + slaChange(bundleId, actions, dates, coords, mapToString(newSlaParams)); + } + + @Override + public void change(final String jobId, final String changeValue) throws OozieClientException { + callOrRethrow(new BaseEngineCallable<Void>() { + @Override + public Void callOrThrow() throws BaseEngineException { + engine.change(jobId, changeValue); + return null; + } + }); + } + + @Override + public JSONObject bulkModifyJobs(final String actionType, final String filter, final String jobType, final int start, + final int len) + throws OozieClientException { + final JSONObject jsonObject; + switch (actionType) { + case RestConstants.JOB_ACTION_KILL: + jsonObject = killJobs(filter, jobType, start, len); + break; + case RestConstants.JOB_ACTION_SUSPEND: + jsonObject = suspendJobs(filter, jobType, start, len); + break; + case RestConstants.JOB_ACTION_RESUME: + jsonObject = resumeJobs(filter, jobType, start, len); + break; + default: + throw new IllegalArgumentException("Invalid actionType passed. actionType: " + actionType); + } + return jsonObject; + } + + @Override + public JSONObject killJobs(final String filter, final String jobType, final int start, final int len) + throws OozieClientException { + OozieClientOperationHandler handler = new OozieClientOperationHandler(engine); + final OozieOperationJob operation = handler.getOperationHandler(RestConstants.JOB_ACTION_KILL, filter, start, len); + + + return callOrRethrow(new BaseEngineCallable<JSONObject>() { + @Override + public JSONObject callOrThrow() throws BaseEngineException { + return new OozieJobOperationCaller().call(jobType, operation); + } + }); + } + + @Override + public JSONObject suspendJobs(final String filter, final String jobType, final int start, final int len) + throws OozieClientException { + OozieClientOperationHandler handler = new OozieClientOperationHandler(engine); + final OozieOperationJob operation = handler.getOperationHandler(RestConstants.JOB_ACTION_SUSPEND, filter, start, len); + + return callOrRethrow(new BaseEngineCallable<JSONObject>() { + @Override + public JSONObject callOrThrow() throws BaseEngineException { + return new OozieJobOperationCaller().call(jobType, operation); + } + }); + } + + @Override + public JSONObject resumeJobs(final String filter, final String jobType, final int start, final int len) + throws OozieClientException { + OozieClientOperationHandler handler = new OozieClientOperationHandler(engine); + final OozieOperationJob operation = handler.getOperationHandler(RestConstants.JOB_ACTION_RESUME, filter, start, len); + + return callOrRethrow(new BaseEngineCallable<JSONObject>() { + @Override + public JSONObject callOrThrow() throws BaseEngineException { + return new OozieJobOperationCaller().call(jobType, operation); + } + }); + } + + + @Override + public WorkflowJob getJobInfo(final String jobId) throws OozieClientException { + return throwNoOp(); + } + + @Override + public WorkflowJob getJobInfo(final String jobId, final int start, final int len) throws OozieClientException { + return throwNoOp(); + } + + @Override + public List<WorkflowJob> getJobsInfo(final String filter, final int start, final int len) throws OozieClientException { + return throwNoOp(); + } + + @Override + public List<WorkflowJob> getJobsInfo(final String filter) throws OozieClientException { + return throwNoOp(); + } + + @Override + public List<CoordinatorAction> kill(final String jobId, final String rangeType, final String scope) + throws OozieClientException { + return throwNoOp(); + } + + @Override + public List<BulkResponse> getBulkInfo(final String filter, final int start, final int len) throws OozieClientException { + return throwNoOp(); + } + + @Override + public String updateCoord(String jobId, Properties conf, String dryrun, String showDiff) throws OozieClientException { + return throwNoOp(); + } + + @Override + public String updateCoord(String jobId, String dryrun, String showDiff) throws OozieClientException { + return throwNoOp(); + } + + @Override + public WorkflowAction getWorkflowActionInfo(String actionId) throws OozieClientException { + return throwNoOp(); + } + + @Override + public BundleJob getBundleJobInfo(String jobId) throws OozieClientException { + return throwNoOp(); + } + + @Override + public CoordinatorJob getCoordJobInfo(String jobId) throws OozieClientException { + return throwNoOp(); + } + + @Override + public CoordinatorJob getCoordJobInfo(String jobId, String filter, int start, int len) throws OozieClientException { + return throwNoOp(); + } + + @Override + public CoordinatorJob getCoordJobInfo(String jobId, String filter, int start, int len, String order) + throws OozieClientException { + return throwNoOp(); + } + + @Override + public List<WorkflowJob> getWfsForCoordAction(String coordActionId) throws OozieClientException { + return throwNoOp(); + } + + @Override + public CoordinatorAction getCoordActionInfo(String actionId) throws OozieClientException { + return throwNoOp(); + } + + @Override + public List<CoordinatorAction> reRunCoord(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) + throws OozieClientException { + return throwNoOp(); + } + + @Override + public List<CoordinatorAction> reRunCoord(String jobId, String rerunType, String scope, boolean refresh, + boolean noCleanup, boolean failed, Properties props) throws OozieClientException { + return throwNoOp(); + } + + @Override + public Void reRunBundle(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup) + throws OozieClientException { + return throwNoOp(); + } + + @Override + public List<CoordinatorJob> getCoordJobsInfo(String filter, int start, int len) throws OozieClientException { + return throwNoOp(); + } + + @Override + public List<BundleJob> getBundleJobsInfo(String filter, int start, int len) throws OozieClientException { + return throwNoOp(); + } + + @Override + protected HttpURLConnection createRetryableConnection(URL url, String method) throws IOException { + try { + return throwNoOp(); + } catch (OozieClientException e) { + throw new IOException(e); + } + } + + @Override + protected HttpURLConnection createConnection(URL url, String method) throws IOException, OozieClientException { + return throwNoOp(); + } + + @Override + public List<CoordinatorAction> ignore(String jobId, String scope) throws OozieClientException { + return throwNoOp(); + } + + @Override + public JMSConnectionInfo getJMSConnectionInfo() throws OozieClientException { + return throwNoOp(); + } + + @Override + public String getJobLog(String jobId) throws OozieClientException { + return throwNoOp(); + } + + @Override + public void getJobAuditLog(String jobId, PrintStream ps) throws OozieClientException { + throwNoOp(); + } + + @Override + public void getJobLog(String jobId, String logRetrievalType, String logRetrievalScope, String logFilter, PrintStream ps) + throws OozieClientException { + throwNoOp(); + } + + @Override + public void getJobErrorLog(String jobId, PrintStream ps) throws OozieClientException { + throwNoOp(); + } + + @Override + public void getJobLog(String jobId, String logRetrievalType, String logRetrievalScope, PrintStream ps) + throws OozieClientException { + throwNoOp(); + } + + @Override + public String getJMSTopicName(String jobId) throws OozieClientException { + return throwNoOp(); + } + + @Override + public void getSlaInfo(int start, int len, String filter) throws OozieClientException { + throwNoOp(); + } + + @Override + public void setSystemMode(SYSTEM_MODE status) throws OozieClientException { + throwNoOp(); + } + + @Override + public SYSTEM_MODE getSystemMode() throws OozieClientException { + return throwNoOp(); + } + + @Override + public String updateShareLib() throws OozieClientException { + return throwNoOp(); + } + + @Override + public String listShareLib(String sharelibKey) throws OozieClientException { + return throwNoOp(); + } + + @Override + public String getServerBuildVersion() throws OozieClientException { + return throwNoOp(); + } + + @Override + public String getClientBuildVersion() { + throw new UnsupportedOperationException("Operation not supported."); + } + + @Override + public String validateXML(String file) throws OozieClientException { + return throwNoOp(); + } + + @Override + public void pollJob(String id, int timeout, int interval, boolean verbose) throws OozieClientException { + throwNoOp(); + } + + @Override + public List<String> getQueueDump() throws OozieClientException { + return throwNoOp(); + } + + @Override + public Map<String, String> getAvailableOozieServers() throws OozieClientException { + return throwNoOp(); + } + + @Override + public Map<String, String> getServerConfiguration() throws OozieClientException { + return throwNoOp(); + } + + @Override + public Map<String, String> getJavaSystemProperties() throws OozieClientException { + return throwNoOp(); + } + + @Override + public Map<String, String> getOSEnv() throws OozieClientException { + return throwNoOp(); + } + + @Override + public Metrics getMetrics() throws OozieClientException { + return throwNoOp(); + } + + @Override + public Instrumentation getInstrumentation() throws OozieClientException { + return throwNoOp(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/f5554dd3/core/src/main/java/org/apache/oozie/LocalOozieClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/LocalOozieClient.java b/core/src/main/java/org/apache/oozie/LocalOozieClient.java index f734f76..123725f 100644 --- a/core/src/main/java/org/apache/oozie/LocalOozieClient.java +++ b/core/src/main/java/org/apache/oozie/LocalOozieClient.java @@ -18,18 +18,14 @@ package org.apache.oozie; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; - -import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.OozieClientException; +import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.client.WorkflowJob; -import org.apache.oozie.util.XConfiguration; + +import java.util.List; /** - * Client API to submit and manage Oozie workflow jobs against an Oozie intance. <p> This class is thread safe. <p> + * Client API to submit and manage Oozie workflow jobs against an Oozie instance. <p> This class is thread safe. <p> * Syntax for filter for the {@link #getJobsInfo(String)} {@link #getJobsInfo(String, int, int)} methods: * <code>[NAME=VALUE][;NAME=VALUE]*</code>. <p> Valid filter names are: <p> <ul> <li>name: the workflow application * name from the workflow definition.</li> <li>user: the user that submitted the job.</li> <li>group: the group for the @@ -37,7 +33,7 @@ import org.apache.oozie.util.XConfiguration; * query will do an OR among all the filter values for the same name. Multiple values must be specified as different * name value pairs. */ -public class LocalOozieClient extends OozieClient { +public class LocalOozieClient extends BaseLocalOozieClient { private DagEngine dagEngine; @@ -47,217 +43,11 @@ public class LocalOozieClient extends OozieClient { * @param dagEngine the dag engine instance to use. */ public LocalOozieClient(DagEngine dagEngine) { + super(dagEngine); this.dagEngine = dagEngine; } /** - * Return the Oozie URL of the workflow client instance. <p> This URL is the base URL fo the Oozie system, with not - * protocol versioning. - * - * @return the Oozie URL of the workflow client instance. - */ - @Override - public String getOozieUrl() { - return "localoozie"; - } - - /** - * Return the Oozie URL used by the client and server for WS communications. <p> This URL is the original URL plus - * the versioning element path. - * - * @return the Oozie URL used by the client and server for communication. - * @throws org.apache.oozie.client.OozieClientException thrown in the client and the server are not protocol - * compatible. - */ - @Override - public String getProtocolUrl() throws OozieClientException { - return "localoozie"; - } - - /** - * Validate that the Oozie client and server instances are protocol compatible. - * - * @throws org.apache.oozie.client.OozieClientException thrown in the client and the server are not protocol - * compatible. - */ - @Override - public synchronized void validateWSVersion() throws OozieClientException { - } - - /** - * Create an empty configuration with just the {@link #USER_NAME} set to the JVM user name and the {@link - * #GROUP_NAME} set to 'other'. - * - * @return an empty configuration. - */ - @Override - public Properties createConfiguration() { - Properties conf = new Properties(); - if (dagEngine != null) { - conf.setProperty(USER_NAME, dagEngine.getUser()); - } - conf.setProperty(GROUP_NAME, "users"); - return conf; - } - - /** - * Set a HTTP header to be used in the WS requests by the workflow instance. - * - * @param name header name. - * @param value header value. - */ - @Override - public void setHeader(String name, String value) { - } - - /** - * Get the value of a set HTTP header from the workflow instance. - * - * @param name header name. - * @return header value, <code>null</code> if not set. - */ - @Override - public String getHeader(String name) { - return null; - } - - /** - * Remove a HTTP header from the workflow client instance. - * - * @param name header name. - */ - @Override - public void removeHeader(String name) { - } - - /** - * Return an iterator with all the header names set in the workflow instance. - * - * @return header names. - */ - @Override - @SuppressWarnings("unchecked") - public Iterator<String> getHeaderNames() { - return Collections.EMPTY_SET.iterator(); - } - - - /** - * Submit a workflow job. - * - * @param conf job configuration. - * @return the job Id. - * @throws org.apache.oozie.client.OozieClientException thrown if the job could not be submitted. - */ - @Override - public String submit(Properties conf) throws OozieClientException { - try { - return dagEngine.submitJob(new XConfiguration(conf), false); - } - catch (DagEngineException ex) { - throw new OozieClientException(ex.getErrorCode().toString(), ex); - } - } - - /** - * Start a workflow job. - * - * @param jobId job Id. - * @throws org.apache.oozie.client.OozieClientException thrown if the job could not be started. - */ - @Override - public void start(String jobId) throws OozieClientException { - try { - dagEngine.start(jobId); - } - catch (DagEngineException ex) { - throw new OozieClientException(ex.getErrorCode().toString(), ex); - } - } - - /** - * Submit and start a workflow job. - * - * @param conf job configuration. - * @return the job Id. - * @throws org.apache.oozie.client.OozieClientException thrown if the job could not be submitted. - */ - @Override - public String run(Properties conf) throws OozieClientException { - try { - return dagEngine.submitJob(new XConfiguration(conf), true); - } - catch (DagEngineException ex) { - throw new OozieClientException(ex.getErrorCode().toString(), ex); - } - } - - /** - * Rerun a workflow job. - * - * @param jobId job Id to rerun. - * @param conf configuration information for the rerun. - * @throws org.apache.oozie.client.OozieClientException thrown if the job could not be started. - */ - @Override - public void reRun(String jobId, Properties conf) throws OozieClientException { - try { - dagEngine.reRun(jobId, new XConfiguration(conf)); - } - catch (DagEngineException ex) { - throw new OozieClientException(ex.getErrorCode().toString(), ex); - } - } - - /** - * Suspend a workflow job. - * - * @param jobId job Id. - * @throws org.apache.oozie.client.OozieClientException thrown if the job could not be suspended. - */ - @Override - public void suspend(String jobId) throws OozieClientException { - try { - dagEngine.suspend(jobId); - } - catch (DagEngineException ex) { - throw new OozieClientException(ex.getErrorCode().toString(), ex); - } - } - - /** - * Resume a workflow job. - * - * @param jobId job Id. - * @throws org.apache.oozie.client.OozieClientException thrown if the job could not be resume. - */ - @Override - public void resume(String jobId) throws OozieClientException { - try { - dagEngine.resume(jobId); - } - catch (DagEngineException ex) { - throw new OozieClientException(ex.getErrorCode().toString(), ex); - } - } - - /** - * Kill a workflow job. - * - * @param jobId job Id. - * @throws org.apache.oozie.client.OozieClientException thrown if the job could not be killed. - */ - @Override - public void kill(String jobId) throws OozieClientException { - try { - dagEngine.kill(jobId); - } - catch (DagEngineException ex) { - throw new OozieClientException(ex.getErrorCode().toString(), ex); - } - } - - /** * Get the info of a workflow job. * * @param jobId job Id. @@ -306,20 +96,21 @@ public class LocalOozieClient extends OozieClient { return getJobsInfo(filter, 1, 100); } - /** - * Return the workflow job Id for an external Id. <p> The external Id must have provided at job creation time. - * - * @param externalId external Id given at job creation time. - * @return the workflow job Id for an external Id, <code>null</code> if none. - * @throws org.apache.oozie.client.OozieClientException thrown if the operation could not be done. - */ @Override - public String getJobId(String externalId) throws OozieClientException { + public WorkflowJob getJobInfo(String jobId, int start, int len) throws OozieClientException { try { - return dagEngine.getJobIdForExternalId(externalId); + return dagEngine.getJob(jobId, start, len); + } catch (DagEngineException e) { + throw new OozieClientException(e.getErrorCode().toString(), e); } - catch (DagEngineException ex) { - throw new OozieClientException(ex.getErrorCode().toString(), ex); + } + + @Override + public WorkflowAction getWorkflowActionInfo(String actionId) throws OozieClientException { + try { + return dagEngine.getWorkflowAction(actionId); + } catch (BaseEngineException e) { + throw new OozieClientException(e.getErrorCode().toString(), e); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/f5554dd3/core/src/main/java/org/apache/oozie/LocalOozieClientBundle.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/LocalOozieClientBundle.java b/core/src/main/java/org/apache/oozie/LocalOozieClientBundle.java new file mode 100644 index 0000000..3c229be --- /dev/null +++ b/core/src/main/java/org/apache/oozie/LocalOozieClientBundle.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie; + +import org.apache.oozie.client.BulkResponse; +import org.apache.oozie.client.BundleJob; +import org.apache.oozie.client.OozieClientException; + +import java.util.List; + +/** + * Client API to submit and manage Oozie Bundle jobs against an Oozie instance. + * <p> + * This class is thread safe. + * <p> + * Syntax for filter for the {@link #getJobsInfo(String)} {@link #getJobsInfo(String, int, int)} methods: + * <code>[NAME=VALUE][;NAME=VALUE]*</code>. + * <p> + * Valid filter names are: + * <p> + * <ul> + * <li>name: the bundle application name from the bundle definition.</li> + * <li>user: the user that submitted the job.</li> + * <li>group: the group for the job.</li> + * <li>status: the status of the job.</li> + * </ul> + * <p> + * The query will do an AND among all the filter names. The query will do an OR among all the filter values for the same + * name. Multiple values must be specified as different name value pairs. + */ +public class LocalOozieClientBundle extends BaseLocalOozieClient { + + private final BundleEngine bundleEngine; + + public LocalOozieClientBundle(BundleEngine bundleEngine) { + super(bundleEngine); + this.bundleEngine = bundleEngine; + } + + @Override + public BundleJob getBundleJobInfo(String jobId) throws OozieClientException { + try { + return bundleEngine.getBundleJob(jobId); + } catch (BundleEngineException e) { + throw new OozieClientException(e.getErrorCode().toString(), e); + } + } + + @Override + public Void reRunBundle(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup) + throws OozieClientException { + try { + bundleEngine.reRun(jobId, coordScope, dateScope, refresh, noCleanup); + } catch (BaseEngineException e) { + throw new OozieClientException(e.getErrorCode().toString(), e); + } + return null; + } + + @Override + public List<BundleJob> getBundleJobsInfo(String filter, int start, int len) throws OozieClientException { + try { + return (List) bundleEngine.getBundleJobs(filter, start, len).getBundleJobs(); + } catch (BundleEngineException e) { + throw new OozieClientException(e.getErrorCode().toString(), e); + } + } + + @Override + public List<BulkResponse> getBulkInfo(String filter, int start, int len) throws OozieClientException { + try { + return (List) bundleEngine.getBulkJobs(filter, start, len).getResponses(); + } catch (BundleEngineException e) { + throw new OozieClientException(e.getErrorCode().toString(), e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/f5554dd3/core/src/main/java/org/apache/oozie/LocalOozieClientCoord.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/LocalOozieClientCoord.java b/core/src/main/java/org/apache/oozie/LocalOozieClientCoord.java index 32b0cd0..d1e6972 100644 --- a/core/src/main/java/org/apache/oozie/LocalOozieClientCoord.java +++ b/core/src/main/java/org/apache/oozie/LocalOozieClientCoord.java @@ -19,8 +19,6 @@ package org.apache.oozie; import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Properties; @@ -36,7 +34,7 @@ import org.apache.oozie.util.XConfiguration; /** * Client API to submit and manage Oozie coordinator jobs against an Oozie - * intance. + * instance. * <p> * This class is thread safe. * <p> @@ -57,7 +55,7 @@ import org.apache.oozie.util.XConfiguration; * among all the filter values for the same name. Multiple values must be * specified as different name value pairs. */ -public class LocalOozieClientCoord extends OozieClient { +public class LocalOozieClientCoord extends BaseLocalOozieClient { private final CoordinatorEngine coordEngine; @@ -68,178 +66,11 @@ public class LocalOozieClientCoord extends OozieClient { * @param coordEngine the engine instance to use. */ public LocalOozieClientCoord(CoordinatorEngine coordEngine) { + super(coordEngine); this.coordEngine = coordEngine; } /** - * Return the Oozie URL of the coordinator client instance. - * <p> - * This URL is the base URL fo the Oozie system, with not protocol - * versioning. - * - * @return the Oozie URL of the coordinator client instance. - */ - @Override - public String getOozieUrl() { - return "localoozie"; - } - - /** - * Return the Oozie URL used by the client and server for WS communications. - * <p> - * This URL is the original URL plus the versioning element path. - * - * @return the Oozie URL used by the client and server for communication. - * @throws org.apache.oozie.client.OozieClientException thrown in the client - * and the server are not protocol compatible. - */ - @Override - public String getProtocolUrl() throws OozieClientException { - return "localoozie"; - } - - /** - * Validate that the Oozie client and server instances are protocol - * compatible. - * - * @throws org.apache.oozie.client.OozieClientException thrown in the client - * and the server are not protocol compatible. - */ - @Override - public synchronized void validateWSVersion() throws OozieClientException { - } - - /** - * Create an empty configuration with just the {@link #USER_NAME} set to the - * JVM user name and the {@link #GROUP_NAME} set to 'other'. - * - * @return an empty configuration. - */ - @Override - public Properties createConfiguration() { - Properties conf = new Properties(); - if (coordEngine != null) { - conf.setProperty(USER_NAME, coordEngine.getUser()); - } - conf.setProperty(GROUP_NAME, "users"); - return conf; - } - - /** - * Set a HTTP header to be used in the WS requests by the coordinator - * instance. - * - * @param name header name. - * @param value header value. - */ - @Override - public void setHeader(String name, String value) { - } - - /** - * Get the value of a set HTTP header from the coordinator instance. - * - * @param name header name. - * @return header value, <code>null</code> if not set. - */ - @Override - public String getHeader(String name) { - return null; - } - - /** - * Remove a HTTP header from the coordinator client instance. - * - * @param name header name. - */ - @Override - public void removeHeader(String name) { - } - - /** - * Return an iterator with all the header names set in the coordinator - * instance. - * - * @return header names. - */ - @Override - @SuppressWarnings("unchecked") - public Iterator<String> getHeaderNames() { - return Collections.EMPTY_SET.iterator(); - } - - /** - * Submit a coordinator job. - * - * @param conf job configuration. - * @return the job Id. - * @throws org.apache.oozie.client.OozieClientException thrown if the job - * could not be submitted. - */ - @Override - public String submit(Properties conf) throws OozieClientException { - try { - return coordEngine.submitJob(new XConfiguration(conf), false); - } - catch (CoordinatorEngineException ex) { - throw new OozieClientException(ex.getErrorCode().toString(), ex); - } - } - - /** - * Start a coordinator job. - * - * @param jobId job Id. - * @throws org.apache.oozie.client.OozieClientException thrown if the job - * could not be started. - */ - @Override - @Deprecated - public void start(String jobId) throws OozieClientException { - try { - coordEngine.start(jobId); - } - catch (CoordinatorEngineException ex) { - throw new OozieClientException(ex.getErrorCode().toString(), ex); - } - catch (BaseEngineException bex) { - throw new OozieClientException(bex.getErrorCode().toString(), bex); - } - } - - /** - * Submit and start a coordinator job. - * - * @param conf job configuration. - * @return the job Id. - * @throws org.apache.oozie.client.OozieClientException thrown if the job - * could not be submitted. - */ - @Override - public String run(Properties conf) throws OozieClientException { - try { - return coordEngine.submitJob(new XConfiguration(conf), true); - } - catch (CoordinatorEngineException ex) { - throw new OozieClientException(ex.getErrorCode().toString(), ex); - } - } - - /** - * Rerun a workflow job. - * - * @param jobId job Id to rerun. - * @param conf configuration information for the rerun. - * @throws org.apache.oozie.client.OozieClientException thrown if the job - * could not be started. - */ - @Override - @Deprecated - public void reRun(String jobId, Properties conf) throws OozieClientException { - throw new OozieClientException(ErrorCode.E0301.toString(), "no-op"); - } - - /** * Rerun coordinator actions. * * @param jobId coordinator jobId @@ -310,71 +141,6 @@ public class LocalOozieClientCoord extends OozieClient { } /** - * Suspend a coordinator job. - * - * @param jobId job Id. - * @throws org.apache.oozie.client.OozieClientException thrown if the job - * could not be suspended. - */ - @Override - public void suspend(String jobId) throws OozieClientException { - try { - coordEngine.suspend(jobId); - } - catch (CoordinatorEngineException ex) { - throw new OozieClientException(ex.getErrorCode().toString(), ex); - } - } - - /** - * Resume a coordinator job. - * - * @param jobId job Id. - * @throws org.apache.oozie.client.OozieClientException thrown if the job - * could not be resume. - */ - @Override - public void resume(String jobId) throws OozieClientException { - try { - coordEngine.resume(jobId); - } - catch (CoordinatorEngineException ex) { - throw new OozieClientException(ex.getErrorCode().toString(), ex); - } - } - - /** - * Kill a coordinator job. - * - * @param jobId job Id. - * @throws org.apache.oozie.client.OozieClientException thrown if the job - * could not be killed. - */ - @Override - public void kill(String jobId) throws OozieClientException { - try { - coordEngine.kill(jobId); - } - catch (CoordinatorEngineException ex) { - throw new OozieClientException(ex.getErrorCode().toString(), ex); - } - } - - /** - * Get the info of a workflow job. - * - * @param jobId job Id. - * @return the job info. - * @throws org.apache.oozie.client.OozieClientException thrown if the job - * info could not be retrieved. - */ - @Override - @Deprecated - public WorkflowJob getJobInfo(String jobId) throws OozieClientException { - throw new OozieClientException(ErrorCode.E0301.toString(), "no-op"); - } - - /** * Get the info of a coordinator job. * * @param jobId job Id. @@ -412,9 +178,6 @@ public class LocalOozieClientCoord extends OozieClient { try { return coordEngine.getCoordJob(jobId, filter, start, len, false); } - catch (CoordinatorEngineException ex) { - throw new OozieClientException(ex.getErrorCode().toString(), ex); - } catch (BaseEngineException bex) { throw new OozieClientException(bex.getErrorCode().toString(), bex); } @@ -433,32 +196,12 @@ public class LocalOozieClientCoord extends OozieClient { try { return coordEngine.getCoordAction(actionId); } - catch (CoordinatorEngineException ex) { - throw new OozieClientException(ex.getErrorCode().toString(), ex); - } catch (BaseEngineException bex) { throw new OozieClientException(bex.getErrorCode().toString(), bex); } } /** - * Return the info of the workflow jobs that match the filter. - * - * @param filter job filter. Refer to the {@link OozieClient} for the filter - * syntax. - * @param start jobs offset, base 1. - * @param len number of jobs to return. - * @return a list with the workflow jobs info, without node details. - * @throws OozieClientException thrown if the jobs info could not be - * retrieved. - */ - @Override - @Deprecated - public List<WorkflowJob> getJobsInfo(String filter, int start, int len) throws OozieClientException { - throw new OozieClientException(ErrorCode.E0301.toString(), "no-op"); - } - - /** * Return the info of the coordinator jobs that match the filter. * * @param filter job filter. Refer to the {@link OozieClient} for the filter @@ -486,21 +229,58 @@ public class LocalOozieClientCoord extends OozieClient { } } - /** - * Return the info of the workflow jobs that match the filter. - * <p> - * It returns the first 100 jobs that match the filter. - * - * @param filter job filter. Refer to the {@link LocalOozieClient} for the - * filter syntax. - * @return a list with the workflow jobs info, without node details. - * @throws org.apache.oozie.client.OozieClientException thrown if the jobs - * info could not be retrieved. - */ @Override - @Deprecated - public List<WorkflowJob> getJobsInfo(String filter) throws OozieClientException { - throw new OozieClientException(ErrorCode.E0301.toString(), "no-op"); + public String updateCoord(String jobId, Properties conf, String dryrun, String showDiff) throws OozieClientException { + try { + return coordEngine.updateJob(new XConfiguration(conf), jobId, Boolean.valueOf(dryrun), Boolean.valueOf(showDiff)); + } catch (CoordinatorEngineException e) { + throw new OozieClientException(e.getErrorCode().toString(), e); + } } + @Override + public String updateCoord(String jobId, String dryrun, String showDiff) throws OozieClientException { + try { + return coordEngine.updateJob(new XConfiguration(), jobId, Boolean.valueOf(dryrun), Boolean.valueOf(showDiff)); + } catch (CoordinatorEngineException e) { + throw new OozieClientException(e.getErrorCode().toString(), e); + } + } + + @Override + public CoordinatorJob getCoordJobInfo(String jobId, String filter, int start, int len, String order) + throws OozieClientException { + try { + return coordEngine.getCoordJob(jobId, filter, start, len, order.equalsIgnoreCase("DESC")); + } catch (BaseEngineException e) { + throw new OozieClientException(e.getErrorCode().toString(), e); + } + } + + @Override + public List<CoordinatorAction> kill(String jobId, String rangeType, String scope) throws OozieClientException { + try { + return (List) coordEngine.killActions(jobId, rangeType, scope).getCoordActions(); + } catch (CoordinatorEngineException e) { + throw new OozieClientException(e.getErrorCode().toString(), e); + } + } + + @Override + public List<CoordinatorAction> ignore(String jobId, String scope) throws OozieClientException { + try { + return (List) coordEngine.ignore(jobId, RestConstants.JOB_COORD_SCOPE_ACTION, scope).getCoordActions(); + } catch (CoordinatorEngineException e) { + throw new OozieClientException(e.getErrorCode().toString(), e); + } + } + + @Override + public List<WorkflowJob> getWfsForCoordAction(String coordActionId) throws OozieClientException { + try { + return (List) coordEngine.getReruns(coordActionId); + } catch (CoordinatorEngineException e) { + throw new OozieClientException(e.getErrorCode().toString(), e); + } + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/f5554dd3/core/src/main/java/org/apache/oozie/OozieClientOperationHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/OozieClientOperationHandler.java b/core/src/main/java/org/apache/oozie/OozieClientOperationHandler.java new file mode 100644 index 0000000..8338f4e --- /dev/null +++ b/core/src/main/java/org/apache/oozie/OozieClientOperationHandler.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie; + +import org.apache.oozie.client.rest.RestConstants; +import org.json.simple.JSONObject; + +class OozieClientOperationHandler { + + private static final String WF = "wf"; + private static final String BUNDLE = "bundle"; + private static final String COORD = "coord"; + private static final String GMT = "GMT"; + + private BaseEngine engine; + + OozieClientOperationHandler(BaseEngine engine) { + this.engine = engine; + } + + interface OozieOperationJob { + JSONObject BundleJob() throws BundleEngineException; + + JSONObject CoordinatorJob() throws CoordinatorEngineException; + + JSONObject WorkflowsJob() throws DagEngineException; + } + + abstract class AbstractOozieOperationJob implements OozieOperationJob { + protected final String filter; + protected final int start; + protected final int len; + + AbstractOozieOperationJob(final String filter, final int start, final int len) { + this.filter = filter; + this.start = start; + this.len = len; + } + + @Override + public JSONObject BundleJob() throws BundleEngineException { + return OozieJsonFactory.getBundleJSONObject(createBundleJobInfo(), GMT); + } + + abstract BundleJobInfo createBundleJobInfo() throws BundleEngineException; + + @Override + public JSONObject CoordinatorJob() throws CoordinatorEngineException { + return OozieJsonFactory.getCoordJSONObject(createCoordinatorJobInfo(), GMT); + } + + abstract CoordinatorJobInfo createCoordinatorJobInfo() throws CoordinatorEngineException; + + @Override + public JSONObject WorkflowsJob() throws DagEngineException { + return OozieJsonFactory.getWFJSONObject(createWorkflowsInfo(), GMT); + } + + abstract WorkflowsInfo createWorkflowsInfo() throws DagEngineException; + } + + + static class OozieJobOperationCaller { + JSONObject call(final String jobType, final OozieOperationJob job) + throws DagEngineException, BundleEngineException, CoordinatorEngineException { + switch (jobType) { + case WF: + return job.WorkflowsJob(); + case COORD: + return job.CoordinatorJob(); + case BUNDLE: + return job.BundleJob(); + default: + throw new IllegalArgumentException("Invalid jobType passed. jobType: " + jobType); + } + } + } + + class KillOperation extends AbstractOozieOperationJob { + + KillOperation(final String filter, final int start, final int len) { + super(filter, start, len); + } + + @Override + BundleJobInfo createBundleJobInfo() throws BundleEngineException { + return ((BundleEngine) engine).killJobs(filter, start, len); + } + + @Override + CoordinatorJobInfo createCoordinatorJobInfo() throws CoordinatorEngineException { + return ((CoordinatorEngine) engine).killJobs(filter, start, len); + } + + @Override + WorkflowsInfo createWorkflowsInfo() throws DagEngineException { + return ((DagEngine) engine).killJobs(filter, start, len); + } + } + + class SuspendingOperation extends AbstractOozieOperationJob { + + SuspendingOperation(final String filter, final int start, final int len) { + super(filter, start, len); + } + + @Override + BundleJobInfo createBundleJobInfo() throws BundleEngineException { + return ((BundleEngine) engine).suspendJobs(filter, start, len); + } + + @Override + CoordinatorJobInfo createCoordinatorJobInfo() throws CoordinatorEngineException { + return ((CoordinatorEngine) engine).suspendJobs(filter, start, len); + } + + @Override + WorkflowsInfo createWorkflowsInfo() throws DagEngineException { + return ((DagEngine) engine).suspendJobs(filter, start, len); + } + } + + class ResumingOperation extends AbstractOozieOperationJob { + + ResumingOperation(final String filter, final int start, final int len) { + super(filter, start, len); + } + + @Override + BundleJobInfo createBundleJobInfo() throws BundleEngineException { + return ((BundleEngine) engine).resumeJobs(filter, start, len); + } + + @Override + CoordinatorJobInfo createCoordinatorJobInfo() throws CoordinatorEngineException { + return ((CoordinatorEngine) engine).resumeJobs(filter, start, len); + } + + @Override + WorkflowsInfo createWorkflowsInfo() throws DagEngineException { + return ((DagEngine) engine).resumeJobs(filter, start, len); + } + } + + OozieOperationJob getOperationHandler(String operation, final String filter, final int start, final int len) { + switch (operation) { + case RestConstants.JOB_ACTION_KILL: + return new KillOperation(filter, start, len); + case RestConstants.JOB_ACTION_SUSPEND: + return new SuspendingOperation(filter, start, len); + case RestConstants.JOB_ACTION_RESUME: + return new ResumingOperation(filter, start, len); + default: + throw new IllegalArgumentException(operation); + } + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/f5554dd3/core/src/main/java/org/apache/oozie/OozieJsonFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/OozieJsonFactory.java b/core/src/main/java/org/apache/oozie/OozieJsonFactory.java new file mode 100644 index 0000000..b988ca0 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/OozieJsonFactory.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie; + +import org.apache.oozie.client.rest.JsonTags; +import org.json.simple.JSONObject; + +public final class OozieJsonFactory { + + private OozieJsonFactory() { + } + + public static JSONObject getWFJSONObject(WorkflowsInfo jobs, String timeZoneId) { + JSONObject json = new JSONObject(); + json.put(JsonTags.WORKFLOWS_JOBS, WorkflowJobBean.toJSONArray(jobs.getWorkflows(), timeZoneId)); + json.put(JsonTags.WORKFLOWS_TOTAL, jobs.getTotal()); + json.put(JsonTags.WORKFLOWS_OFFSET, jobs.getStart()); + json.put(JsonTags.WORKFLOWS_LEN, jobs.getLen()); + return json; + } + + public static JSONObject getCoordJSONObject(CoordinatorJobInfo jobs, String timeZoneId) { + JSONObject json = new JSONObject(); + json.put(JsonTags.COORDINATOR_JOBS, CoordinatorJobBean.toJSONArray(jobs.getCoordJobs(), timeZoneId)); + json.put(JsonTags.COORD_JOB_TOTAL, jobs.getTotal()); + json.put(JsonTags.COORD_JOB_OFFSET, jobs.getStart()); + json.put(JsonTags.COORD_JOB_LEN, jobs.getLen()); + return json; + } + + public static JSONObject getBundleJSONObject(BundleJobInfo jobs, String timeZoneId) { + JSONObject json = new JSONObject(); + json.put(JsonTags.BUNDLE_JOBS, BundleJobBean.toJSONArray(jobs.getBundleJobs(), timeZoneId)); + json.put(JsonTags.BUNDLE_JOB_TOTAL, jobs.getTotal()); + json.put(JsonTags.BUNDLE_JOB_OFFSET, jobs.getStart()); + json.put(JsonTags.BUNDLE_JOB_LEN, jobs.getLen()); + return json; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/f5554dd3/core/src/main/java/org/apache/oozie/local/LocalOozie.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/local/LocalOozie.java b/core/src/main/java/org/apache/oozie/local/LocalOozie.java index bf1b0db..9ab646c 100644 --- a/core/src/main/java/org/apache/oozie/local/LocalOozie.java +++ b/core/src/main/java/org/apache/oozie/local/LocalOozie.java @@ -18,11 +18,14 @@ package org.apache.oozie.local; +import org.apache.oozie.BundleEngine; import org.apache.oozie.CoordinatorEngine; import org.apache.oozie.DagEngine; import org.apache.oozie.LocalOozieClient; +import org.apache.oozie.LocalOozieClientBundle; import org.apache.oozie.LocalOozieClientCoord; import org.apache.oozie.client.OozieClient; +import org.apache.oozie.service.BundleEngineService; import org.apache.oozie.service.CallbackService; import org.apache.oozie.service.CoordinatorEngineService; import org.apache.oozie.service.DagEngineService; @@ -114,12 +117,20 @@ public class LocalOozie { } /** - * Return a {@link org.apache.oozie.client.OozieClient} for LocalOozie. <p> The returned instance is configured - * with the user name of the JVM (the value of the system property 'user.name'). <p> The following methods of the - * client are NOP in the returned instance: {@link org.apache.oozie.client.OozieClient#validateWSVersion}, {@link - * org.apache.oozie.client.OozieClient#setHeader}, {@link org.apache.oozie.client.OozieClient#getHeader}, {@link - * org.apache.oozie.client.OozieClient#removeHeader}, {@link org.apache.oozie.client.OozieClient#getHeaderNames} and - * {@link org.apache.oozie.client.OozieClient#setSystemMode(OozieClient.SYSTEM_MODE)}. + * Return a {@link org.apache.oozie.client.OozieClient} for LocalOozie. + * <p> + * The returned instance is configured + * with the user name of the JVM (the value of the system property 'user.name'). + * <p> + * The following methods of the client are NOP in the returned instance: + * {@link org.apache.oozie.client.OozieClient#validateWSVersion}, + * {@link org.apache.oozie.client.OozieClient#setHeader}, + * {@link org.apache.oozie.client.OozieClient#getHeader}, + * {@link org.apache.oozie.client.OozieClient#removeHeader}, + * {@link org.apache.oozie.client.OozieClient#getHeaderNames}, + * {@link org.apache.oozie.client.OozieClient#setSystemMode(OozieClient.SYSTEM_MODE)}, + * {@link org.apache.oozie.client.OozieClient#getHeaders}, + * {@link org.apache.oozie.client.OozieClient#getClientBuildVersion}. * * @return a {@link org.apache.oozie.client.OozieClient} for LocalOozie. */ @@ -138,8 +149,10 @@ public class LocalOozie { * {@link org.apache.oozie.client.OozieClient#setHeader}, * {@link org.apache.oozie.client.OozieClient#getHeader}, * {@link org.apache.oozie.client.OozieClient#removeHeader}, - * {@link org.apache.oozie.client.OozieClient#getHeaderNames} and - * {@link org.apache.oozie.client.OozieClient#setSystemMode(OozieClient.SYSTEM_MODE)}. + * {@link org.apache.oozie.client.OozieClient#getHeaderNames}, + * {@link org.apache.oozie.client.OozieClient#setSystemMode(OozieClient.SYSTEM_MODE)}, + * {@link org.apache.oozie.client.OozieClient#getHeaders}, + * {@link org.apache.oozie.client.OozieClient#getClientBuildVersion}. * * @return a {@link org.apache.oozie.client.OozieClient} for LocalOozie. */ @@ -151,10 +164,14 @@ public class LocalOozie { * Return a {@link org.apache.oozie.client.OozieClient} for LocalOozie configured for a given user. * <p> * The following methods of the client are NOP in the returned instance: - * {@link org.apache.oozie.client.OozieClient#validateWSVersion}, {@link org.apache.oozie.client.OozieClient#setHeader}, - * {@link org.apache.oozie.client.OozieClient#getHeader}, {@link org.apache.oozie.client.OozieClient#removeHeader}, - * {@link org.apache.oozie.client.OozieClient#getHeaderNames} - * and {@link org.apache.oozie.client.OozieClient#setSystemMode(OozieClient.SYSTEM_MODE)}. + * {@link org.apache.oozie.client.OozieClient#validateWSVersion}, + * {@link org.apache.oozie.client.OozieClient#setHeader}, + * {@link org.apache.oozie.client.OozieClient#getHeader}, + * {@link org.apache.oozie.client.OozieClient#removeHeader}, + * {@link org.apache.oozie.client.OozieClient#getHeaderNames}, + * {@link org.apache.oozie.client.OozieClient#setSystemMode(OozieClient.SYSTEM_MODE)}, + * {@link org.apache.oozie.client.OozieClient#getHeaders}, + * {@link org.apache.oozie.client.OozieClient#getClientBuildVersion}. * * @param user user name to use in LocalOozie for running workflows. * @return a {@link org.apache.oozie.client.OozieClient} for LocalOozie configured for the given user. @@ -177,8 +194,10 @@ public class LocalOozie { * {@link org.apache.oozie.client.OozieClient#setHeader}, * {@link org.apache.oozie.client.OozieClient#getHeader}, * {@link org.apache.oozie.client.OozieClient#removeHeader}, - * {@link org.apache.oozie.client.OozieClient#getHeaderNames} and - * {@link org.apache.oozie.client.OozieClient#setSystemMode(OozieClient.SYSTEM_MODE)}. + * {@link org.apache.oozie.client.OozieClient#getHeaderNames}, + * {@link org.apache.oozie.client.OozieClient#setSystemMode(OozieClient.SYSTEM_MODE)}, + * {@link org.apache.oozie.client.OozieClient#getHeaders}, + * {@link org.apache.oozie.client.OozieClient#getClientBuildVersion}. * * @param user user name to use in LocalOozie for running coordinator. * @return a {@link org.apache.oozie.client.OozieClient} for LocalOozie @@ -193,4 +212,50 @@ public class LocalOozie { return new LocalOozieClientCoord(coordEngine); } + /** + * <p> + * The returned instance is configured with the user name of the JVM (the + * value of the system property 'user.name'). + * <p> + * The following methods of the client are NOP in the returned instance: + * {@link org.apache.oozie.client.OozieClient#validateWSVersion}, + * {@link org.apache.oozie.client.OozieClient#setHeader}, + * {@link org.apache.oozie.client.OozieClient#getHeader}, + * {@link org.apache.oozie.client.OozieClient#removeHeader}, + * {@link org.apache.oozie.client.OozieClient#getHeaderNames}, + * {@link org.apache.oozie.client.OozieClient#setSystemMode(OozieClient.SYSTEM_MODE)}, + * {@link org.apache.oozie.client.OozieClient#getHeaders}, + * {@link org.apache.oozie.client.OozieClient#getClientBuildVersion}. + * + * @return a {@link org.apache.oozie.client.OozieClient} for LocalOozie. + */ + public static OozieClient getClientBundle() { + return getClientBundle(System.getProperty("user.name")); + } + + /** + * <p> + * The returned instance is configured with the user name of the JVM (the + * value of the system property 'user.name'). + * <p> + * The following methods of the client are NOP in the returned instance: + * {@link org.apache.oozie.client.OozieClient#validateWSVersion}, + * {@link org.apache.oozie.client.OozieClient#setHeader}, + * {@link org.apache.oozie.client.OozieClient#getHeader}, + * {@link org.apache.oozie.client.OozieClient#removeHeader}, + * {@link org.apache.oozie.client.OozieClient#getHeaderNames}, + * {@link org.apache.oozie.client.OozieClient#setSystemMode(OozieClient.SYSTEM_MODE)}, + * {@link org.apache.oozie.client.OozieClient#getHeaders}, + * {@link org.apache.oozie.client.OozieClient#getClientBuildVersion}. + * + * @return a {@link org.apache.oozie.client.OozieClient} for LocalOozie. + */ + public static OozieClient getClientBundle(String user) { + if (!localOozieActive) { + throw new IllegalStateException("LocalOozie is not initialized"); + } + ParamChecker.notEmpty(user, "user"); + BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(user); + return new LocalOozieClientBundle(bundleEngine); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/f5554dd3/core/src/main/java/org/apache/oozie/servlet/V0JobsServlet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/servlet/V0JobsServlet.java b/core/src/main/java/org/apache/oozie/servlet/V0JobsServlet.java index 2c79ef0..8c978fb 100644 --- a/core/src/main/java/org/apache/oozie/servlet/V0JobsServlet.java +++ b/core/src/main/java/org/apache/oozie/servlet/V0JobsServlet.java @@ -19,7 +19,6 @@ package org.apache.oozie.servlet; import java.io.IOException; -import java.util.List; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -28,7 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.oozie.DagEngine; import org.apache.oozie.DagEngineException; import org.apache.oozie.ErrorCode; -import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.OozieJsonFactory; import org.apache.oozie.WorkflowsInfo; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.rest.JsonTags; @@ -96,7 +95,7 @@ public class V0JobsServlet extends BaseJobsServlet { */ @Override protected JSONObject getJobs(HttpServletRequest request) throws XServletException, IOException { - JSONObject json = new JSONObject(); + JSONObject json; try { String filter = request.getParameter(RestConstants.JOBS_FILTER_PARAM); String startStr = request.getParameter(RestConstants.OFFSET_PARAM); @@ -108,12 +107,7 @@ public class V0JobsServlet extends BaseJobsServlet { DagEngine dagEngine = Services.get().get(DagEngineService.class) .getDagEngine(getUser(request)); WorkflowsInfo jobs = dagEngine.getJobs(filter, start, len); - List<WorkflowJobBean> jsonWorkflows = jobs.getWorkflows(); - json.put(JsonTags.WORKFLOWS_JOBS, WorkflowJobBean.toJSONArray(jsonWorkflows, "GMT")); - json.put(JsonTags.WORKFLOWS_TOTAL, jobs.getTotal()); - json.put(JsonTags.WORKFLOWS_OFFSET, jobs.getStart()); - json.put(JsonTags.WORKFLOWS_LEN, jobs.getLen()); - + json = OozieJsonFactory.getWFJSONObject(jobs, "GMT"); } catch (DagEngineException ex) { throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
