Updated Branches:
  refs/heads/master 55c15b2f6 -> b6d7942cd

OOZIE-1687 Bundle can still be in RUNNINGWITHERROR status after bundle kill 
(rohini)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/b6d7942c
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/b6d7942c
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/b6d7942c

Branch: refs/heads/master
Commit: b6d7942cdaa56fa57c8ba70d7388ae8716f7355a
Parents: 55c15b2
Author: Rohini Palaniswamy <[email protected]>
Authored: Thu Feb 6 11:22:45 2014 -0800
Committer: Rohini Palaniswamy <[email protected]>
Committed: Thu Feb 6 11:22:45 2014 -0800

----------------------------------------------------------------------
 .../org/apache/oozie/CoordinatorJobBean.java    | 20 ++++++++-
 .../command/bundle/BundleKillXCommand.java      | 47 +++++++++-----------
 .../bundle/BundleStatusUpdateXCommand.java      | 35 +++++++--------
 .../command/bundle/TestBundleKillXCommand.java  | 47 +++++++++++++++++++-
 release-log.txt                                 |  1 +
 5 files changed, 101 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/b6d7942c/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java 
b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
index 16209ce..95f6991 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
@@ -625,10 +625,28 @@ public class CoordinatorJobBean implements Writable, 
CoordinatorJob, JsonBean {
         setAppNamespace(WritableUtils.readStr(dataInput));
     }
 
