Repository: oozie Updated Branches: refs/heads/master f1f3c64dd -> 0bb5e1369
OOZIE-2228 Statustransit service doesn't pick bundle with suspend status Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/0bb5e136 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/0bb5e136 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/0bb5e136 Branch: refs/heads/master Commit: 0bb5e13694c2b17c8231113c98569b8fcd871c58 Parents: f1f3c64 Author: Purshotam Shah <[email protected]> Authored: Thu Aug 13 15:27:46 2015 -0700 Committer: Purshotam Shah <[email protected]> Committed: Thu Aug 13 15:27:46 2015 -0700 ---------------------------------------------------------------------- .../java/org/apache/oozie/BundleJobBean.java | 4 +-- .../bundle/BundleStatusTransitXCommand.java | 16 +++++++--- .../executor/jpa/BundleJobQueryExecutor.java | 9 ++++-- .../jpa/TestBundleJobQueryExecutor.java | 2 +- .../oozie/service/TestStatusTransitService.java | 33 +++++++++++++++++++- release-log.txt | 1 + 6 files changed, 53 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/0bb5e136/core/src/main/java/org/apache/oozie/BundleJobBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/BundleJobBean.java b/core/src/main/java/org/apache/oozie/BundleJobBean.java index 9b31bd3..5868412 100644 --- a/core/src/main/java/org/apache/oozie/BundleJobBean.java +++ b/core/src/main/java/org/apache/oozie/BundleJobBean.java @@ -75,7 +75,7 @@ import org.json.simple.JSONObject; @NamedQuery(name = "GET_BUNDLE_JOB_STATUS", query = "select w.statusStr from BundleJobBean w where w.id = :id"), - @NamedQuery(name = "GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME", query = "select w.id, w.statusStr, w.pending, w.lastModifiedTimestamp from BundleJobBean w where w.id = :id"), + @NamedQuery(name = "GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME", query = "select w.id, w.statusStr, w.pending, w.lastModifiedTimestamp, w.pauseTimestamp, w.suspendedTimestamp from BundleJobBean w where w.id = :id"), @NamedQuery(name = "GET_BUNDLE_JOB_ID_JOBXML_CONF", query = "select w.id, w.jobXml, w.conf from BundleJobBean w where w.id = :id"), @@ -107,7 +107,7 @@ import org.json.simple.JSONObject; @NamedQuery(name = "BULK_MONITOR_COUNT_QUERY", query = "SELECT COUNT(a) FROM CoordinatorActionBean a, CoordinatorJobBean c"), - @NamedQuery(name = "GET_BUNDLE_IDS_FOR_STATUS_TRANSIT", query = "select DISTINCT w.id from BundleActionBean a , BundleJobBean w where a.lastModifiedTimestamp >= :lastModifiedTime and w.id = a.bundleId and (w.statusStr = 'RUNNING' OR w.statusStr = 'RUNNINGWITHERROR' OR w.statusStr = 'PAUSED' OR w.statusStr = 'PAUSEDWITHERROR' OR w.pending = 1)"), + @NamedQuery(name = "GET_BUNDLE_IDS_FOR_STATUS_TRANSIT", query = "select DISTINCT w.id from BundleActionBean a , BundleJobBean w where a.lastModifiedTimestamp >= :lastModifiedTime and w.id = a.bundleId and (w.statusStr = 'RUNNING' OR w.statusStr = 'RUNNINGWITHERROR' OR w.statusStr = 'PAUSED' OR w.statusStr = 'PAUSEDWITHERROR' OR w.statusStr = 'SUSPENDED' OR w.statusStr = 'SUSPENDEDWITHERROR' OR w.pending = 1)"), @NamedQuery(name = "GET_BUNDLE_JOB_FOR_USER", query = "select w.user from BundleJobBean w where w.id = :id") }) http://git-wip-us.apache.org/repos/asf/oozie/blob/0bb5e136/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java b/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java index fb45eb4..835777c 100644 --- a/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java @@ -65,7 +65,7 @@ public class BundleStatusTransitXCommand extends StatusTransitXCommand { protected void loadState() throws CommandException { try { bundleJob = BundleJobQueryExecutor.getInstance().get( - BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME, jobId); + BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME, jobId); bundleActions = BundleActionQueryExecutor.getInstance().getList( BundleActionQuery.GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE, jobId); @@ -82,7 +82,7 @@ public class BundleStatusTransitXCommand extends StatusTransitXCommand { && (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED) ) { new BundleKillXCommand(jobId).call(); bundleJob = BundleJobQueryExecutor.getInstance().get( - BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME, jobId); + BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME, jobId); bundleJob.setStatus(Job.Status.FAILED); bundleJob.setLastModifiedTime(new Date()); BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS, @@ -150,13 +150,16 @@ public class BundleStatusTransitXCommand extends StatusTransitXCommand { @Override protected boolean isPausedState() { - - if (bundleJob.getStatus() == Job.Status.PAUSED || bundleJob.getStatus() == Job.Status.PAUSEDWITHERROR) { + //If bundle is paused then timestamp will be set. + //If bundleJob.getPauseTime() is not set, that means that status has to be computed from bottom-up. + if (bundleJob.getStatus() == Job.Status.PAUSED || bundleJob.getStatus() == Job.Status.PAUSEDWITHERROR + && bundleJob.getPauseTime() != null) { return true; } else { return getBottomUpPauseStatus() != null; } + } @Override @@ -177,7 +180,10 @@ public class BundleStatusTransitXCommand extends StatusTransitXCommand { @Override protected boolean isSuspendedState() { - if (bundleJob.getStatus() == Job.Status.SUSPENDED || bundleJob.getStatus() == Job.Status.SUSPENDEDWITHERROR) { + //If bundle is suspended then timestamp will be set. + //If bundleJob.getSuspendedTimestamp() is not set, that means that status has to be computed from bottom-up. + if ((bundleJob.getStatus() == Job.Status.SUSPENDED || bundleJob.getStatus() == Job.Status.SUSPENDEDWITHERROR) + && bundleJob.getSuspendedTimestamp() != null) { return true; } http://git-wip-us.apache.org/repos/asf/oozie/blob/0bb5e136/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java index a770aad..e07672b 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java @@ -48,7 +48,7 @@ public class BundleJobQueryExecutor extends QueryExecutor<BundleJobBean, BundleJ UPDATE_BUNDLE_JOB_PAUSE_KICKOFF, GET_BUNDLE_JOB, GET_BUNDLE_JOB_STATUS, - GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME, + GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME, GET_BUNDLE_JOB_ID_JOBXML_CONF, GET_BUNDLE_IDS_FOR_STATUS_TRANSIT }; @@ -131,7 +131,7 @@ public class BundleJobQueryExecutor extends QueryExecutor<BundleJobBean, BundleJ Query query = em.createNamedQuery(namedQuery.name()); switch (namedQuery) { case GET_BUNDLE_JOB: - case GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME: + case GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME: case GET_BUNDLE_JOB_ID_JOBXML_CONF: case GET_BUNDLE_JOB_STATUS: query.setParameter("id", parameters[0]); @@ -196,13 +196,16 @@ public class BundleJobQueryExecutor extends QueryExecutor<BundleJobBean, BundleJ bean.setId((String) parameters[0]); bean.setStatus((String) ret); break; - case GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME: + case GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME: bean = new BundleJobBean(); arr = (Object[]) ret; bean.setId((String) arr[0]); bean.setStatus((String) arr[1]); bean.setPending((Integer) arr[2]); bean.setLastModifiedTime(DateUtils.toDate((Timestamp) arr[3])); + bean.setPauseTime(DateUtils.toDate((Timestamp) arr[4])); + bean.setSuspendedTime(DateUtils.toDate((Timestamp) arr[5])); + break; case GET_BUNDLE_JOB_ID_JOBXML_CONF: bean = new BundleJobBean(); http://git-wip-us.apache.org/repos/asf/oozie/blob/0bb5e136/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobQueryExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobQueryExecutor.java index 509eedd..97cbb7f 100644 --- a/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobQueryExecutor.java +++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobQueryExecutor.java @@ -128,7 +128,7 @@ public class TestBundleJobQueryExecutor extends XDataTestCase { BundleJobBean bean = this.addRecordToBundleJobTable(Job.Status.RUNNING, false); // GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME BundleJobBean retBean = BundleJobQueryExecutor.getInstance().get( - BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME, bean.getId()); + BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME, bean.getId()); assertEquals(bean.getId(), retBean.getId()); assertEquals(bean.getStatusStr(), retBean.getStatusStr()); assertEquals(bean.getPending(), retBean.getPending()); http://git-wip-us.apache.org/repos/asf/oozie/blob/0bb5e136/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java b/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java index d25a2a4..0bcbce0 100644 --- a/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java +++ b/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java @@ -1648,7 +1648,38 @@ public class TestStatusTransitService extends XDataTestCase { } - static class JobLock implements Runnable { + public void testBundleRunningAfterCoordResume() throws Exception { + + setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false"); + services = new Services(); + services.init(); + CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false); + + BundleJobBean bundleJob = this.addRecordToBundleJobTable(Job.Status.RUNNING, true); + final String bundleId = bundleJob.getId(); + addRecordToBundleActionTable(bundleId, coord.getId(), "COORD-TEST", 0, Job.Status.RUNNING); + new CoordSuspendXCommand(coord.getId()).call(); + + coord = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, coord.getId()); + assertEquals(Job.Status.SUSPENDED, coord.getStatus()); + + Runnable runnable = new StatusTransitRunnable(); + runnable.run(); + bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, bundleId); + assertEquals(Job.Status.SUSPENDED, bundleJob.getStatus()); + + new CoordResumeXCommand(coord.getId()).call(); + coord = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, coord.getId()); + assertEquals(Job.Status.RUNNING, coord.getStatus()); + + runnable = new StatusTransitRunnable(); + runnable.run(); + bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, bundleId); + assertEquals(Job.Status.RUNNING, bundleJob.getStatus()); + + } + + static class JobLock implements Runnable { String jobId; public JobLock(String jobId) { http://git-wip-us.apache.org/repos/asf/oozie/blob/0bb5e136/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 1f50731..729202a 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.3.0 release (trunk - unreleased) +OOZIE-2228 Statustransit service doesn't pick bundle with suspend status (puru) OOZIE-2325 Shell action fails if user overrides oozie.launcher.mapreduce.map.env (kailongs via puru) OOZIE-2324 A syntax error in the kill node causes the workflow to get stuck and other problems (rkanter) OOZIE-2309 Enable the coord:dateOffset() function in /coordinator-app/datasets/dataset/@initial-instance (kailongs via rohini)
