Repository: oozie
Updated Branches:
  refs/heads/master 70a5ffe4b -> 70052969a


OOZIE-2394 Oozie can execute command without holding lock


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/70052969
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/70052969
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/70052969

Branch: refs/heads/master
Commit: 70052969a1df064957bc5dd06d69fc1955401e1b
Parents: 70a5ffe
Author: Purshotam Shah <[email protected]>
Authored: Tue Jan 26 10:43:33 2016 -0800
Committer: Purshotam Shah <[email protected]>
Committed: Tue Jan 26 10:43:33 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/oozie/command/XCommand.java | 58 +++++++-------------
 .../coord/CoordActionInputCheckXCommand.java    |  2 +-
 .../command/coord/CoordActionReadyXCommand.java |  2 +-
 .../oozie/command/wf/ActionCheckXCommand.java   |  2 +-
 .../oozie/command/wf/ActionEndXCommand.java     |  2 +-
 .../oozie/command/wf/ActionStartXCommand.java   |  2 +-
 .../apache/oozie/command/wf/SignalXCommand.java |  2 +-
 release-log.txt                                 |  1 +
 8 files changed, 26 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/XCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/XCommand.java b/core/src/main/java/org/apache/oozie/command/XCommand.java
index ff87510..bdf13f6 100644
--- a/core/src/main/java/org/apache/oozie/command/XCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/XCommand.java
@@ -74,7 +74,6 @@ public abstract class XCommand<T> implements XCallable<T> {
     private LockToken lock;
     private AtomicBoolean used = new AtomicBoolean(false);
     private boolean inInterrupt = false;
-    private boolean isSynchronous = false;
 
     private Map<Long, List<XCommand<?>>> commandQueue;
     protected boolean dryrun = false;
@@ -212,13 +211,16 @@ public abstract class XCommand<T> implements XCallable<T> {
         if (lock == null) {
             instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".lockTimeOut", 1);
             if (isReQueueRequired()) {
-                //if not acquire the lock, re-queue itself with default delay
+                // if not acquire the lock, re-queue itself with default delay
                 queue(this, getRequeueDelay());
-                LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and requeue itself [{2}]", this.toString(), getLockTimeOut(), getName());
-            } else {
+                LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and requeue itself [{2}]", this.toString(),
+                        getLockTimeOut(), getName());
+            }
+            else {
                 throw new CommandException(ErrorCode.E0606, this.toString(), getLockTimeOut());
             }
-        } else {
+        }
+        else {
             LOG.debug("Acquired lock for [{0}] in [{1}]", getEntityKey(), getName());
         }
     }
