Repository: oozie Updated Branches: refs/heads/master 70a5ffe4b -> 70052969a
OOZIE-2394 Oozie can execute command without holding lock Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/70052969 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/70052969 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/70052969 Branch: refs/heads/master Commit: 70052969a1df064957bc5dd06d69fc1955401e1b Parents: 70a5ffe Author: Purshotam Shah <[email protected]> Authored: Tue Jan 26 10:43:33 2016 -0800 Committer: Purshotam Shah <[email protected]> Committed: Tue Jan 26 10:43:33 2016 -0800 ---------------------------------------------------------------------- .../java/org/apache/oozie/command/XCommand.java | 58 +++++++------------- .../coord/CoordActionInputCheckXCommand.java | 2 +- .../command/coord/CoordActionReadyXCommand.java | 2 +- .../oozie/command/wf/ActionCheckXCommand.java | 2 +- .../oozie/command/wf/ActionEndXCommand.java | 2 +- .../oozie/command/wf/ActionStartXCommand.java | 2 +- .../apache/oozie/command/wf/SignalXCommand.java | 2 +- release-log.txt | 1 + 8 files changed, 26 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/XCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/XCommand.java b/core/src/main/java/org/apache/oozie/command/XCommand.java index ff87510..bdf13f6 100644 --- a/core/src/main/java/org/apache/oozie/command/XCommand.java +++ b/core/src/main/java/org/apache/oozie/command/XCommand.java @@ -74,7 +74,6 @@ public abstract class XCommand<T> implements XCallable<T> { private LockToken lock; private AtomicBoolean used = new AtomicBoolean(false); private boolean inInterrupt = false; - private boolean isSynchronous = false; private Map<Long, List<XCommand<?>>> commandQueue; protected boolean dryrun = false; @@ -212,13 +211,16 @@ public abstract class XCommand<T> implements XCallable<T> { if (lock == null) { instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".lockTimeOut", 1); if (isReQueueRequired()) { - //if not acquire the lock, re-queue itself with default delay + // if not acquire the lock, re-queue itself with default delay queue(this, getRequeueDelay()); - LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and requeue itself [{2}]", this.toString(), getLockTimeOut(), getName()); - } else { + LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and requeue itself [{2}]", this.toString(), + getLockTimeOut(), getName()); + } + else { throw new CommandException(ErrorCode.E0606, this.toString(), getLockTimeOut()); } - } else { + } + else { LOG.debug("Acquired lock for [{0}] in [{1}]", getEntityKey(), getName()); } } @@ -252,13 +254,11 @@ public abstract class XCommand<T> implements XCallable<T> { Instrumentation.Cron callCron = new Instrumentation.Cron(); try { callCron.start(); - if (!isSynchronous) { - eagerLoadState(); - eagerVerifyPrecondition(); - } + eagerLoadState(); + eagerVerifyPrecondition(); try { T ret = null; - if (!isSynchronous && isLockRequired() && !this.inInterruptMode()) { + if (isLockRequired() && !this.inInterruptMode()) { Instrumentation.Cron acquireLockCron = new Instrumentation.Cron(); acquireLockCron.start(); acquireLock(); @@ -270,10 +270,11 @@ public abstract class XCommand<T> implements XCallable<T> { this.executeInterrupts(); } - if (isSynchronous || !isLockRequired() || (lock != null) || this.inInterruptMode()) { + if (!isLockRequired() || (lock != null) || this.inInterruptMode()) { if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) && !used.compareAndSet(false, true)) { - LOG.debug("Command [{0}] key [{1}] already executed for [{2}]", getName(), getEntityKey(), this.toString()); + LOG.debug("Command [{0}] key [{1}] already executed for [{2}]", getName(), getEntityKey(), + this.toString()); return null; } LOG.trace("Load state for [{0}]", getEntityKey()); @@ -300,12 +301,12 @@ public abstract class XCommand<T> implements XCallable<T> { return ret; } finally { - if (!isSynchronous && isLockRequired() && !this.inInterruptMode()) { + if (isLockRequired() && !this.inInterruptMode()) { releaseLock(); } } } - catch(PreconditionException pex){ + catch (PreconditionException pex) { LOG.warn(pex.getMessage().toString() + ", Error Code: " + pex.getErrorCode().toString()); instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".preconditionfailed", 1); return null; @@ -338,25 +339,6 @@ public abstract class XCommand<T> implements XCallable<T> { } /** - * Call this command synchronously from its caller. This benefits faster - * execution of command lifecycle for control nodes and kicking off - * subsequent actions - * - * @param callerEntityKey - * @return the {link #execute} return value. - * @throws CommandException - */ - public final T call(String callerEntityKey) throws CommandException { - if (!callerEntityKey.equals(this.getEntityKey())) { - throw new CommandException(ErrorCode.E0607, "Entity Keys mismatch during synchronous call", "caller=" - + callerEntityKey + ", callee=" + getEntityKey()); - } - isSynchronous = true; //setting to true so lock acquiring and release is not repeated - LOG.trace("Executing synchronously command [{0}] on job [{1}]", this.getName(), this.getKey()); - return call(); - } - - /** * Check for the existence of interrupts for the same lock key * Execute them if exist. * @@ -441,7 +423,7 @@ public abstract class XCommand<T> implements XCallable<T> { * <p> * A trivial implementation is calling {link #loadState}. */ - protected void eagerLoadState() throws CommandException{ + protected void eagerLoadState() throws CommandException { } /** @@ -453,7 +435,7 @@ public abstract class XCommand<T> implements XCallable<T> { * * @throws CommandException thrown if the precondition is not met. */ - protected void eagerVerifyPrecondition() throws CommandException,PreconditionException { + protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { } /** @@ -470,7 +452,7 @@ public abstract class XCommand<T> implements XCallable<T> { * * @throws CommandException thrown if the precondition is not met. */ - protected abstract void verifyPrecondition() throws CommandException,PreconditionException; + protected abstract void verifyPrecondition() throws CommandException, PreconditionException; /** * Command execution body. @@ -485,7 +467,6 @@ public abstract class XCommand<T> implements XCallable<T> { */ protected abstract T execute() throws CommandException; - /** * Return the {@link Instrumentation} instance in use. * @@ -502,7 +483,6 @@ public abstract class XCommand<T> implements XCallable<T> { this.used.set(false); } - /** * Return the delay time for requeue * <p> @@ -522,7 +502,7 @@ public abstract class XCommand<T> implements XCallable<T> { * @return command key */ @Override - public String getKey(){ + public String getKey() { return this.key; } http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java index 742d00d..11184d1 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java @@ -197,7 +197,7 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> { coordAction.setActionXml(actionXml.toString()); coordAction.setStatus(CoordinatorAction.Status.READY); updateCoordAction(coordAction, true); - new CoordActionReadyXCommand(coordAction.getJobId()).call(getEntityKey()); + new CoordActionReadyXCommand(coordAction.getJobId()).call(); } else if (!isTimeout(currentTime)) { if (status == false) { http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java index 2e9c5ea..2d8af04 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java @@ -182,7 +182,7 @@ public class CoordActionReadyXCommand extends CoordinatorXCommand<Void> { } // start action new CoordActionStartXCommand(action.getId(), coordJob.getUser(), coordJob.getAppName(), - action.getJobId()).call(getEntityKey()); + action.getJobId()).call(); } else { break; http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java index 5827387..ea4d340 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java @@ -241,7 +241,7 @@ public class ActionCheckXCommand extends ActionXCommand<Void> { generateEvent(wfAction, wfJob.getUser()); } if (execSynchronous) { - new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call(getEntityKey()); + new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call(); } } catch (JPAExecutorException e) { http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java index 4006441..d030a10 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java @@ -277,7 +277,7 @@ public class ActionEndXCommand extends ActionXCommand<Void> { if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) { generateEvent(wfAction, wfJob.getUser()); } - new SignalXCommand(jobId, actionId).call(getEntityKey()); + new SignalXCommand(jobId, actionId).call(); } LOG.debug("ENDED ActionEndXCommand for action " + actionId); http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java index 85a6cd7..2939b60 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java @@ -353,7 +353,7 @@ public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command } protected void callActionEnd() throws CommandException { - new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call(getEntityKey()); + new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call(); } protected void updateJobLastModified(){ http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java index 6f64647..d2bb403 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java @@ -456,7 +456,7 @@ public class SignalXCommand extends WorkflowXCommand<Void> { new WfEndXCommand(wfJob).call(); // To delete the WF temp dir } else if (syncAction != null) { - new ActionStartXCommand(wfJob, syncAction.getId(), syncAction.getType()).call(getEntityKey()); + new ActionStartXCommand(wfJob, syncAction.getId(), syncAction.getType()).call(); } else if (!workflowActionBeanListForForked.isEmpty() && !checkForSuspendNode(workflowActionBeanListForForked)){ startForkedActions(workflowActionBeanListForForked); http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 6dac28b..1525ae4 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.3.0 release (trunk - unreleased) +OOZIE-2394 Oozie can execute command without holding lock (puru) OOZIE-1922 MemoryLocksService fails if lock is acquired multiple times in same thread and released (puru) OOZIE-2432 TestPurgeXCommand fails (fdenes via rkanter) OOZIE-2434 inconsistent coord action status and workflow job status (satishsaley via puru)
