Repository: oozie Updated Branches: refs/heads/master 69671d641 -> d1d14f998
OOZIE-1813 Add service to report/kill rogue bundles and coordinator jobs Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/d1d14f99 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/d1d14f99 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/d1d14f99 Branch: refs/heads/master Commit: d1d14f998700f1279a023de21e0bdbc08c6336d3 Parents: 69671d6 Author: Purshotam Shah <[email protected]> Authored: Tue Oct 14 14:03:06 2014 -0700 Committer: Purshotam Shah <[email protected]> Committed: Tue Oct 14 14:03:06 2014 -0700 ---------------------------------------------------------------------- .../org/apache/oozie/CoordinatorJobBean.java | 2 +- .../coord/TestAbandonedCoordChecker.java | 89 ++++++++++++++------ .../org/apache/oozie/test/XDataTestCase.java | 47 ++++++++++- 3 files changed, 110 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/d1d14f99/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java index 8f11645..2362084 100644 --- a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java +++ b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java @@ -139,7 +139,7 @@ import org.json.simple.JSONObject; }) @NamedNativeQueries({ - @NamedNativeQuery(name = "GET_COORD_FOR_ABANDONEDCHECK", query = "select w.id, w.USER_NAME, w.group_name, w.APP_NAME from coord_jobs w where ( w.STATUS = 'RUNNING' or w.STATUS = 'RUNNINGWITHERROR' ) and w.start_time < ?2 and w.id in (select failedJobs.job_id from (select a.job_id from coord_actions a where ( a.STATUS = 'FAILED' or a.STATUS = 'TIMEDOUT' or a.STATUS = 'SUSPENDED') group by a.job_id having count(*) >= ?1 ) failedJobs LEFT OUTER JOIN (select b.job_id from coord_actions b where b.STATUS = 'SUCCEEDED' group by b.job_id having count(*) > 0 ) successJobs on failedJobs.job_id = successJobs.job_id where successJobs.job_id is null )") + @NamedNativeQuery(name = "GET_COORD_FOR_ABANDONEDCHECK", query = "select w.id, w.USER_NAME, w.group_name, w.APP_NAME from coord_jobs w where ( w.STATUS = 'RUNNING' or w.STATUS = 'RUNNINGWITHERROR' ) and w.start_time < ?2 and w.created_time < ?2 and w.id in (select failedJobs.job_id from (select a.job_id from coord_actions a where ( a.STATUS = 'FAILED' or a.STATUS = 'TIMEDOUT' or a.STATUS = 'SUSPENDED') group by a.job_id having count(*) >= ?1 ) failedJobs LEFT OUTER JOIN (select b.job_id from coord_actions b where b.STATUS = 'SUCCEEDED' group by b.job_id having count(*) > 0 ) successJobs on failedJobs.job_id = successJobs.job_id where successJobs.job_id is null )") }) @Table(name = "COORD_JOBS") http://git-wip-us.apache.org/repos/asf/oozie/blob/d1d14f99/core/src/test/java/org/apache/oozie/command/coord/TestAbandonedCoordChecker.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestAbandonedCoordChecker.java b/core/src/test/java/org/apache/oozie/command/coord/TestAbandonedCoordChecker.java index fa47dd6..24b6f66 100644 --- a/core/src/test/java/org/apache/oozie/command/coord/TestAbandonedCoordChecker.java +++ b/core/src/test/java/org/apache/oozie/command/coord/TestAbandonedCoordChecker.java @@ -49,13 +49,14 @@ public class TestAbandonedCoordChecker extends XDataTestCase { public void tesAbandonedFailed() throws Exception { Date start = DateUtils.addMonths(new Date(), -1); Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs + Date createdTime = start; - final CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, - false, 5); + final CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, + createdTime, true, false, 5); addRecordToCoordActionTable(job1.getId(), 5, CoordinatorAction.Status.FAILED); - final CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, - false, 4); + final CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, + createdTime, true, false, 4); addRecordToCoordActionTable(job2.getId(), 4, CoordinatorAction.Status.FAILED); AbandonedCoordCheckerRunnable coordChecked = new AbandonedCoordCheckerRunnable(5); @@ -70,14 +71,16 @@ public class TestAbandonedCoordChecker extends XDataTestCase { Date start = DateUtils.addMonths(new Date(), -1); Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs - final CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, - false, 6); + Date createdTime = start; + + final CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, + createdTime, true, false, 6); addRecordToCoordActionTable(job1.getId(), 6, CoordinatorAction.Status.SUCCEEDED, CoordinatorAction.Status.FAILED); - final CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, start, end, true, - false, 6); + final CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, start, end, + createdTime, true, false, 6); addRecordToCoordActionTable(job2.getId(), 6, CoordinatorAction.Status.SUCCEEDED, CoordinatorAction.Status.FAILED); @@ -92,14 +95,15 @@ public class TestAbandonedCoordChecker extends XDataTestCase { public void testMessage_withTimedout() throws Exception { Date start = DateUtils.addMonths(new Date(), -1); Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs + Date createdTime = start; - final CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, - false, 12); + final CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, + createdTime, true, false, 12); addRecordToCoordActionTable(job1.getId(), 12, CoordinatorAction.Status.TIMEDOUT); - final CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, - false, 4); + final CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, + createdTime, true, false, 4); addRecordToCoordActionTable(job2.getId(), 4, CoordinatorAction.Status.TIMEDOUT); AbandonedCoordCheckerRunnable coordChecked = new AbandonedCoordCheckerRunnable(10); @@ -113,21 +117,22 @@ public class TestAbandonedCoordChecker extends XDataTestCase { public void testMessage_withMixedStatus() throws Exception { Date start = DateUtils.addMonths(new Date(), -1); Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs + Date createdTime = start; - final CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, - false, 5); + final CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, + createdTime, true, false, 5); addRecordToCoordActionTable(job1.getId(), 5, CoordinatorAction.Status.FAILED, CoordinatorAction.Status.SUSPENDED, CoordinatorAction.Status.TIMEDOUT); - final CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, - false, 5); + final CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, + createdTime, true, false, 5); addRecordToCoordActionTable(job2.getId(), 5, CoordinatorAction.Status.FAILED, CoordinatorAction.Status.SUSPENDED, CoordinatorAction.Status.TIMEDOUT); - final CoordinatorJobBean job3 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, start, end, true, - false, 5); + final CoordinatorJobBean job3 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, start, end, + createdTime, true, false, 5); addRecordToCoordActionTable(job3.getId(), 5, CoordinatorAction.Status.FAILED, CoordinatorAction.Status.SUSPENDED, CoordinatorAction.Status.TIMEDOUT); @@ -142,9 +147,13 @@ public class TestAbandonedCoordChecker extends XDataTestCase { public void testKill() throws Exception { Date start = DateUtils.addMonths(new Date(), -1); Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs - CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, false, 6); + Date createdTime = start; + + CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, createdTime, + true, false, 6); addRecordToCoordActionTable(job1.getId(), 6, CoordinatorAction.Status.FAILED); - CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, false, 4); + CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, createdTime, + true, false, 4); addRecordToCoordActionTable(job2.getId(), 4, CoordinatorAction.Status.FAILED); new AbandonedCoordCheckerRunnable(5, true).run(); assertEquals(CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job1.getId()).getStatus(), @@ -154,19 +163,49 @@ public class TestAbandonedCoordChecker extends XDataTestCase { } public void testStartTime() throws Exception { - Date start = new Date(); - Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs - CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, false, 6); + Date start = DateUtils.addDays(new Date(), 1); + Date end = DateUtils.addDays(new Date(), 6); + Date createdTime = new Date(); + + CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, createdTime, + true, false, 6); addRecordToCoordActionTable(job1.getId(), 6, CoordinatorAction.Status.FAILED); - CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, false, 4); + + start = DateUtils.addDays(new Date(), -3); + createdTime = DateUtils.addDays(new Date(), -4); + CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, createdTime, + true, false, 4); addRecordToCoordActionTable(job2.getId(), 10, CoordinatorAction.Status.FAILED); new AbandonedCoordCheckerRunnable(5, true).run(); - // Both job should be running as starttime > 2 days buffer + // job1 should be RUNNING as starttime > 2 days buffer assertEquals(CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job1.getId()).getStatus(), CoordinatorJob.Status.RUNNING); assertEquals(CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job2.getId()).getStatus(), + CoordinatorJob.Status.KILLED); + } + + public void testCatchupJob() throws Exception { + Date start = DateUtils.addMonths(new Date(), -1); + Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs + Date createdTime = DateUtils.addDays(new Date(), -1); + + CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, createdTime, + true, false, 6); + addRecordToCoordActionTable(job1.getId(), 6, CoordinatorAction.Status.FAILED); + + createdTime = DateUtils.addDays(new Date(), -3); + + CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, createdTime, + true, false, 4); + addRecordToCoordActionTable(job2.getId(), 10, CoordinatorAction.Status.FAILED); + new AbandonedCoordCheckerRunnable(5, true).run(); + + // Only one job should be running. + assertEquals(CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job1.getId()).getStatus(), CoordinatorJob.Status.RUNNING); + assertEquals(CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job2.getId()).getStatus(), + CoordinatorJob.Status.KILLED); } private void addRecordToCoordActionTable(String jobId, int count, CoordinatorAction.Status... status) http://git-wip-us.apache.org/repos/asf/oozie/blob/d1d14f99/core/src/test/java/org/apache/oozie/test/XDataTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java index c149d16..f076cb0 100644 --- a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java +++ b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java @@ -210,6 +210,7 @@ public abstract class XDataTestCase extends XHCatTestCase { * @param status coord job status * @param start start time * @param end end time + * @param created Time * @param pending true if pending is true * @param doneMatd true if doneMaterialization is true * @param lastActionNum last action number @@ -218,8 +219,25 @@ public abstract class XDataTestCase extends XHCatTestCase { */ protected CoordinatorJobBean addRecordToCoordJobTable(CoordinatorJob.Status status, Date start, Date end, boolean pending, boolean doneMatd, int lastActionNum) throws Exception { - CoordinatorJobBean coordJob = createCoordJob(status, start, end, pending, doneMatd, lastActionNum); + return addRecordToCoordJobTable(status, start, end, new Date(), pending, doneMatd, lastActionNum); + } + /** + * Insert coord job for testing. + * + * @param status coord job status + * @param start start time + * @param end end time + * @param created Time + * @param pending true if pending is true + * @param doneMatd true if doneMaterialization is true + * @param lastActionNum last action number + * @return coord job bean + * @throws Exception + */ + protected CoordinatorJobBean addRecordToCoordJobTable(CoordinatorJob.Status status, Date start, Date end, + Date createdTime, boolean pending, boolean doneMatd, int lastActionNum) throws Exception { + CoordinatorJobBean coordJob = createCoordJob(status, start, end, createdTime, pending, doneMatd, lastActionNum); try { JPAService jpaService = Services.get().get(JPAService.class); assertNotNull(jpaService); @@ -383,6 +401,26 @@ public abstract class XDataTestCase extends XHCatTestCase { return createCoordBean(appPath, appXml, status, start, end, pending, doneMatd, lastActionNum); } + /** + * Create coord job bean + * + * @param status coord job status + * @param start start time + * @param end end time + * @param created Time + * @param pending true if pending is true + * @param doneMatd true if doneMaterialization is true + * @param lastActionNum last action number + * @return coord job bean + * @throws IOException + */ + protected CoordinatorJobBean createCoordJob(CoordinatorJob.Status status, Date start, Date end, Date createTime, boolean pending, + boolean doneMatd, int lastActionNum) throws Exception { + Path appPath = new Path(getFsTestCaseDir(), "coord"); + String appXml = writeCoordXml(appPath, start, end); + + return createCoordBean(appPath, appXml, status, start, end, createTime, pending, doneMatd, lastActionNum); + } /** * Create coord job bean @@ -407,13 +445,18 @@ public abstract class XDataTestCase extends XHCatTestCase { private CoordinatorJobBean createCoordBean(Path appPath, String appXml, CoordinatorJob.Status status, Date start, Date end, boolean pending, boolean doneMatd, int lastActionNum) throws Exception { + return createCoordBean(appPath, appXml, status, start, end, new Date(), pending, doneMatd, lastActionNum); + } + + private CoordinatorJobBean createCoordBean(Path appPath, String appXml, CoordinatorJob.Status status, Date start, + Date end, Date createdTime, boolean pending, boolean doneMatd, int lastActionNum) throws Exception { CoordinatorJobBean coordJob = new CoordinatorJobBean(); coordJob.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR)); coordJob.setAppName("COORD-TEST"); coordJob.setAppPath(appPath.toString()); coordJob.setStatus(status); coordJob.setTimeZone("America/Los_Angeles"); - coordJob.setCreatedTime(new Date()); + coordJob.setCreatedTime(createdTime); coordJob.setLastModifiedTime(new Date()); coordJob.setUser(getTestUser()); coordJob.setGroup(getTestGroup());