@@ -252,13 +254,11 @@ public abstract class XCommand<T> implements XCallable<T> {
         Instrumentation.Cron callCron = new Instrumentation.Cron();
         try {
             callCron.start();
-            if (!isSynchronous) {
-                eagerLoadState();
-                eagerVerifyPrecondition();
-            }
+            eagerLoadState();
+            eagerVerifyPrecondition();
             try {
                 T ret = null;
-                if (!isSynchronous && isLockRequired() && !this.inInterruptMode()) {
+                if (isLockRequired() && !this.inInterruptMode()) {
                     Instrumentation.Cron acquireLockCron = new Instrumentation.Cron();
                     acquireLockCron.start();
                     acquireLock();
@@ -270,10 +270,11 @@ public abstract class XCommand<T> implements XCallable<T> {
                     this.executeInterrupts();
                 }
 
-                if (isSynchronous || !isLockRequired() || (lock != null) || this.inInterruptMode()) {
+                if (!isLockRequired() || (lock != null) || this.inInterruptMode()) {
                     if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType())
                             && !used.compareAndSet(false, true)) {
-                        LOG.debug("Command [{0}] key [{1}]  already executed for [{2}]", getName(), getEntityKey(), this.toString());
+                        LOG.debug("Command [{0}] key [{1}]  already executed for [{2}]", getName(), getEntityKey(),
+                                this.toString());
                         return null;
                     }
                     LOG.trace("Load state for [{0}]", getEntityKey());
@@ -300,12 +301,12 @@ public abstract class XCommand<T> implements XCallable<T> {
                 return ret;
             }
             finally {
-                if (!isSynchronous && isLockRequired() && !this.inInterruptMode()) {
+                if (isLockRequired() && !this.inInterruptMode()) {
                     releaseLock();
                 }
             }
         }
-        catch(PreconditionException pex){
+        catch (PreconditionException pex) {
             LOG.warn(pex.getMessage().toString() + ", Error Code: " + pex.getErrorCode().toString());
             instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".preconditionfailed", 1);
             return null;
@@ -338,25 +339,6 @@ public abstract class XCommand<T> implements XCallable<T> {
     }
 
     /**
-     * Call this command synchronously from its caller. This benefits faster
-     * execution of command lifecycle for control nodes and kicking off
-     * subsequent actions
-     *
-     * @param callerEntityKey
-     * @return the {link #execute} return value.
-     * @throws CommandException
-     */
-    public final T call(String callerEntityKey) throws CommandException {
-        if (!callerEntityKey.equals(this.getEntityKey())) {
-            throw new CommandException(ErrorCode.E0607, "Entity Keys mismatch during synchronous call", "caller="
-                    + callerEntityKey + ", callee=" + getEntityKey());
-        }
-        isSynchronous = true; //setting to true so lock acquiring and release is not repeated
-        LOG.trace("Executing synchronously command [{0}] on job [{1}]", this.getName(), this.getKey());
-        return call();
-    }
-
-    /**
      * Check for the existence of interrupts for the same lock key
      * Execute them if exist.
      *
@@ -441,7 +423,7 @@ public abstract class XCommand<T> implements XCallable<T> {
      * <p>
      * A trivial implementation is calling {link #loadState}.
      */
-    protected void eagerLoadState() throws CommandException{
+    protected void eagerLoadState() throws CommandException {
     }
 
     /**
@@ -453,7 +435,7 @@ public abstract class XCommand<T> implements XCallable<T> {
      *
      * @throws CommandException thrown if the precondition is not met.
      */
-    protected void eagerVerifyPrecondition() throws CommandException,PreconditionException {
+    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
     }
 
     /**
@@ -470,7 +452,7 @@ public abstract class XCommand<T> implements XCallable<T> {
      *
      * @throws CommandException thrown if the precondition is not met.
      */
-    protected abstract void verifyPrecondition() throws CommandException,PreconditionException;
+    protected abstract void verifyPrecondition() throws CommandException, PreconditionException;
 
     /**
      * Command execution body.
@@ -485,7 +467,6 @@ public abstract class XCommand<T> implements XCallable<T> {
      */
     protected abstract T execute() throws CommandException;
 
-
     /**
      * Return the {@link Instrumentation} instance in use.
      *
@@ -502,7 +483,6 @@ public abstract class XCommand<T> implements XCallable<T> {
         this.used.set(false);
     }
 
-
     /**
      * Return the delay time for requeue
      * <p>
@@ -522,7 +502,7 @@ public abstract class XCommand<T> implements XCallable<T> {
      * @return command key
      */
     @Override
-    public String getKey(){
+    public String getKey() {
         return this.key;
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
index 742d00d..11184d1 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
@@ -197,7 +197,7 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
                 coordAction.setActionXml(actionXml.toString());
                 coordAction.setStatus(CoordinatorAction.Status.READY);
                 updateCoordAction(coordAction, true);
-                new CoordActionReadyXCommand(coordAction.getJobId()).call(getEntityKey());
+                new CoordActionReadyXCommand(coordAction.getJobId()).call();
             }
             else if (!isTimeout(currentTime)) {
                 if (status == false) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java
index 2e9c5ea..2d8af04 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java
@@ -182,7 +182,7 @@ public class CoordActionReadyXCommand extends CoordinatorXCommand<Void> {
                 }
                 // start action
                 new CoordActionStartXCommand(action.getId(), coordJob.getUser(), coordJob.getAppName(),
-                        action.getJobId()).call(getEntityKey());
+                        action.getJobId()).call();
             }
             else {
                 break;

http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
index 5827387..ea4d340 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
@@ -241,7 +241,7 @@ public class ActionCheckXCommand extends ActionXCommand<Void> {
                     generateEvent(wfAction, wfJob.getUser());
                 }
                 if (execSynchronous) {
-                    new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call(getEntityKey());
+                    new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call();
                 }
             }
             catch (JPAExecutorException e) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
index 4006441..d030a10 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
@@ -277,7 +277,7 @@ public class ActionEndXCommand extends ActionXCommand<Void> {
             if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
                 generateEvent(wfAction, wfJob.getUser());
             }
-            new SignalXCommand(jobId, actionId).call(getEntityKey());
+            new SignalXCommand(jobId, actionId).call();
         }
 
         LOG.debug("ENDED ActionEndXCommand for action " + actionId);

http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
index 85a6cd7..2939b60 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
@@ -353,7 +353,7 @@ public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command
     }
 
     protected void callActionEnd() throws CommandException {
-        new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call(getEntityKey());
+        new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call();
     }
 
     protected void updateJobLastModified(){

http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
index 6f64647..d2bb403 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
@@ -456,7 +456,7 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
             new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
         }
         else if (syncAction != null) {
-            new ActionStartXCommand(wfJob, syncAction.getId(), syncAction.getType()).call(getEntityKey());
+            new ActionStartXCommand(wfJob, syncAction.getId(), syncAction.getType()).call();
         }
         else if (!workflowActionBeanListForForked.isEmpty() && !checkForSuspendNode(workflowActionBeanListForForked)){
             startForkedActions(workflowActionBeanListForForked);

http://git-wip-us.apache.org/repos/asf/oozie/blob/70052969/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 6dac28b..1525ae4 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
 
+OOZIE-2394 Oozie can execute command without holding lock (puru)
 OOZIE-1922 MemoryLocksService fails if lock is acquired multiple times in same thread and released (puru)
 OOZIE-2432 TestPurgeXCommand fails (fdenes via rkanter)
 OOZIE-2434 inconsistent coord action status and workflow job status (satishsaley via puru)

Reply via email to