Repository: oozie Updated Branches: refs/heads/master 12ef61470 -> 313652559
OOZIE-1846 Convert CoordActionMaterializeCommand to an XCommand and remove Command (seoeun25 via shwethags) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/31365255 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/31365255 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/31365255 Branch: refs/heads/master Commit: 313652559bc43f3c89c1f76c4d00c0529314213c Parents: 12ef614 Author: Shwetha GS <[email protected]> Authored: Thu Jul 24 12:04:01 2014 +0530 Committer: Shwetha GS <[email protected]> Committed: Thu Jul 24 12:06:58 2014 +0530 ---------------------------------------------------------------------- .../java/org/apache/oozie/command/Command.java | 594 ------------------- .../apache/oozie/command/CommandException.java | 2 +- .../coord/CoordActionMaterializeCommand.java | 375 ------------ .../CoordMaterializeTransitionXCommand.java | 12 +- .../command/coord/CoordSubmitXCommand.java | 18 +- .../oozie/command/coord/CoordinatorCommand.java | 47 -- .../service/CoordMaterializeTriggerService.java | 2 +- .../org/apache/oozie/command/TestCommand.java | 214 ------- .../TestCoordActionMaterializeCommand.java | 323 ---------- .../command/coord/TestCoordELExtensions.java | 4 +- .../TestCoordMaterializeTransitionXCommand.java | 16 +- pom.xml | 2 +- release-log.txt | 1 + 13 files changed, 42 insertions(+), 1568 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/main/java/org/apache/oozie/command/Command.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/Command.java b/core/src/main/java/org/apache/oozie/command/Command.java deleted file mode 100644 index 0e51b8e..0000000 --- a/core/src/main/java/org/apache/oozie/command/Command.java +++ /dev/null @@ -1,594 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.oozie.command; - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; - -import org.apache.oozie.CoordinatorActionBean; -import org.apache.oozie.CoordinatorJobBean; -import org.apache.oozie.ErrorCode; -import org.apache.oozie.FaultInjection; -import org.apache.oozie.WorkflowActionBean; -import org.apache.oozie.WorkflowJobBean; -import org.apache.oozie.XException; -import org.apache.oozie.service.CallableQueueService; -import org.apache.oozie.service.DagXLogInfoService; -import org.apache.oozie.service.InstrumentationService; -import org.apache.oozie.service.MemoryLocksService; -import org.apache.oozie.service.Services; -import org.apache.oozie.service.StoreService; -import org.apache.oozie.service.XLogService; -import org.apache.oozie.store.Store; -import org.apache.oozie.store.StoreException; -import org.apache.oozie.store.WorkflowStore; -import org.apache.oozie.util.InstrumentUtils; -import org.apache.oozie.util.Instrumentation; -import org.apache.oozie.util.ParamChecker; -import org.apache.oozie.util.XCallable; -import org.apache.oozie.util.XLog; -import org.apache.oozie.lock.LockToken; - -/** - * Base class for all synchronous and asynchronous DagEngine commands. - */ -public abstract class Command<T, S extends Store> implements XCallable<T> { - /** - * The instrumentation group used for Commands. - */ - private static final String INSTRUMENTATION_GROUP = "commands"; - - private final long createdTime; - - /** - * The instrumentation group used for Jobs. - */ - private static final String INSTRUMENTATION_JOB_GROUP = "jobs"; - - private static final long LOCK_TIMEOUT = 1000; - protected static final long LOCK_FAILURE_REQUEUE_INTERVAL = 30000; - - protected Instrumentation instrumentation; - private List<XCallable<Void>> callables; - private List<XCallable<Void>> delayedCallables; - private long delay = 0; - private List<XCallable<Void>> exceptionCallables; - private String name; - private String type; - private String key; - private int priority; - private int logMask; - private boolean withStore; - protected boolean dryrun = false; - private ArrayList<LockToken> locks = null; - - /** - * This variable is package private for testing purposes only. - */ - XLog.Info logInfo; - - /** - * Create a command that uses a {@link WorkflowStore} instance. <p/> The current {@link XLog.Info} values are - * captured for execution. - * - * @param name command name. - * @param type command type. - * @param priority priority of the command, used when queuing for asynchronous execution. - * @param logMask log mask for the command logging calls. - */ - public Command(String name, String type, int priority, int logMask) { - this(name, type, priority, logMask, true); - } - - /** - * Create a command. <p/> The current {@link XLog.Info} values are captured for execution. - * - * @param name command name. - * @param type command type. - * @param priority priority of the command, used when queuing for asynchronous execution. - * @param logMask log mask for the command logging calls. - * @param withStore indicates if the command needs a {@link org.apache.oozie.store.WorkflowStore} instance or not. - */ - public Command(String name, String type, int priority, int logMask, boolean withStore) { - this.name = ParamChecker.notEmpty(name, "name"); - this.type = ParamChecker.notEmpty(type, "type"); - this.key = name + "_" + UUID.randomUUID(); - this.priority = priority; - this.withStore = withStore; - this.logMask = logMask; - instrumentation = Services.get().get(InstrumentationService.class).get(); - logInfo = new XLog.Info(XLog.Info.get()); - createdTime = System.currentTimeMillis(); - locks = new ArrayList<LockToken>(); - } - - /** - * Create a command. <p/> The current {@link XLog.Info} values are captured for execution. - * - * @param name command name. - * @param type command type. - * @param priority priority of the command, used when queuing for asynchronous execution. - * @param logMask log mask for the command logging calls. - * @param withStore indicates if the command needs a {@link org.apache.oozie.store.WorkflowStore} instance or not. - * @param dryrun indicates if dryrun option is enabled. if enabled coordinator will show a diagnostic output without - * really submitting the job - */ - public Command(String name, String type, int priority, int logMask, boolean withStore, boolean dryrun) { - this(name, type, priority, logMask, withStore); - this.dryrun = dryrun; - } - - /** - * Return the name of the command. - * - * @return the name of the command. - */ - @Override - public String getName() { - return name; - } - - /** - * Return the callable type. <p/> The callable type is used for concurrency throttling in the {@link - * org.apache.oozie.service.CallableQueueService}. - * - * @return the callable type. - */ - @Override - public String getType() { - return type; - } - - /** - * Return the priority of the command. - * - * @return the priority of the command. - */ - @Override - public int getPriority() { - return priority; - } - - /** - * Returns the createdTime of the callable in milliseconds - * - * @return the callable createdTime - */ - @Override - public long getCreatedTime() { - return createdTime; - } - - /** - * Execute the command {@link #call(WorkflowStore)} setting all the necessary context. <p/> The {@link XLog.Info} is - * set to the values at instance creation time. <p/> The command execution is logged and instrumented. <p/> If a - * {@link WorkflowStore} is used, a fresh instance will be passed and it will be commited after the {@link - * #call(WorkflowStore)} execution. It will be closed without committing if an exception is thrown. <p/> Commands - * queued via the DagCommand queue methods are queued for execution after the workflow store has been committed. - * <p/> If an exception happends the queued commands will not be effectively queued for execution. Instead, the the - * commands queued for exception will be effectively queued fro execution.. - * - * @throws CommandException thrown if the command could not be executed successfully, the workflow store is closed - * without committing, thus doing a rollback. - */ - @SuppressWarnings({"ThrowFromFinallyBlock", "unchecked"}) - public final T call() throws CommandException { - XLog.Info.get().setParameters(logInfo); - XLog log = XLog.getLog(getClass()); - log.trace(logMask, "Start"); - Instrumentation.Cron cron = new Instrumentation.Cron(); - cron.start(); - callables = new ArrayList<XCallable<Void>>(); - delayedCallables = new ArrayList<XCallable<Void>>(); - exceptionCallables = new ArrayList<XCallable<Void>>(); - delay = 0; - S store = null; - boolean exception = false; - - try { - if (withStore) { - store = (S) Services.get().get(StoreService.class).getStore(getStoreClass()); - store.beginTrx(); - } - T result = execute(store); - /* - * - * if (store != null && log != null) { log.info(XLog.STD, - * "connection log from store Flush Mode {0} ", - * store.getFlushMode()); } - */ - if (withStore) { - if (store == null) { - throw new IllegalStateException("WorkflowStore should not be null"); - } - if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) { - throw new RuntimeException("Skipping Commit for Failover Testing"); - } - store.commitTrx(); - } - - // TODO figure out the reject due to concurrency problems and remove - // the delayed queuing for callables. - boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables, 10); - if (ret == false) { - logQueueCallableFalse(callables); - } - - ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, delay); - if (ret == false) { - logQueueCallableFalse(delayedCallables); - } - - return result; - } - catch (XException ex) { - log.error(logMask | XLog.OPS, "XException, {0}", ex.getMessage(), ex); - if (store != null) { - log.info(XLog.STD, "XException - connection logs from store {0}, {1}", store.getConnection(), store - .isClosed()); - } - exception = true; - if (store != null && store.isActive()) { - try { - store.rollbackTrx(); - } - catch (RuntimeException rex) { - log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex); - } - } - - // TODO figure out the reject due to concurrency problems and remove - // the delayed queuing for callables. - boolean ret = Services.get().get(CallableQueueService.class).queueSerial(exceptionCallables, 10); - if (ret == false) { - logQueueCallableFalse(exceptionCallables); - } - if (ex instanceof CommandException) { - throw (CommandException) ex; - } - else { - throw new CommandException(ex); - } - } - catch (Exception ex) { - log.error(logMask | XLog.OPS, "Exception, {0}", ex.getMessage(), ex); - exception = true; - if (store != null && store.isActive()) { - try { - store.rollbackTrx(); - } - catch (RuntimeException rex) { - log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex); - } - } - throw new CommandException(ErrorCode.E0607, name, ex.getMessage(), ex); - } - catch (Error er) { - log.error(logMask | XLog.OPS, "Error, {0}", er.getMessage(), er); - exception = true; - if (store != null && store.isActive()) { - try { - store.rollbackTrx(); - } - catch (RuntimeException rex) { - log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex); - } - } - throw er; - } - finally { - FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection"); - cron.stop(); - instrumentation.addCron(INSTRUMENTATION_GROUP, name, cron); - InstrumentUtils.incrCommandCounter(name, 1, instrumentation); - log.trace(logMask, "End"); - if (locks != null) { - for (LockToken lock : locks) { - lock.release(); - } - locks.clear(); - } - if (store != null) { - if (!store.isActive()) { - try { - store.closeTrx(); - } - catch (RuntimeException rex) { - if (exception) { - log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex); - } - else { - throw rex; - } - } - } - else { - log.warn(logMask | XLog.OPS, "transaction is not committed or rolled back before closing entitymanager."); - } - } - } - } - - /** - * Queue a callable for execution after the current callable call invocation completes and the {@link WorkflowStore} - * transaction commits. <p/> All queued callables, regardless of the number of queue invocations, are queued for a - * single serial execution. <p/> If the call invocation throws an exception all queued callables are discarded, they - * are not queued for execution. - * - * @param callable callable to queue for execution. - */ - protected void queueCallable(XCallable<Void> callable) { - callables.add(callable); - } - - /** - * Queue a list of callables for execution after the current callable call invocation completes and the {@link - * WorkflowStore} transaction commits. <p/> All queued callables, regardless of the number of queue invocations, are - * queued for a single serial execution. <p/> If the call invocation throws an exception all queued callables are - * discarded, they are not queued for execution. - * - * @param callables list of callables to queue for execution. - */ - protected void queueCallable(List<? extends XCallable<Void>> callables) { - this.callables.addAll(callables); - } - - /** - * Queue a callable for delayed execution after the current callable call invocation completes and the {@link - * WorkflowStore} transaction commits. <p/> All queued delayed callables, regardless of the number of delay queue - * invocations, are queued for a single serial delayed execution with the highest delay of all queued callables. - * <p/> If the call invocation throws an exception all queued callables are discarded, they are not queued for - * execution. - * - * @param callable callable to queue for delayed execution. - * @param delay the queue delay in milliseconds - */ - protected void queueCallable(XCallable<Void> callable, long delay) { - this.delayedCallables.add(callable); - this.delay = Math.max(this.delay, delay); - } - - /** - * Queue a callable for execution only in the event of an exception being thrown during the call invocation. <p/> If - * an exception does not happen, all the callables queued by this method are discarded, they are not queued for - * execution. <p/> All queued callables, regardless of the number of queue invocations, are queued for a single - * serial execution. - * - * @param callable callable to queue for execution in the case of an exception. - */ - protected void queueCallableForException(XCallable<Void> callable) { - exceptionCallables.add(callable); - } - - /** - * Logging the info if failed to queue the callables. - * - * @param callables - */ - protected void logQueueCallableFalse(List<? extends XCallable<Void>> callables) { - StringBuilder sb = new StringBuilder( - "Unable to queue the callables, delayedQueue is full or system is in SAFEMODE - failed to queue:["); - int size = callables.size(); - for (int i = 0; i < size; i++) { - XCallable<Void> callable = callables.get(i); - sb.append(callable.getName()); - if (i < size - 1) { - sb.append(", "); - } - else { - sb.append("]"); - } - } - XLog.getLog(getClass()).warn(sb.toString()); - } - - /** - * DagCallable subclasses must implement this method to perform their task. <p/> The workflow store works in - * transactional mode. The transaction is committed only if this method ends successfully. Otherwise the transaction - * is rolledback. - * - * @param store the workflow store instance for the callable, <code>null</code> if the callable does not use a - * store. - * @return the return value of the callable. - * @throws StoreException thrown if the workflow store could not perform an operation. - * @throws CommandException thrown if the command could not perform its operation. - */ - protected abstract T call(S store) throws StoreException, CommandException; - - // to do - // need to implement on all sub commands and break down the transactions - - // protected abstract T execute(String id) throws CommandException; - - /** - * Command subclasses must implement this method correct Store can be passed to call(store); - * - * @return the Store class for use by Callable - * @throws CommandException thrown if the command could not perform its operation. - */ - protected abstract Class<? extends Store> getStoreClass(); - - /** - * Set the log info with the context of the given coordinator bean. - * - * @param cBean coordinator bean. - */ - protected void setLogInfo(CoordinatorJobBean cBean) { - if (logInfo.getParameter(XLogService.GROUP) == null) { - logInfo.setParameter(XLogService.GROUP, cBean.getGroup()); - } - if (logInfo.getParameter(XLogService.USER) == null) { - logInfo.setParameter(XLogService.USER, cBean.getUser()); - } - logInfo.setParameter(DagXLogInfoService.JOB, cBean.getId()); - logInfo.setParameter(DagXLogInfoService.TOKEN, ""); - logInfo.setParameter(DagXLogInfoService.APP, cBean.getAppName()); - XLog.Info.get().setParameters(logInfo); - } - - /** - * Set the log info with the context of the given coordinator action bean. - * - * @param action action bean. - */ - protected void setLogInfo(CoordinatorActionBean action) { - logInfo.setParameter(DagXLogInfoService.JOB, action.getJobId()); - // logInfo.setParameter(DagXLogInfoService.TOKEN, action.getLogToken()); - logInfo.setParameter(DagXLogInfoService.ACTION, action.getId()); - XLog.Info.get().setParameters(logInfo); - } - - /** - * Set the log info with the context of the given workflow bean. - * - * @param workflow workflow bean. - */ - protected void setLogInfo(WorkflowJobBean workflow) { - if (logInfo.getParameter(XLogService.GROUP) == null) { - logInfo.setParameter(XLogService.GROUP, workflow.getGroup()); - } - if (logInfo.getParameter(XLogService.USER) == null) { - logInfo.setParameter(XLogService.USER, workflow.getUser()); - } - logInfo.setParameter(DagXLogInfoService.JOB, workflow.getId()); - logInfo.setParameter(DagXLogInfoService.TOKEN, workflow.getLogToken()); - logInfo.setParameter(DagXLogInfoService.APP, workflow.getAppName()); - XLog.Info.get().setParameters(logInfo); - } - - /** - * Set the log info with the context of the given action bean. - * - * @param action action bean. - */ - protected void setLogInfo(WorkflowActionBean action) { - logInfo.setParameter(DagXLogInfoService.JOB, action.getJobId()); - logInfo.setParameter(DagXLogInfoService.TOKEN, action.getLogToken()); - logInfo.setParameter(DagXLogInfoService.ACTION, action.getId()); - XLog.Info.get().setParameters(logInfo); - } - - /** - * Reset the action bean information from the log info. - */ - // TODO check if they are used, else delete - protected void resetLogInfoAction() { - logInfo.clearParameter(DagXLogInfoService.ACTION); - XLog.Info.get().clearParameter(DagXLogInfoService.ACTION); - } - - /** - * Reset the workflow bean information from the log info. - */ - // TODO check if they are used, else delete - protected void resetLogInfoWorkflow() { - logInfo.clearParameter(DagXLogInfoService.JOB); - logInfo.clearParameter(DagXLogInfoService.APP); - logInfo.clearParameter(DagXLogInfoService.TOKEN); - XLog.Info.get().clearParameter(DagXLogInfoService.JOB); - XLog.Info.get().clearParameter(DagXLogInfoService.APP); - XLog.Info.get().clearParameter(DagXLogInfoService.TOKEN); - } - - /** - * Return the {@link Instrumentation} instance in use. - * - * @return the {@link Instrumentation} instance in use. - */ - protected Instrumentation getInstrumentation() { - return instrumentation; - } - - /** - * Return the identity. - * - * @return the identity. - */ - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(getType()); - sb.append(",").append(getPriority()); - return sb.toString(); - } - - protected boolean lock(String id) throws InterruptedException { - if (id == null || id.length() == 0) { - XLog.getLog(getClass()).warn("lock(): Id is null or empty :" + id + ":"); - return false; - } - LockToken token = Services.get().get(MemoryLocksService.class).getWriteLock(id, LOCK_TIMEOUT); - if (token != null) { - locks.add(token); - return true; - } - else { - return false; - } - } - - /* - * TODO - remove store coupling to EM. Store will only contain queries - * protected EntityManager getEntityManager() { return - * store.getEntityManager(); } - */ - protected T execute(S store) throws CommandException, StoreException { - T result = call(store); - return result; - } - - /** - * Get command key - * - * @return command key - */ - @Override - public String getKey(){ - return this.key; - } - - /** - * Get command lock key returning the key as an entity key, [not used] Just - * to be able to implement XCallable [to be deprecated] - * - * @return key - */ - @Override - public String getEntityKey() { - return this.key; - } - - /** - * set the mode of execution for the callable. True if in interrupt, false - * if not [to be deprecated] - */ - public void setInterruptMode(boolean mode) { - } - - /** - * [to be deprecated] - * - * @return the mode of execution. true if it is executed as an Interrupt, - * false otherwise - */ - public boolean inInterruptMode() { - return false; - } - -} http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/main/java/org/apache/oozie/command/CommandException.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/CommandException.java b/core/src/main/java/org/apache/oozie/command/CommandException.java index ce5ef54..3740bc8 100644 --- a/core/src/main/java/org/apache/oozie/command/CommandException.java +++ b/core/src/main/java/org/apache/oozie/command/CommandException.java @@ -21,7 +21,7 @@ import org.apache.oozie.XException; import org.apache.oozie.ErrorCode; /** - * Exception thrown by {@link Command}s. + * Exception thrown by {@link XCommand}s. */ public class CommandException extends XException { http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java deleted file mode 100644 index 14dee97..0000000 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java +++ /dev/null @@ -1,375 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.oozie.command.coord; - -import java.io.IOException; -import java.io.StringReader; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.Date; -import java.util.List; -import java.util.TimeZone; - -import org.apache.hadoop.conf.Configuration; -import org.apache.oozie.AppType; -import org.apache.oozie.CoordinatorActionBean; -import org.apache.oozie.CoordinatorJobBean; -import org.apache.oozie.ErrorCode; -import org.apache.oozie.SLAEventBean; -import org.apache.oozie.client.CoordinatorJob; -import org.apache.oozie.client.SLAEvent.SlaAppType; -import org.apache.oozie.client.rest.JsonBean; -import org.apache.oozie.command.CommandException; -import org.apache.oozie.coord.TimeUnit; -import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; -import org.apache.oozie.executor.jpa.BatchQueryExecutor; -import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; -import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; -import org.apache.oozie.executor.jpa.JPAExecutorException; -import org.apache.oozie.service.JPAService; -import org.apache.oozie.service.Service; -import org.apache.oozie.service.Services; -import org.apache.oozie.store.CoordinatorStore; -import org.apache.oozie.store.StoreException; -import org.apache.oozie.util.DateUtils; -import org.apache.oozie.util.Instrumentation; -import org.apache.oozie.sla.SLAOperations; -import org.apache.oozie.util.XConfiguration; -import org.apache.oozie.util.XLog; -import org.apache.oozie.util.XmlUtils; -import org.apache.oozie.util.db.SLADbOperations; -import org.jdom.Element; - -@SuppressWarnings("deprecation") -public class CoordActionMaterializeCommand extends CoordinatorCommand<Void> { - private String jobId; - private Date startTime; - private Date endTime; - private int lastActionNumber = 1; // over-ride by DB value - private final XLog log = XLog.getLog(getClass()); - private String user; - private String group; - private List<JsonBean> insertList = new ArrayList<JsonBean>(); - private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); - - /** - * Default timeout for catchup jobs, in minutes, after which coordinator input check will timeout - */ - public static final String CONF_DEFAULT_TIMEOUT_CATCHUP = Service.CONF_PREFIX + "coord.catchup.default.timeout"; - - public CoordActionMaterializeCommand(String jobId, Date startTime, Date endTime) { - super("coord_action_mater", "coord_action_mater", 1, XLog.STD, false); - this.jobId = jobId; - this.startTime = startTime; - this.endTime = endTime; - } - - @Override - protected Void call(CoordinatorStore store) throws CommandException { - CoordJobGetJPAExecutor getCoordJob = new CoordJobGetJPAExecutor(jobId); - CoordinatorJobBean job; - try { - job = Services.get().get(JPAService.class).execute(getCoordJob); - } - catch (JPAExecutorException jex) { - throw new CommandException(jex); - } - setLogInfo(job); - if (job.getLastActionTime() != null && job.getLastActionTime().compareTo(endTime) >= 0) { - log.info("ENDED Coordinator materialization for jobId = " + jobId - + " Action is *already* materialized for Materialization start time = " + startTime + " : Materialization end time = " + endTime + " Job status = " + job.getStatusStr()); - return null; - } - - if (endTime.after(job.getEndTime())) { - log.info("ENDED Coordinator materialization for jobId = " + jobId + " Materialization end time = " + endTime - + " surpasses coordinator job's end time = " + job.getEndTime() + " Job status = " + job.getStatusStr()); - return null; - } - - if (job.getPauseTime() != null && !startTime.before(job.getPauseTime())) { - log.info("ENDED Coordinator materialization for jobId = " + jobId + " Materialization start time = " + startTime - + " is after or equal to coordinator job's pause time = " + job.getPauseTime() + " Job status = " + job.getStatusStr()); - // pausetime blocks real materialization - we change job's status back to RUNNING; - if (job.getStatus() == CoordinatorJob.Status.PREMATER) { - job.setStatus(CoordinatorJob.Status.RUNNING); - } - updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS, job)); - return null; - } - - this.user = job.getUser(); - this.group = job.getGroup(); - - if (job.getStatus().equals(CoordinatorJobBean.Status.PREMATER)) { - Configuration jobConf = null; - log.debug("start job :" + jobId + " Materialization "); - try { - jobConf = new XConfiguration(new StringReader(job.getConf())); - } - catch (IOException ioe) { - log.warn("Configuration parse error. read from DB :" + job.getConf(), ioe); - throw new CommandException(ErrorCode.E1005, ioe.getMessage(), ioe); - } - - try { - materializeJobs(false, job, jobConf, store); - updateJobTable(job, store); - } - catch (CommandException ex) { - log.warn("Exception occurs:" + ex + " Making the job failed "); - job.setStatus(CoordinatorJobBean.Status.FAILED); - updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_MATERIALIZE, job)); - } - catch (Exception e) { - log.error("Excepion thrown :", e); - throw new CommandException(ErrorCode.E1001, e.getMessage(), e); - } - } - else { - log.info("WARN: action is not in PREMATER state! It's in state=" + job.getStatus()); - } - return null; - } - - /** - * Create action instances starting from "start-time" to end-time" and store them into Action table. - * - * @param dryrun - * @param jobBean - * @param conf - * @param store - * @throws Exception - */ - protected String materializeJobs(boolean dryrun, CoordinatorJobBean jobBean, Configuration conf, - CoordinatorStore store) throws Exception { - String jobXml = jobBean.getJobXml(); - Element eJob = XmlUtils.parseXml(jobXml); - // TODO: always UTC? - TimeZone appTz = DateUtils.getTimeZone(jobBean.getTimeZone()); - // TimeZone appTz = DateUtils.getTimeZone("UTC"); - int frequency = Integer.valueOf(jobBean.getFrequency()); - TimeUnit freqTU = TimeUnit.valueOf(eJob.getAttributeValue("freq_timeunit")); - TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration")); - Calendar start = Calendar.getInstance(appTz); - start.setTime(startTime); - DateUtils.moveToEnd(start, endOfFlag); - Calendar end = Calendar.getInstance(appTz); - end.setTime(endTime); - lastActionNumber = jobBean.getLastActionNumber(); - // DateUtils.moveToEnd(end, endOfFlag); - log.info(" *** materialize Actions for tz=" + appTz.getDisplayName() + ",\n start=" + start.getTime() - + ", end=" + end.getTime() + "\n TimeUNIT " + freqTU.getCalendarUnit() + " Frequency :" + frequency - + ":" + freqTU + " lastActionNumber " + lastActionNumber); - // Keep the actual start time - Calendar origStart = Calendar.getInstance(appTz); - origStart.setTime(jobBean.getStartTimestamp()); - // Move to the End of duration, if needed. - DateUtils.moveToEnd(origStart, endOfFlag); - // Cloning the start time to be used in loop iteration - Calendar effStart = (Calendar) origStart.clone(); - // Move the time when the previous action finished - effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency); - - String action = null; - StringBuilder actionStrings = new StringBuilder(); - Date jobPauseTime = jobBean.getPauseTime(); - Calendar pause = null; - if (jobPauseTime != null) { - pause = Calendar.getInstance(appTz); - pause.setTime(DateUtils.convertDateToTimestamp(jobPauseTime)); - } - - while (effStart.compareTo(end) < 0) { - if (pause != null && effStart.compareTo(pause) >= 0) { - break; - } - CoordinatorActionBean actionBean = new CoordinatorActionBean(); - lastActionNumber++; - - int timeout = jobBean.getTimeout(); - log.debug(origStart.getTime() + " Materializing action for time=" + effStart.getTime() - + ", lastactionnumber=" + lastActionNumber); - Date actualTime = new Date(); - action = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), - effStart.getTime(), actualTime, lastActionNumber, conf, actionBean); - int catchUpTOMultiplier = 1; // This value might be could be changed in future - if (actionBean.getNominalTimestamp().before(jobBean.getCreatedTimestamp())) { - // Catchup action - timeout = catchUpTOMultiplier * timeout; - // actionBean.setTimeOut(Services.get().getConf().getInt(CONF_DEFAULT_TIMEOUT_CATCHUP, - // -1)); - log.info("Catchup timeout is :" + actionBean.getTimeOut()); - } - actionBean.setTimeOut(timeout); - - if (!dryrun) { - storeToDB(actionBean, action, store, jobBean.getAppName()); // Storing to table - } - else { - actionStrings.append("action for new instance"); - actionStrings.append(action); - } - // Restore the original start time - effStart = (Calendar) origStart.clone(); - effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency); - } - - endTime = new Date(effStart.getTimeInMillis()); - if (!dryrun) { - return action; - } - else { - return actionStrings.toString(); - } - } - - /** - * Store an Action into database table. - * - * @param actionBean - * @param actionXml - * @param store - * @param appName - * @throws Exception - */ - private void storeToDB(CoordinatorActionBean actionBean, String actionXml, CoordinatorStore store, String appName) - throws Exception { - log.debug("In storeToDB() action Id " + actionBean.getId() + " Size of actionXml " + actionXml.length()); - actionBean.setActionXml(actionXml); - insertList.add(actionBean); - createActionRegistration(actionXml, actionBean, store, appName); - - // TODO: time 100s should be configurable - queueCallable(new CoordActionNotificationXCommand(actionBean), 100); - queueCallable(new CoordActionInputCheckXCommand(actionBean.getId(), actionBean.getJobId()), 100); - } - - /** - * @param actionXml - * @param actionBean - * @param store - * @param appName - * @throws Exception - */ - private void createActionRegistration(String actionXml, CoordinatorActionBean actionBean, CoordinatorStore store, - String appName) throws Exception { - Element eAction = XmlUtils.parseXml(actionXml); - Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")); - SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, store, actionBean.getId(), - SlaAppType.COORDINATOR_ACTION, user, group); - if(slaEvent != null) { - insertList.add(slaEvent); - } - // insert into new sla reg table too - SLAOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), actionBean.getJobId(), - AppType.COORDINATOR_ACTION, user, appName, log, false); - } - - /** - * @param job - * @param store - * @throws StoreException - */ - private void updateJobTable(CoordinatorJobBean job, CoordinatorStore store) { - // TODO: why do we need this? Isn't lastMatTime enough??? - job.setLastActionTime(endTime); - job.setLastActionNumber(lastActionNumber); - // if the job endtime == action endtime, then set status of job to - // succeeded - // we dont need to materialize this job anymore - Date jobEndTime = job.getEndTime(); - if (jobEndTime.compareTo(endTime) <= 0) { - job.setStatus(CoordinatorJob.Status.SUCCEEDED); - log.info("[" + job.getId() + "]: Update status from PREMATER to SUCCEEDED"); - } - else { - job.setStatus(CoordinatorJob.Status.RUNNING); - log.info("[" + job.getId() + "]: Update status from PREMATER to RUNNING"); - } - job.setNextMaterializedTime(endTime); - updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_MATERIALIZE, job)); - } - - @Override - protected Void execute(CoordinatorStore store) throws StoreException, CommandException { - log.info("STARTED CoordActionMaterializeCommand for jobId=" + jobId + ", startTime=" + startTime + ", endTime=" - + endTime); - try { - if (lock(jobId)) { - call(store); - JPAService jpaService = Services.get().get(JPAService.class); - if (jpaService != null) { - try { - BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null); - } - catch (JPAExecutorException je) { - throw new CommandException(je); - } - } - else { - throw new CommandException(ErrorCode.E0610); - } - } - else { - queueCallable(new CoordActionMaterializeCommand(jobId, startTime, endTime), - LOCK_FAILURE_REQUEUE_INTERVAL); - log.warn("CoordActionMaterializeCommand lock was not acquired - failed jobId=" + jobId - + ". Requeing the same."); - } - } - catch (InterruptedException e) { - queueCallable(new CoordActionMaterializeCommand(jobId, startTime, endTime), LOCK_FAILURE_REQUEUE_INTERVAL); - log.warn("CoordActionMaterializeCommand lock acquiring failed with exception " + e.getMessage() - + " for jobId=" + jobId + " Requeing the same."); - } - finally { - log.info(" ENDED CoordActionMaterializeCommand for jobId=" + jobId + ", startTime=" + startTime - + ", endTime=" + endTime); - } - return null; - } - - - - /** - * For preliminery testing. Should be removed soon - * - * @param args - * @throws Exception - */ - public static void main(String[] args) throws Exception { - new Services().init(); - try { - Date startTime = DateUtils.parseDateUTC("2009-02-01T01:00Z"); - Date endTime = DateUtils.parseDateUTC("2009-02-02T01:00Z"); - String jobId = "0000000-091207151850551-oozie-dani-C"; - CoordActionMaterializeCommand matCmd = new CoordActionMaterializeCommand(jobId, startTime, endTime); - matCmd.call(); - } - finally { - try { - Thread.sleep(60000); - } - catch (Exception ex) { - } - new Services().destroy(); - } - } - -} http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java index 23bafb8..b4b2fef 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java @@ -101,6 +101,16 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo this.materializationWindow = materializationWindow; } + public CoordMaterializeTransitionXCommand(CoordinatorJobBean coordJob, int materializationWindow, Date startTime, + Date endTime) { + super("coord_mater", "coord_mater", 1); + this.jobId = ParamChecker.notEmpty(coordJob.getId(), "jobId"); + this.materializationWindow = materializationWindow; + this.coordJob = coordJob; + this.startMatdTime = startTime; + this.endMatdTime = endTime; + } + /* (non-Javadoc) * @see org.apache.oozie.command.MaterializeTransitionXCommand#transitToNext() */ @@ -412,7 +422,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo } String action = null; - int numWaitingActions = jpaService.execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId())); + int numWaitingActions = dryrun ? 0 : jpaService.execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId())); int maxActionToBeCreated = coordJob.getMatThrottling() - numWaitingActions; // If LAST_ONLY and all materialization is in the past, ignore maxActionsToBeCreated boolean ignoreMaxActions = http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java index 11fde6f..02b30ef 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java @@ -56,6 +56,7 @@ import org.apache.oozie.coord.CoordinatorJobException; import org.apache.oozie.coord.TimeUnit; import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.service.CoordMaterializeTriggerService; import org.apache.oozie.service.DagXLogInfoService; import org.apache.oozie.service.HadoopAccessorException; import org.apache.oozie.service.HadoopAccessorService; @@ -287,14 +288,16 @@ public class CoordSubmitXCommand extends SubmitTransitionXCommand { /** * Gets the dryrun output. * - * @param jobId the job id + * @param coordJob the coordinatorJobBean * @return the dry run * @throws Exception the exception */ protected String getDryRun(CoordinatorJobBean coordJob) throws Exception{ + int materializationWindow = conf.getInt(CoordMaterializeTriggerService.CONF_MATERIALIZATION_WINDOW, + CoordMaterializeTriggerService.CONF_MATERIALIZATION_WINDOW_DEFAULT); Date startTime = coordJob.getStartTime(); long startTimeMilli = startTime.getTime(); - long endTimeMilli = startTimeMilli + (3600 * 1000); + long endTimeMilli = startTimeMilli + (materializationWindow * 1000); Date jobEndTime = coordJob.getEndTime(); Date endTime = new Date(endTimeMilli); if (endTime.compareTo(jobEndTime) > 0) { @@ -304,8 +307,6 @@ public class CoordSubmitXCommand extends SubmitTransitionXCommand { LOG.info("[" + jobId + "]: Update status to RUNNING"); coordJob.setStatus(Job.Status.RUNNING); coordJob.setPending(); - CoordActionMaterializeCommand coordActionMatCom = new CoordActionMaterializeCommand(jobId, startTime, - endTime); Configuration jobConf = null; try { jobConf = new XConfiguration(new StringReader(coordJob.getConf())); @@ -313,7 +314,8 @@ public class CoordSubmitXCommand extends SubmitTransitionXCommand { catch (IOException e1) { LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), e1); } - String action = coordActionMatCom.materializeJobs(true, coordJob, jobConf, null); + String action = new CoordMaterializeTransitionXCommand(coordJob, materializationWindow, startTime, + endTime).materializeActions(true); String output = coordJob.getJobXml() + System.getProperty("line.separator") + "***actions for instance***" + action; return output; @@ -323,9 +325,9 @@ public class CoordSubmitXCommand extends SubmitTransitionXCommand { * Queue MaterializeTransitionXCommand */ protected void queueMaterializeTransitionXCommand(String jobId) { - // submit a command to materialize jobs for the next 1 hour (3600 secs) - // so we don't wait 10 mins for the Service to run. - queue(new CoordMaterializeTransitionXCommand(jobId, 3600), 100); + int materializationWindow = conf.getInt(CoordMaterializeTriggerService.CONF_MATERIALIZATION_WINDOW, + CoordMaterializeTriggerService.CONF_MATERIALIZATION_WINDOW_DEFAULT); + queue(new CoordMaterializeTransitionXCommand(jobId, materializationWindow), 100); } /** http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/main/java/org/apache/oozie/command/coord/CoordinatorCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordinatorCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordinatorCommand.java deleted file mode 100644 index c70e171..0000000 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordinatorCommand.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.oozie.command.coord; - -import org.apache.oozie.command.Command; -import org.apache.oozie.store.CoordinatorStore; -import org.apache.oozie.store.Store; -import org.apache.oozie.store.WorkflowStore; - -public abstract class CoordinatorCommand<T> extends Command<T, CoordinatorStore> { - - public CoordinatorCommand(String name, String type, int priority, int logMask) { - super(name, type, priority, logMask); - } - - public CoordinatorCommand(String name, String type, int priority, int logMask, boolean withStore) { - super(name, type, priority, logMask, withStore); - } - - public CoordinatorCommand(String name, String type, int priority, int logMask, boolean withStore, boolean dryrun) { - super(name, type, priority, logMask, (dryrun) ? false : withStore, dryrun); - } - - /** - * Return the public interface of the Coordinator Store. - * - * @return {@link WorkflowStore} - */ - public Class<? extends Store> getStoreClass() { - return CoordinatorStore.class; - } -} http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java index 1dac7e8..3fbd092 100644 --- a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java +++ b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java @@ -61,7 +61,7 @@ public class CoordMaterializeTriggerService implements Service { private static final String INSTRUMENTATION_GROUP = "coord_job_mat"; private static final String INSTR_MAT_JOBS_COUNTER = "jobs"; public static final int CONF_LOOKUP_INTERVAL_DEFAULT = 300; - private static final int CONF_MATERIALIZATION_WINDOW_DEFAULT = 3600; + public static final int CONF_MATERIALIZATION_WINDOW_DEFAULT = 3600; private static final int CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT = 50; /** http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/test/java/org/apache/oozie/command/TestCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/TestCommand.java b/core/src/test/java/org/apache/oozie/command/TestCommand.java deleted file mode 100644 index 60363bf..0000000 --- a/core/src/test/java/org/apache/oozie/command/TestCommand.java +++ /dev/null @@ -1,214 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.oozie.command; - -import org.apache.oozie.store.StoreException; -import org.apache.oozie.store.WorkflowStore; -import org.apache.oozie.store.Store; -import org.apache.oozie.service.DagXLogInfoService; -import org.apache.oozie.service.Services; -import org.apache.oozie.test.XTestCase; -import org.apache.oozie.util.XCallable; -import org.apache.oozie.util.XLog; -import org.apache.oozie.ErrorCode; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.UUID; - -public class TestCommand extends XTestCase { - private static List<String> EXECUTED = Collections.synchronizedList(new ArrayList<String>()); - - @Override - protected void setUp() throws Exception { - super.setUp(); - new Services().init(); - } - - @Override - protected void tearDown() throws Exception { - Services.get().destroy(); - super.tearDown(); - } - - private static class DummyXCallable implements XCallable<Void> { - private String name; - private String key = null; - - public DummyXCallable(String name) { - this.name = name; - this.key = name + "_" + UUID.randomUUID(); - } - - @Override - public String getName() { - return name; - } - - @Override - public String getType() { - return "type"; - } - - @Override - public int getPriority() { - return 0; - } - - @Override - public long getCreatedTime() { - return 1; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("Type:").append(getType()); - sb.append(",Priority:").append(getPriority()); - return sb.toString(); - } - - @Override - public void setInterruptMode(boolean mode) { - } - - @Override - public boolean inInterruptMode() { - return false; - } - - public Void call() throws Exception { - EXECUTED.add(name); - return null; - } - - @Override - public String getKey() { - return this.key; - } - - @Override - public String getEntityKey() { - return null; - } - - } - - private static class MyCommand extends Command<Object, WorkflowStore> { - private boolean exception; - - public MyCommand(boolean exception) { - super("test", "test", 1, XLog.OPS); - this.exception = exception; - } - - @Override - protected Object call(WorkflowStore store) throws StoreException, CommandException { - assertTrue(logInfo.createPrefix().contains("JOB[job]")); - assertTrue(XLog.Info.get().createPrefix().contains("JOB[job]")); - assertTrue(logInfo.createPrefix().contains("ACTION[action]")); - assertTrue(XLog.Info.get().createPrefix().contains("ACTION[action]")); - assertNotNull(store); - assertEquals("test", getName()); - assertEquals(1, getPriority()); - queueCallable(new DummyXCallable("a")); - queueCallable(Arrays.asList(new DummyXCallable("b"), new DummyXCallable("c"))); - queueCallable(new DummyXCallable("d"), 300); - queueCallable(new DummyXCallable("e"), 200); - queueCallable(new DummyXCallable("f"), 100); - queueCallableForException(new DummyXCallable("ex")); - if (exception) { - throw new CommandException(ErrorCode.E0800); - } - return null; - } - - /** - * Return the public interface of the Workflow Store. - * - * @return {@link WorkflowStore} - */ - @Override - public Class<? extends Store> getStoreClass() { - return WorkflowStore.class; - } - } - - public void testDagCommand() throws Exception { - XLog.Info.get().clear(); - XLog.Info.get().setParameter(DagXLogInfoService.JOB, "job"); - XLog.Info.get().setParameter(DagXLogInfoService.ACTION, "action"); - - Command command = new MyCommand(false); - - XLog.Info.get().clear(); - command.call(); - - assertTrue(XLog.Info.get().createPrefix().contains("JOB[job]")); - assertTrue(XLog.Info.get().createPrefix().contains("ACTION[action]")); - command.resetLogInfoWorkflow(); - assertTrue(XLog.Info.get().createPrefix().contains("JOB[-]")); - assertTrue(XLog.Info.get().createPrefix().contains("ACTION[action]")); - command.resetLogInfoAction(); - assertTrue(XLog.Info.get().createPrefix().contains("ACTION[-]")); - - waitFor(2000, new Predicate() { - public boolean evaluate() throws Exception { - return EXECUTED.size() == 6; - } - }); - - assertEquals(6, EXECUTED.size()); - assertEquals(Arrays.asList("a", "b", "c", "d", "e", "f"), EXECUTED); - - EXECUTED.clear(); - - XLog.Info.get().setParameter(DagXLogInfoService.JOB, "job"); - XLog.Info.get().setParameter(DagXLogInfoService.ACTION, "action"); - command = new MyCommand(true); - - try { - command.call(); - fail(); - } - catch (CommandException ex) { - //nop - } - - waitFor(200, new Predicate() { - public boolean evaluate() throws Exception { - return EXECUTED.size() == 2; - } - }); - - assertEquals(1, EXECUTED.size()); - assertEquals(Arrays.asList("ex"), EXECUTED); - } - - /** - * Return the public interface of the Workflow Store. - * - * @return {@link WorkflowStore} - */ - public Class<? extends Store> getStoreClass() { - return WorkflowStore.class; - } - -} http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionMaterializeCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionMaterializeCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionMaterializeCommand.java deleted file mode 100644 index 6278515..0000000 --- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionMaterializeCommand.java +++ /dev/null @@ -1,323 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.oozie.command.coord; - -import java.util.Date; -import java.util.List; - -import org.apache.oozie.CoordinatorActionBean; -import org.apache.oozie.CoordinatorJobBean; -import org.apache.oozie.SLAEventBean; -import org.apache.oozie.client.CoordinatorJob; -import org.apache.oozie.client.CoordinatorJob.Timeunit; -import org.apache.oozie.service.Services; -import org.apache.oozie.store.CoordinatorStore; -import org.apache.oozie.store.SLAStore; -import org.apache.oozie.store.StoreException; -import org.apache.oozie.test.XTestCase; -import org.apache.oozie.util.DateUtils; - -@SuppressWarnings("deprecation") -public class TestCoordActionMaterializeCommand extends XTestCase { - private Services services; - - @Override - protected void setUp() throws Exception { - super.setUp(); - services = new Services(); - services.init(); - } - - @Override - protected void tearDown() throws Exception { - services.destroy(); - super.tearDown(); - } - - public void testActionMater() throws Exception { - String jobId = "0000000-" + new Date().getTime() + "-testActionMater-C"; - String startTimeStr = "2009-03-06T10:00Z"; - Date startTime = DateUtils.parseDateOozieTZ(startTimeStr); - Date endTime = DateUtils.parseDateOozieTZ("2009-03-11T10:00Z"); - addRecordToJobTable(jobId, startTime, endTime); - new CoordActionMaterializeCommand(jobId, startTime, endTime).call(); - CoordinatorActionBean action = checkCoordAction(jobId + "@1"); - assertEquals(action.getActionNumber(), 1); - assertEquals(action.getNominalTime(), startTime); - assertTrue(action.getMissingDependencies().indexOf("file:///tmp/coord/workflows/2009/03/01") > -1); - String actionXml = action.getActionXml(); - String slaNotifMsg = actionXml.substring(actionXml.indexOf("<sla:notification-msg>") + 22, - actionXml.indexOf("</sla:notification-msg>")); - String expectedSlaMsg = "Notifying User for 2009-03-06T10:00Z," - + DateUtils.formatDateOozieTZ(action.getCreatedTime()) + ",2009-03-06,2009-03-07T10:00Z," - + action.getId() + ",NAME,testValue,testUser," - + "file:///tmp/coord/workflows/2009/22myOutputDatabase,myOutputTable,'datestamp=20090306'"; - assertEquals(expectedSlaMsg, slaNotifMsg); - } - - public void testActionMaterWithPauseTime1() throws Exception { - String jobId = "0000000-" + new Date().getTime() + "-testActionMater-C"; - - Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z"); - Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z"); - Date pauseTime = DateUtils.parseDateOozieTZ("2009-03-06T10:04Z"); - addRecordToJobTable(jobId, startTime, endTime, pauseTime); - new CoordActionMaterializeCommand(jobId, startTime, endTime).call(); - checkCoordActions(jobId, 1, null); - } - - public void testActionMaterWithPauseTime2() throws Exception { - String jobId = "0000000-" + new Date().getTime() + "-testActionMater-C"; - - Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z"); - Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z"); - Date pauseTime = DateUtils.parseDateOozieTZ("2009-03-06T10:08Z"); - addRecordToJobTable(jobId, startTime, endTime, pauseTime); - new CoordActionMaterializeCommand(jobId, startTime, endTime).call(); - checkCoordActions(jobId, 2, null); - } - - public void testActionMaterWithPauseTime3() throws Exception { - String jobId = "0000000-" + new Date().getTime() + "-testActionMater-C"; - - Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z"); - Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z"); - Date pauseTime = DateUtils.parseDateOozieTZ("2009-03-06T09:58Z"); - addRecordToJobTable(jobId, startTime, endTime, pauseTime); - new CoordActionMaterializeCommand(jobId, startTime, endTime).call(); - checkCoordActions(jobId, 0, CoordinatorJob.Status.RUNNING); - } - - private void addRecordToJobTable(String jobId, Date startTime, Date endTime) throws StoreException { - CoordinatorStore store = new CoordinatorStore(false); - CoordinatorJobBean coordJob = new CoordinatorJobBean(); - coordJob.setId(jobId); - coordJob.setAppName("testApp"); - coordJob.setStartTime(startTime); - coordJob.setEndTime(endTime); - coordJob.setTimeUnit(Timeunit.DAY); - coordJob.setAppPath("testAppPath"); - coordJob.setStatus(CoordinatorJob.Status.PREMATER); - coordJob.setCreatedTime(new Date()); // TODO: Do we need that? - coordJob.setLastModifiedTime(new Date()); - coordJob.setUser("testUser"); - coordJob.setGroup("testGroup"); - coordJob.setTimeZone("America/Los_Angeles"); - String confStr = "<configuration><property><name>testProperty</name><value>testValue</value></property>" - + "<property><name>user.name</name><value>testUser</value></property></configuration>"; - coordJob.setConf(confStr); - String appXml = "<coordinator-app xmlns='uri:oozie:coordinator:0.1' xmlns:sla='uri:oozie:sla:0.1' name='NAME' frequency=\"1\" start='2009-03-06T010:00Z' end='2009-03-11T10:00Z' timezone='America/Los_Angeles' freq_timeunit='DAY' end_of_duration='NONE'>"; - appXml += "<controls>"; - appXml += "<timeout>10</timeout>"; - appXml += "<concurrency>2</concurrency>"; - appXml += "<execution>LIFO</execution>"; - appXml += "</controls>"; - appXml += "<input-events>"; - appXml += "<data-in name='A' dataset='a'>"; - appXml += "<dataset name='a' frequency='7' initial-instance='2009-02-01T01:00Z' timezone='UTC' freq_timeunit='DAY' end_of_duration='NONE'>"; - appXml += "<uri-template>file:///tmp/coord/workflows/${YEAR}/${MONTH}/${DAY}</uri-template>"; - appXml += "</dataset>"; - appXml += "<instance>${coord:current(0)}</instance>"; - appXml += "<instance>${coord:latest(-1)}</instance>"; - appXml += "</data-in>"; - appXml += "</input-events>"; - appXml += "<output-events>"; - appXml += "<data-out name='LOCAL_A' dataset='local_a'>"; - appXml += "<dataset name='local_a' frequency='7' initial-instance='2009-02-01T01:00Z' timezone='UTC' freq_timeunit='DAY' end_of_duration='NONE'>"; - appXml += "<uri-template>file:///tmp/coord/workflows/${YEAR}/${DAY}</uri-template>"; - appXml += "</dataset>"; - appXml += "<instance>${coord:current(-1)}</instance>"; - appXml += "</data-out>"; - appXml += "<data-out name='aggregated-logs' dataset='Stats'>"; - appXml += "<dataset name='Stats' frequency='1' initial-instance='2009-01-01T01:00Z' "; - appXml += "timezone='UTC' freq_timeunit='DAY' end_of_duration='NONE'>"; - appXml += "<uri-template>hcat://foo:11002/myOutputDatabase/myOutputTable/datestamp=${YEAR}${MONTH}${DAY}"; - appXml += "</uri-template></dataset>"; - appXml += "<instance>${coord:current(0)}</instance>"; - appXml += "</data-out>"; - appXml += "</output-events>"; - appXml += "<action>"; - appXml += "<workflow>"; - appXml += "<app-path>hdfs:///tmp/workflows/</app-path>"; - appXml += "<configuration>"; - appXml += "<property>"; - appXml += "<name>inputA</name>"; - appXml += "<value>${coord:dataIn('A')}</value>"; - appXml += "</property>"; - appXml += "<property>"; - appXml += "<name>inputB</name>"; - appXml += "<value>${coord:dataOut('LOCAL_A')}</value>"; - appXml += "</property>"; - appXml += "</configuration>"; - appXml += "</workflow>"; - appXml += " <sla:info>" - + " <sla:app-name>test-app</sla:app-name>" - + " <sla:nominal-time>${coord:nominalTime()}</sla:nominal-time>" - + " <sla:should-start>5</sla:should-start>" - + " <sla:should-end>120</sla:should-end>" - + " <sla:notification-msg>Notifying User for ${coord:nominalTime()},${coord:actualTime()}," - + "${coord:formatTime(coord:nominalTime(),'yyyy-MM-dd')},${coord:dateOffset(coord:nominalTime(), 1, 'DAY')}," - + "${coord:actionId()},${coord:name()},${coord:conf('testProperty')},${coord:user()},${coord:dataOut('LOCAL_A')}" - + "${coord:databaseOut('aggregated-logs')},${coord:tableOut('aggregated-logs')}," - + "${coord:dataOutPartitions('aggregated-logs')}" - + "</sla:notification-msg>" + " <sla:alert-contact>[email protected]</sla:alert-contact>" - + " <sla:dev-contact>[email protected]</sla:dev-contact>" - + " <sla:qa-contact>[email protected]</sla:qa-contact>" - + " <sla:se-contact>[email protected]</sla:se-contact>" + "</sla:info>"; - appXml += "</action>"; - appXml += "</coordinator-app>"; - /*try { - System.out.println(XmlUtils.prettyPrint(XmlUtils.parseXml(appXml))); - ; - } - catch (JDOMException e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); - }*/ - coordJob.setJobXml(appXml); - coordJob.setLastActionNumber(0); - coordJob.setFrequency("1"); - try { - coordJob.setEndTime(DateUtils.parseDateOozieTZ("2009-03-11T10:00Z")); - } - catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); - fail("Could not set end time"); - } - try { - store.beginTrx(); - store.insertCoordinatorJob(coordJob); - store.commitTrx(); - } - catch (StoreException se) { - se.printStackTrace(); - store.rollbackTrx(); - fail("Unable to insert the test job record to table"); - throw se; - } - finally { - store.closeTrx(); - } - } - - private CoordinatorActionBean checkCoordAction(String actionId) throws StoreException { - CoordinatorStore store = new CoordinatorStore(false); - try { - CoordinatorActionBean action = store.getCoordinatorAction(actionId, false); - SLAStore slaStore = new SLAStore(store); - long lastSeqId[] = new long[1]; - List<SLAEventBean> slaEvents = slaStore.getSLAEventListNewerSeqLimited(0, 10, lastSeqId); - // System.out.println("AAA " + slaEvents.size() + " : " + - // lastSeqId[0]); - if (slaEvents.size() == 0) { - fail("Unable to GET any record of sequence id greater than 0"); - } - return action; - } - catch (StoreException se) { - se.printStackTrace(); - fail("Action ID " + actionId + " was not stored properly in db"); - } - return null; - } - - private void addRecordToJobTable(String jobId, Date startTime, Date endTime, Date pauseTime) throws StoreException { - CoordinatorStore store = new CoordinatorStore(false); - CoordinatorJobBean coordJob = new CoordinatorJobBean(); - coordJob.setId(jobId); - coordJob.setAppName("testApp"); - coordJob.setStartTime(startTime); - coordJob.setEndTime(endTime); - coordJob.setPauseTime(pauseTime); - coordJob.setTimeUnit(Timeunit.MINUTE); - coordJob.setAppPath("testAppPath"); - coordJob.setStatus(CoordinatorJob.Status.PREMATER); - coordJob.setCreatedTime(new Date()); // TODO: Do we need that? - coordJob.setLastModifiedTime(new Date()); - coordJob.setUser("testUser"); - coordJob.setGroup("testGroup"); - coordJob.setTimeZone("America/Los_Angeles"); - String confStr = "<configuration></configuration>"; - coordJob.setConf(confStr); - String appXml = "<coordinator-app xmlns='uri:oozie:coordinator:0.1' xmlns:sla='uri:oozie:sla:0.1' name='NAME' frequency=\"5\" start='2009-03-06T010:00Z' end='2009-03-06T10:14Z' timezone='America/Los_Angeles' freq_timeunit='MINUTE' end_of_duration='NONE'>"; - appXml += "<controls>"; - appXml += "<timeout>10</timeout>"; - appXml += "<concurrency>2</concurrency>"; - appXml += "<execution>LIFO</execution>"; - appXml += "</controls>"; - appXml += "<action>"; - appXml += "<workflow>"; - appXml += "<app-path>hdfs:///tmp/workflows/</app-path>"; - appXml += "<configuration>"; - appXml += "</configuration>"; - appXml += "</workflow>"; - appXml += " <sla:info>" - // + " <sla:client-id>axonite-blue</sla:client-id>" - + " <sla:app-name>test-app</sla:app-name>" - + " <sla:nominal-time>${coord:nominalTime()}</sla:nominal-time>" - + " <sla:should-start>5</sla:should-start>" - + " <sla:should-end>120</sla:should-end>" - + " <sla:notification-msg>Notifying User for ${coord:nominalTime()} nominal time </sla:notification-msg>" - + " <sla:alert-contact>[email protected]</sla:alert-contact>" - + " <sla:dev-contact>[email protected]</sla:dev-contact>" - + " <sla:qa-contact>[email protected]</sla:qa-contact>" + " <sla:se-contact>[email protected]</sla:se-contact>" - + "</sla:info>"; - appXml += "</action>"; - appXml += "</coordinator-app>"; - - coordJob.setJobXml(appXml); - coordJob.setLastActionNumber(0); - coordJob.setFrequency("5"); - try { - store.beginTrx(); - store.insertCoordinatorJob(coordJob); - store.commitTrx(); - } - catch (StoreException se) { - se.printStackTrace(); - store.rollbackTrx(); - fail("Unable to insert the test job record to table"); - throw se; - } - finally { - store.closeTrx(); - } - } - - private void checkCoordActions(String jobId, int number, CoordinatorJob.Status status) throws StoreException { - CoordinatorStore store = new CoordinatorStore(false); - try { - int coordActionsCount = store.getActionsForCoordinatorJob(jobId, false); - if (coordActionsCount != number) { - fail("Should have " + number + " actions created for job " + jobId); - } - - if (status != null) { - CoordinatorJob job = store.getCoordinatorJob(jobId, false); - if (job.getStatus() != status) { - fail("Job status " + job.getStatus() + " should be " + status); - } - } - } - catch (StoreException se) { - se.printStackTrace(); - fail("Job ID " + jobId + " was not stored properly in db"); - } - } -} http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/test/java/org/apache/oozie/command/coord/TestCoordELExtensions.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordELExtensions.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordELExtensions.java index 7f7a387..aab8efd 100644 --- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordELExtensions.java +++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordELExtensions.java @@ -55,10 +55,10 @@ public class TestCoordELExtensions extends XDataTestCase { Date startTime = DateUtils.parseDateUTC("2009-03-06T010:00Z"); Date endTime = DateUtils.parseDateUTC("2009-03-07T12:00Z"); CoordinatorJobBean job = createCoordJob("coord-job-for-elext.xml", - CoordinatorJob.Status.PREMATER, startTime, endTime, false, false, 0); + CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0); addRecordToCoordJobTable(job); - new CoordActionMaterializeCommand(job.getId(), startTime, endTime).call(); + new CoordMaterializeTransitionXCommand(job.getId(), 3600).call(); checkCoordAction(job.getId() + "@1"); } http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java index 5b22abc..60eff2f 100644 --- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java @@ -31,7 +31,6 @@ import org.apache.oozie.client.CoordinatorJob.Timeunit; import org.apache.oozie.command.CommandException; import org.apache.oozie.coord.CoordELFunctions; import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor; -import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor; @@ -382,6 +381,21 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase { checkCoordActions(job.getId(), 0, CoordinatorJob.Status.PAUSED); } + public void testGetDryrun() throws Exception { + Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z"); + Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z"); + CoordinatorJobBean job = createCoordJob(CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0); + job.setFrequency("5"); + job.setTimeUnit(Timeunit.MINUTE); + job.setMatThrottling(20); + String dryRunOutput = new CoordMaterializeTransitionXCommand(job, 3600, startTime, endTime).materializeActions(true); + String[] actions = dryRunOutput.split("action for new instance"); + assertEquals(3, actions.length -1); + for(int i = 1; i < actions.length; i++) { + assertTrue(actions[i].contains("action-nominal-time")); + } + } + public void testTimeout() throws Exception { Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z"); Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z"); http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 190bad7..be5e84a 100644 --- a/pom.xml +++ b/pom.xml @@ -836,7 +836,7 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> - <version>2.12</version> + <version>2.12.2</version> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 9966933..6ec2bcb 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,4 +1,5 @@ -- Oozie 4.2.0 release (trunk - unreleased) +OOZIE-1846 Convert CoordActionMaterializeCommand to an XCommand and remove Command (seoeun25 via shwethags) OOZIE-1925 upgrade tomcat to 6.0.41 (rkanter via shwethags) OOZIE-1943 Bump up trunk to 4.2.0-SNAPSHOT (bzhang)
