Updated Branches: refs/heads/master 55c15b2f6 -> b6d7942cd
OOZIE-1687 Bundle can still be in RUNNINGWITHERROR status after bundle kill (rohini) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/b6d7942c Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/b6d7942c Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/b6d7942c Branch: refs/heads/master Commit: b6d7942cdaa56fa57c8ba70d7388ae8716f7355a Parents: 55c15b2 Author: Rohini Palaniswamy <[email protected]> Authored: Thu Feb 6 11:22:45 2014 -0800 Committer: Rohini Palaniswamy <[email protected]> Committed: Thu Feb 6 11:22:45 2014 -0800 ---------------------------------------------------------------------- .../org/apache/oozie/CoordinatorJobBean.java | 20 ++++++++- .../command/bundle/BundleKillXCommand.java | 47 +++++++++----------- .../bundle/BundleStatusUpdateXCommand.java | 35 +++++++-------- .../command/bundle/TestBundleKillXCommand.java | 47 +++++++++++++++++++- release-log.txt | 1 + 5 files changed, 101 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/b6d7942c/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java index 16209ce..95f6991 100644 --- a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java +++ b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java @@ -625,10 +625,28 @@ public class CoordinatorJobBean implements Writable, CoordinatorJob, JsonBean { setAppNamespace(WritableUtils.readStr(dataInput)); } + /** + * @return true if in terminal status + */ + public boolean isTerminalStatus() { + boolean isTerminal = false; + switch (getStatus()) { + case SUCCEEDED: + case FAILED: + case KILLED: + case DONEWITHERROR: + isTerminal = true; + break; + default: + isTerminal = false; + break; + } + return isTerminal; + } + @Override public Status getStatus() { return Status.valueOf(this.statusStr); - //return null; } /** http://git-wip-us.apache.org/repos/asf/oozie/blob/b6d7942c/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java b/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java index 0792030..cc0d9d5 100644 --- a/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleKillXCommand.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.oozie.BundleActionBean; import org.apache.oozie.BundleJobBean; +import org.apache.oozie.CoordinatorJobBean; import org.apache.oozie.ErrorCode; import org.apache.oozie.XException; import org.apache.oozie.client.Job; @@ -35,6 +36,8 @@ import org.apache.oozie.executor.jpa.BatchQueryExecutor; import org.apache.oozie.executor.jpa.BundleActionQueryExecutor; import org.apache.oozie.executor.jpa.BundleJobQueryExecutor; import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.util.LogUtils; import org.apache.oozie.util.ParamChecker; @@ -49,9 +52,6 @@ public class BundleKillXCommand extends KillTransitionXCommand { this.jobId = ParamChecker.notEmpty(jobId, "jobId"); } - /* (non-Javadoc) - * @see org.apache.oozie.command.XCommand#getEntityKey() - */ @Override public String getEntityKey() { return jobId; @@ -62,17 +62,11 @@ public class BundleKillXCommand extends KillTransitionXCommand { return getName() + "_" + jobId; } - /* (non-Javadoc) - * @see org.apache.oozie.command.XCommand#isLockRequired() - */ @Override protected boolean isLockRequired() { return true; } - /* (non-Javadoc) - * @see org.apache.oozie.command.XCommand#loadState() - */ @Override public void loadState() throws CommandException { try { @@ -88,9 +82,6 @@ public class BundleKillXCommand extends KillTransitionXCommand { } } - /* (non-Javadoc) - * @see org.apache.oozie.command.XCommand#verifyPrecondition() - */ @Override protected void verifyPrecondition() throws CommandException, PreconditionException { if (bundleJob.getStatus() == Job.Status.SUCCEEDED @@ -103,9 +94,6 @@ public class BundleKillXCommand extends KillTransitionXCommand { } } - /* (non-Javadoc) - * @see org.apache.oozie.command.KillTransitionXCommand#killChildren() - */ @Override public void killChildren() throws CommandException { if (bundleActions != null) { @@ -138,35 +126,40 @@ public class BundleKillXCommand extends KillTransitionXCommand { action.incrementAndGetPending(); action.setStatus(Job.Status.KILLED); } + else { + // Due to race condition bundle action pending might be true + // while coordinator is killed. + if (action.isPending() && action.getCoordId() != null) { + try { + CoordinatorJobBean coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, + action.getCoordId()); + if (!coordJob.isPending() && coordJob.isTerminalStatus()) { + action.decrementAndGetPending(); + action.setStatus(coordJob.getStatus()); + } + } + catch (JPAExecutorException e) { + LOG.warn("Error in checking coord job status:" + action.getCoordId(), e); + } + } + } updateList.add(new UpdateEntry<BundleActionQuery>(BundleActionQuery.UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME, action)); } - /* (non-Javadoc) - * @see org.apache.oozie.command.TransitionXCommand#notifyParent() - */ @Override public void notifyParent() { } - /* (non-Javadoc) - * @see org.apache.oozie.command.TransitionXCommand#getJob() - */ @Override public Job getJob() { return bundleJob; } - /* (non-Javadoc) - * @see org.apache.oozie.command.TransitionXCommand#updateJob() - */ @Override public void updateJob() { updateList.add(new UpdateEntry<BundleJobQuery>(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING_MODTIME, bundleJob)); } - /* (non-Javadoc) - * @see org.apache.oozie.command.KillTransitionXCommand#performWrites() - */ @Override public void performWrites() throws CommandException { try { http://git-wip-us.apache.org/repos/asf/oozie/blob/b6d7942c/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java b/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java index a8bacd1..4e6e4ed 100644 --- a/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java @@ -56,9 +56,6 @@ public class BundleStatusUpdateXCommand extends StatusUpdateXCommand { this.prevStatus = prevStatus; } - /* (non-Javadoc) - * @see org.apache.oozie.command.XCommand#execute() - */ @Override protected Void execute() throws CommandException { try { @@ -68,7 +65,10 @@ public class BundleStatusUpdateXCommand extends StatusUpdateXCommand { // The status of bundle action should not be updated if the bundle action is in terminal state // and coord Id is null. For e.g if Bundleaction is killed and coord Id is null, then the status of bundle // should not be changed. - if (bundleaction.getCoordId() != null || !bundleaction.isTerminalStatus()) { + if (bundleaction.getCoordId() != null + || !bundleaction.isTerminalStatus() + || (bundleaction.getCoordId() != null && bundleaction.isTerminalStatus() && coordjob + .isTerminalStatus())) { bundleaction.setStatus(coordCurrentStatus); } if (bundleaction.isPending()) { @@ -106,25 +106,16 @@ public class BundleStatusUpdateXCommand extends StatusUpdateXCommand { return null; } - /* (non-Javadoc) - * @see org.apache.oozie.command.XCommand#getEntityKey() - */ @Override public String getEntityKey() { return coordjob.getBundleId(); } - /* (non-Javadoc) - * @see org.apache.oozie.command.XCommand#isLockRequired() - */ @Override protected boolean isLockRequired() { return true; } - /* (non-Javadoc) - * @see org.apache.oozie.command.XCommand#loadState() - */ @Override protected void loadState() throws CommandException { try { @@ -145,15 +136,19 @@ public class BundleStatusUpdateXCommand extends StatusUpdateXCommand { } } - /* (non-Javadoc) - * @see org.apache.oozie.command.XCommand#verifyPrecondition() - */ @Override protected void verifyPrecondition() throws CommandException, PreconditionException { if (bundleaction.getStatusStr().compareToIgnoreCase(prevStatus.toString()) != 0 && bundleaction.getCoordId()!=null) { // pending should be decremented only if status of coord job and bundle action is same // e.g if bundle is killed and coord job is running, then pending should not be false // to allow recovery service to pick and kill the coord job + if (bundleaction.isTerminalStatus() && coordjob.isTerminalStatus()) { + LOG.info("Bundle action [{0}] status [{1}] is different from prev coord status [{2}], " + + "but coord job is currently in terminal state = [{3}]", + bundleaction.getBundleActionId(), bundleaction.getStatusStr(), prevStatus.toString(), + coordjob.getStatus()); + return; + } if (bundleaction.isPending() && coordjob.getStatus().equals(bundleaction.getStatus())) { bundleaction.decrementAndGetPending(); } @@ -165,10 +160,10 @@ public class BundleStatusUpdateXCommand extends StatusUpdateXCommand { catch (JPAExecutorException je) { throw new CommandException(je); } - LOG.info("Bundle action [{0}] status [{1}] is different from prev coord status [{2}], " + - "decrement pending so new pending = [{3}]", - bundleaction.getBundleActionId(), bundleaction.getStatusStr(), prevStatus.toString(), - bundleaction.getPending()); + LOG.info("Bundle action [{0}] status [{1}] is different from prev coord status [{2}] and current coord" + + " status [{3}], decrement pending so new pending = [{4}]", bundleaction.getBundleActionId(), + bundleaction.getStatusStr(), prevStatus.toString(), coordjob.getStatusStr(), + bundleaction.getPending()); throw new PreconditionException(ErrorCode.E1308, bundleaction.getStatusStr(), prevStatus.toString()); } else if (bundleaction.getStatusStr().compareToIgnoreCase(prevStatus.toString()) != 0) { http://git-wip-us.apache.org/repos/asf/oozie/blob/b6d7942c/core/src/test/java/org/apache/oozie/command/bundle/TestBundleKillXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/bundle/TestBundleKillXCommand.java b/core/src/test/java/org/apache/oozie/command/bundle/TestBundleKillXCommand.java index d0554fe..498ac0d 100644 --- a/core/src/test/java/org/apache/oozie/command/bundle/TestBundleKillXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/bundle/TestBundleKillXCommand.java @@ -28,14 +28,18 @@ import org.apache.oozie.BundleJobBean; import org.apache.oozie.CoordinatorJobBean; import org.apache.oozie.ErrorCode; import org.apache.oozie.client.Job; +import org.apache.oozie.client.Job.Status; import org.apache.oozie.client.OozieClient; import org.apache.oozie.command.CommandException; import org.apache.oozie.executor.jpa.BundleActionQueryExecutor; import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery; +import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery; import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor; +import org.apache.oozie.executor.jpa.BundleJobQueryExecutor; import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.Services; +import org.apache.oozie.service.StatusTransitService.StatusTransitRunnable; import org.apache.oozie.test.XDataTestCase; import org.apache.oozie.util.XConfiguration; @@ -85,7 +89,6 @@ public class TestBundleKillXCommand extends XDataTestCase { BundleJobBean job = this.addRecordToBundleJobTable(Job.Status.PREP, false); final JPAService jpaService = Services.get().get(JPAService.class); - assertNotNull(jpaService); Configuration jobConf = null; try { @@ -99,6 +102,8 @@ public class TestBundleKillXCommand extends XDataTestCase { Path appPath = new Path(jobConf.get(OozieClient.BUNDLE_APP_PATH), "bundle.xml"); jobConf.set(OozieClient.BUNDLE_APP_PATH, appPath.toString()); + new BundleKillXCommand(job.getId()).call(); + BundleSubmitXCommand submitCmd = new BundleSubmitXCommand(jobConf); submitCmd.call(); @@ -155,6 +160,46 @@ public class TestBundleKillXCommand extends XDataTestCase { CoordinatorJobBean job2 = jpaService.execute(coordGetCmd2); assertEquals(CoordinatorJobBean.Status.KILLED, job2.getStatus()); + final Runnable runnable = new StatusTransitRunnable(); + runnable.run(); + sleep(1000); + + job = jpaService.execute(bundleJobGetCmd); + assertEquals(Job.Status.KILLED, job.getStatus()); + + actions = BundleActionQueryExecutor.getInstance().getList( + BundleActionQuery.GET_BUNDLE_ACTIONS_FOR_BUNDLE, job.getId()); + for (BundleActionBean action : actions) { + assertEquals(0, action.getPending()); + assertEquals(CoordinatorJobBean.Status.KILLED, action.getStatus()); + } + + // If bundle kill + Status Transit service left the bundle action with Pending=1 + // due to race condition, killing the bundle again should reset the pending. + job.setPending(); + job.setStatus(Status.RUNNING); + actions.get(0).incrementAndGetPending(); + BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING, job); + BundleActionQueryExecutor.getInstance().executeUpdate( + BundleActionQuery.UPDATE_BUNDLE_ACTION_PENDING_MODTIME, actions.get(0)); + + runnable.run(); + sleep(1000); + new BundleKillXCommand(job.getId()).call(); + job = jpaService.execute(bundleJobGetCmd); + assertEquals(Job.Status.KILLED, job.getStatus()); + + runnable.run(); + sleep(1000); + job = jpaService.execute(bundleJobGetCmd); + assertEquals(Job.Status.KILLED, job.getStatus()); + actions = BundleActionQueryExecutor.getInstance().getList( + BundleActionQuery.GET_BUNDLE_ACTIONS_FOR_BUNDLE, job.getId()); + for (BundleActionBean action : actions) { + assertEquals(0, action.getPending()); + assertEquals(CoordinatorJobBean.Status.KILLED, action.getStatus()); + } + } /** http://git-wip-us.apache.org/repos/asf/oozie/blob/b6d7942c/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 876a256..b73e6e7 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (trunk - unreleased) +OOZIE-1687 Bundle can still be in RUNNINGWITHERROR status after bundle kill (rohini) OOZIE-1684 DB upgrade from 3.3.0 to trunk fails on Oracle (rkanter) OOZIE-1675 Adding absolute URI of local cluster to dist cache not working with hadoop version 0.20.2 and before (satish via ryota) OOZIE-1683 UserGroupInformationService should close any filesystems opened by it (rkanter)