+    /**
+     * @return true if the job status is SUCCEEDED, FAILED, KILLED or DONEWITHERROR, false otherwise
+     */
+    public boolean isTerminalStatus() {
+        boolean isTerminal = false;
+        switch (getStatus()) {
+            case SUCCEEDED:
+            case FAILED:
+            case KILLED:
+            case DONEWITHERROR:
+                isTerminal = true;
+                break;
+            default:
+                isTerminal = false;
+                break;
+        }
+        return isTerminal;
+    }
+
     @Override
     public Status getStatus() {
         return Status.valueOf(this.statusStr);
-        //return null;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/oozie/blob/b6d7942c/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java 
b/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java
index 0792030..cc0d9d5 100644
--- a/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.oozie.BundleActionBean;
 import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.XException;
 import org.apache.oozie.client.Job;
@@ -35,6 +36,8 @@ import org.apache.oozie.executor.jpa.BatchQueryExecutor;
 import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
 import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
 import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.util.LogUtils;
 import org.apache.oozie.util.ParamChecker;
@@ -49,9 +52,6 @@ public class BundleKillXCommand extends 
KillTransitionXCommand {
         this.jobId = ParamChecker.notEmpty(jobId, "jobId");
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.oozie.command.XCommand#getEntityKey()
-     */
     @Override
     public String getEntityKey() {
         return jobId;
@@ -62,17 +62,11 @@ public class BundleKillXCommand extends 
KillTransitionXCommand {
         return getName() + "_" + jobId;
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.oozie.command.XCommand#isLockRequired()
-     */
     @Override
     protected boolean isLockRequired() {
         return true;
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.oozie.command.XCommand#loadState()
-     */
     @Override
     public void loadState() throws CommandException {
         try {
@@ -88,9 +82,6 @@ public class BundleKillXCommand extends 
KillTransitionXCommand {
         }
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
-     */
     @Override
     protected void verifyPrecondition() throws CommandException, 
PreconditionException {
         if (bundleJob.getStatus() == Job.Status.SUCCEEDED
@@ -103,9 +94,6 @@ public class BundleKillXCommand extends 
KillTransitionXCommand {
         }
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.oozie.command.KillTransitionXCommand#killChildren()
-     */
     @Override
     public void killChildren() throws CommandException {
         if (bundleActions != null) {
@@ -138,35 +126,40 @@ public class BundleKillXCommand extends 
KillTransitionXCommand {
             action.incrementAndGetPending();
             action.setStatus(Job.Status.KILLED);
         }
+        else {
+            // Due to a race condition the bundle action can still be pending even though its
+            // coordinator already reached a terminal status; reconcile it from the coord job.
+            if (action.isPending() && action.getCoordId() != null) {
+                try {
+                    CoordinatorJobBean coordJob = 
CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB,
+                            action.getCoordId());
+                    if (!coordJob.isPending() && coordJob.isTerminalStatus()) {
+                        action.decrementAndGetPending();
+                        action.setStatus(coordJob.getStatus());
+                    }
+                }
+                catch (JPAExecutorException e) {
+                    LOG.warn("Error in checking coord job status: " + 
action.getCoordId(), e);
+                }
+            }
+        }
         updateList.add(new 
UpdateEntry<BundleActionQuery>(BundleActionQuery.UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME,
 action));
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
-     */
     @Override
     public void notifyParent() {
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.oozie.command.TransitionXCommand#getJob()
-     */
     @Override
     public Job getJob() {
         return bundleJob;
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
-     */
     @Override
     public void updateJob() {
         updateList.add(new 
UpdateEntry<BundleJobQuery>(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING_MODTIME,
 bundleJob));
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.oozie.command.KillTransitionXCommand#performWrites()
-     */
     @Override
     public void performWrites() throws CommandException {
         try {

http://git-wip-us.apache.org/repos/asf/oozie/blob/b6d7942c/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java
 
b/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java
index a8bacd1..4e6e4ed 100644
--- 
a/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java
+++ 
b/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java
@@ -56,9 +56,6 @@ public class BundleStatusUpdateXCommand extends 
StatusUpdateXCommand {
         this.prevStatus = prevStatus;
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.oozie.command.XCommand#execute()
-     */
     @Override
     protected Void execute() throws CommandException {
         try {
@@ -68,7 +65,10 @@ public class BundleStatusUpdateXCommand extends 
StatusUpdateXCommand {
             // The status of bundle action should not be updated if the bundle 
action is in terminal state
             // and coord Id is null. For e.g if Bundleaction is killed and 
coord Id is null, then the status of bundle
             // should not be changed.
-            if (bundleaction.getCoordId() != null || 
!bundleaction.isTerminalStatus()) {
+            // Actions with a coord Id always take coordCurrentStatus, including when both the action
+            // and its coord job are already in a terminal state.
+            if (bundleaction.getCoordId() != null
+                    || !bundleaction.isTerminalStatus()) {
                 bundleaction.setStatus(coordCurrentStatus);
             }
             if (bundleaction.isPending()) {
@@ -106,25 +106,16 @@ public class BundleStatusUpdateXCommand extends 
StatusUpdateXCommand {
         return null;
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.oozie.command.XCommand#getEntityKey()
-     */
     @Override
     public String getEntityKey() {
         return coordjob.getBundleId();
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.oozie.command.XCommand#isLockRequired()
-     */
     @Override
     protected boolean isLockRequired() {
         return true;
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.oozie.command.XCommand#loadState()
-     */
     @Override
     protected void loadState() throws CommandException {
         try {
@@ -145,15 +136,19 @@ public class BundleStatusUpdateXCommand extends 
StatusUpdateXCommand {
         }
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
-     */
     @Override
     protected void verifyPrecondition() throws CommandException, 
PreconditionException {
         if 
(bundleaction.getStatusStr().compareToIgnoreCase(prevStatus.toString()) != 0 && 
bundleaction.getCoordId()!=null) {
             // pending should be decremented only if status of coord job and 
bundle action is same
             // e.g if bundle is killed and coord job is running, then pending 
should not be false
             // to allow recovery service to pick and kill the coord job
+            if (bundleaction.isTerminalStatus() && 
coordjob.isTerminalStatus()) {
+                LOG.info("Bundle action [{0}] status [{1}] is different from 
prev coord status [{2}], "
+                        + "but coord job is currently in terminal state = 
[{3}]",
+                        bundleaction.getBundleActionId(), 
bundleaction.getStatusStr(), prevStatus.toString(),
+                        coordjob.getStatus());
+                return;
+            }
             if (bundleaction.isPending() && 
coordjob.getStatus().equals(bundleaction.getStatus())) {
                 bundleaction.decrementAndGetPending();
             }
@@ -165,10 +160,10 @@ public class BundleStatusUpdateXCommand extends 
StatusUpdateXCommand {
             catch (JPAExecutorException je) {
                 throw new CommandException(je);
             }
-            LOG.info("Bundle action [{0}] status [{1}] is different from prev 
coord status [{2}], " +
-            "decrement pending so new pending = [{3}]",
-                            bundleaction.getBundleActionId(), 
bundleaction.getStatusStr(), prevStatus.toString(),
-                            bundleaction.getPending());
+            LOG.info("Bundle action [{0}] status [{1}] is different from prev 
coord status [{2}] and current coord"
+                    + " status [{3}], decrement pending so new pending = 
[{4}]", bundleaction.getBundleActionId(),
+                    bundleaction.getStatusStr(), prevStatus.toString(), 
coordjob.getStatusStr(),
+                    bundleaction.getPending());
             throw new PreconditionException(ErrorCode.E1308, 
bundleaction.getStatusStr(), prevStatus.toString());
         }
         else if 
(bundleaction.getStatusStr().compareToIgnoreCase(prevStatus.toString()) != 0) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/b6d7942c/core/src/test/java/org/apache/oozie/command/bundle/TestBundleKillXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/command/bundle/TestBundleKillXCommand.java
 
b/core/src/test/java/org/apache/oozie/command/bundle/TestBundleKillXCommand.java
index d0554fe..498ac0d 100644
--- 
a/core/src/test/java/org/apache/oozie/command/bundle/TestBundleKillXCommand.java
+++ 
b/core/src/test/java/org/apache/oozie/command/bundle/TestBundleKillXCommand.java
@@ -28,14 +28,18 @@ import org.apache.oozie.BundleJobBean;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.Job;
+import org.apache.oozie.client.Job.Status;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
 import 
org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
+import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
+import org.apache.oozie.service.StatusTransitService.StatusTransitRunnable;
 import org.apache.oozie.test.XDataTestCase;
 import org.apache.oozie.util.XConfiguration;
 
@@ -85,7 +89,6 @@ public class TestBundleKillXCommand extends XDataTestCase {
         BundleJobBean job = this.addRecordToBundleJobTable(Job.Status.PREP, 
false);
 
         final JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
 
         Configuration jobConf = null;
         try {
@@ -99,6 +102,8 @@ public class TestBundleKillXCommand extends XDataTestCase {
         Path appPath = new Path(jobConf.get(OozieClient.BUNDLE_APP_PATH), 
"bundle.xml");
         jobConf.set(OozieClient.BUNDLE_APP_PATH, appPath.toString());
 
+        new BundleKillXCommand(job.getId()).call();
+
         BundleSubmitXCommand submitCmd = new BundleSubmitXCommand(jobConf);
         submitCmd.call();
 
@@ -155,6 +160,46 @@ public class TestBundleKillXCommand extends XDataTestCase {
         CoordinatorJobBean job2 = jpaService.execute(coordGetCmd2);
         assertEquals(CoordinatorJobBean.Status.KILLED, job2.getStatus());
 
+        final Runnable runnable = new StatusTransitRunnable();
+        runnable.run();
+        sleep(1000);
+
+        job = jpaService.execute(bundleJobGetCmd);
+        assertEquals(Job.Status.KILLED, job.getStatus());
+
+        actions = BundleActionQueryExecutor.getInstance().getList(
+                BundleActionQuery.GET_BUNDLE_ACTIONS_FOR_BUNDLE, job.getId());
+        for (BundleActionBean action : actions) {
+            assertEquals(0, action.getPending());
+            assertEquals(CoordinatorJobBean.Status.KILLED, action.getStatus());
+        }
+
+        // Simulate the race where bundle kill plus the StatusTransitService leave a bundle action
+        // with pending=1 and the bundle RUNNING: killing the bundle again must reset the pending.
+        job.setPending();
+        job.setStatus(Status.RUNNING);
+        actions.get(0).incrementAndGetPending();
+        
BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING,
 job);
+        BundleActionQueryExecutor.getInstance().executeUpdate(
+                BundleActionQuery.UPDATE_BUNDLE_ACTION_PENDING_MODTIME, 
actions.get(0));
+
+        runnable.run();
+        sleep(1000);
+        new BundleKillXCommand(job.getId()).call();
+        job = jpaService.execute(bundleJobGetCmd);
+        assertEquals(Job.Status.KILLED, job.getStatus());
+
+        runnable.run();
+        sleep(1000);
+        job = jpaService.execute(bundleJobGetCmd);
+        assertEquals(Job.Status.KILLED, job.getStatus());
+        actions = BundleActionQueryExecutor.getInstance().getList(
+                BundleActionQuery.GET_BUNDLE_ACTIONS_FOR_BUNDLE, job.getId());
+        for (BundleActionBean action : actions) {
+            assertEquals(0, action.getPending());
+            assertEquals(CoordinatorJobBean.Status.KILLED, action.getStatus());
+        }
+
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/oozie/blob/b6d7942c/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 876a256..b73e6e7 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1687 Bundle can still be in RUNNINGWITHERROR status after bundle kill 
(rohini)
 OOZIE-1684 DB upgrade from 3.3.0 to trunk fails on Oracle (rkanter)
 OOZIE-1675 Adding absolute URI of local cluster to dist cache not working with 
hadoop version 0.20.2 and before (satish via ryota)
 OOZIE-1683 UserGroupInformationService should close any filesystems opened by 
it (rkanter)

Reply via email to