Repository: oozie
Updated Branches:
  refs/heads/master 69671d641 -> d1d14f998


OOZIE-1813 Add service to report/kill rogue bundles and coordinator jobs


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/d1d14f99
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/d1d14f99
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/d1d14f99

Branch: refs/heads/master
Commit: d1d14f998700f1279a023de21e0bdbc08c6336d3
Parents: 69671d6
Author: Purshotam Shah <[email protected]>
Authored: Tue Oct 14 14:03:06 2014 -0700
Committer: Purshotam Shah <[email protected]>
Committed: Tue Oct 14 14:03:06 2014 -0700

----------------------------------------------------------------------
 .../org/apache/oozie/CoordinatorJobBean.java    |  2 +-
 .../coord/TestAbandonedCoordChecker.java        | 89 ++++++++++++++------
 .../org/apache/oozie/test/XDataTestCase.java    | 47 ++++++++++-
 3 files changed, 110 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/d1d14f99/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java 
b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
index 8f11645..2362084 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
@@ -139,7 +139,7 @@ import org.json.simple.JSONObject;
 
 })
 @NamedNativeQueries({
-        @NamedNativeQuery(name = "GET_COORD_FOR_ABANDONEDCHECK", query = 
"select w.id, w.USER_NAME, w.group_name, w.APP_NAME from coord_jobs w where ( 
w.STATUS = 'RUNNING' or w.STATUS = 'RUNNINGWITHERROR' ) and w.start_time < ?2 
and w.id in (select failedJobs.job_id from (select a.job_id from coord_actions 
a where ( a.STATUS = 'FAILED' or a.STATUS = 'TIMEDOUT'  or a.STATUS = 
'SUSPENDED') group by a.job_id having count(*) >= ?1 ) failedJobs LEFT OUTER 
JOIN (select b.job_id from coord_actions b where b.STATUS = 'SUCCEEDED' group 
by b.job_id having count(*) > 0 ) successJobs   on  failedJobs.job_id = 
successJobs.job_id where successJobs.job_id is null )")
+        @NamedNativeQuery(name = "GET_COORD_FOR_ABANDONEDCHECK", query = 
"select w.id, w.USER_NAME, w.group_name, w.APP_NAME from coord_jobs w where ( 
w.STATUS = 'RUNNING' or w.STATUS = 'RUNNINGWITHERROR' ) and w.start_time < ?2 
and w.created_time < ?2 and w.id in (select failedJobs.job_id from (select 
a.job_id from coord_actions a where ( a.STATUS = 'FAILED' or a.STATUS = 
'TIMEDOUT'  or a.STATUS = 'SUSPENDED') group by a.job_id having count(*) >= ?1 
) failedJobs LEFT OUTER JOIN (select b.job_id from coord_actions b where 
b.STATUS = 'SUCCEEDED' group by b.job_id having count(*) > 0 ) successJobs   on 
 failedJobs.job_id = successJobs.job_id where successJobs.job_id is null )")
 
 })
 @Table(name = "COORD_JOBS")

http://git-wip-us.apache.org/repos/asf/oozie/blob/d1d14f99/core/src/test/java/org/apache/oozie/command/coord/TestAbandonedCoordChecker.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/command/coord/TestAbandonedCoordChecker.java
 
b/core/src/test/java/org/apache/oozie/command/coord/TestAbandonedCoordChecker.java
index fa47dd6..24b6f66 100644
--- 
a/core/src/test/java/org/apache/oozie/command/coord/TestAbandonedCoordChecker.java
+++ 
b/core/src/test/java/org/apache/oozie/command/coord/TestAbandonedCoordChecker.java
@@ -49,13 +49,14 @@ public class TestAbandonedCoordChecker extends 
XDataTestCase {
     public void tesAbandonedFailed() throws Exception {
         Date start = DateUtils.addMonths(new Date(), -1);
         Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs
+        Date createdTime = start;
 
-        final CoordinatorJobBean job1 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true,
-                false, 5);
+        final CoordinatorJobBean job1 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end,
+                createdTime, true, false, 5);
         addRecordToCoordActionTable(job1.getId(), 5, 
CoordinatorAction.Status.FAILED);
 
-        final CoordinatorJobBean job2 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true,
-                false, 4);
+        final CoordinatorJobBean job2 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end,
+                createdTime, true, false, 4);
         addRecordToCoordActionTable(job2.getId(), 4, 
CoordinatorAction.Status.FAILED);
 
         AbandonedCoordCheckerRunnable coordChecked = new 
AbandonedCoordCheckerRunnable(5);
@@ -70,14 +71,16 @@ public class TestAbandonedCoordChecker extends 
XDataTestCase {
         Date start = DateUtils.addMonths(new Date(), -1);
         Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs
 
-        final CoordinatorJobBean job1 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true,
-                false, 6);
+        Date createdTime = start;
+
+        final CoordinatorJobBean job1 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end,
+                createdTime, true, false, 6);
 
         addRecordToCoordActionTable(job1.getId(), 6, 
CoordinatorAction.Status.SUCCEEDED,
                 CoordinatorAction.Status.FAILED);
 
-        final CoordinatorJobBean job2 = 
addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, start, end, true,
-                false, 6);
+        final CoordinatorJobBean job2 = 
addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, start, end,
+                createdTime, true, false, 6);
 
         addRecordToCoordActionTable(job2.getId(), 6, 
CoordinatorAction.Status.SUCCEEDED,
                 CoordinatorAction.Status.FAILED);
@@ -92,14 +95,15 @@ public class TestAbandonedCoordChecker extends 
XDataTestCase {
     public void testMessage_withTimedout() throws Exception {
         Date start = DateUtils.addMonths(new Date(), -1);
         Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs
+        Date createdTime = start;
 
-        final CoordinatorJobBean job1 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true,
-                false, 12);
+        final CoordinatorJobBean job1 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end,
+                createdTime, true, false, 12);
 
         addRecordToCoordActionTable(job1.getId(), 12, 
CoordinatorAction.Status.TIMEDOUT);
 
-        final CoordinatorJobBean job2 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true,
-                false, 4);
+        final CoordinatorJobBean job2 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end,
+                createdTime, true, false, 4);
         addRecordToCoordActionTable(job2.getId(), 4, 
CoordinatorAction.Status.TIMEDOUT);
 
         AbandonedCoordCheckerRunnable coordChecked = new 
AbandonedCoordCheckerRunnable(10);
@@ -113,21 +117,22 @@ public class TestAbandonedCoordChecker extends 
XDataTestCase {
     public void testMessage_withMixedStatus() throws Exception {
         Date start = DateUtils.addMonths(new Date(), -1);
         Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs
+        Date createdTime = start;
 
-        final CoordinatorJobBean job1 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true,
-                false, 5);
+        final CoordinatorJobBean job1 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end,
+                createdTime, true, false, 5);
 
         addRecordToCoordActionTable(job1.getId(), 5, 
CoordinatorAction.Status.FAILED,
                 CoordinatorAction.Status.SUSPENDED, 
CoordinatorAction.Status.TIMEDOUT);
 
-        final CoordinatorJobBean job2 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true,
-                false, 5);
+        final CoordinatorJobBean job2 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end,
+                createdTime, true, false, 5);
 
         addRecordToCoordActionTable(job2.getId(), 5, 
CoordinatorAction.Status.FAILED,
                 CoordinatorAction.Status.SUSPENDED, 
CoordinatorAction.Status.TIMEDOUT);
 
-        final CoordinatorJobBean job3 = 
addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, start, end, true,
-                false, 5);
+        final CoordinatorJobBean job3 = 
addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, start, end,
+                createdTime, true, false, 5);
         addRecordToCoordActionTable(job3.getId(), 5, 
CoordinatorAction.Status.FAILED,
                 CoordinatorAction.Status.SUSPENDED, 
CoordinatorAction.Status.TIMEDOUT);
 
@@ -142,9 +147,13 @@ public class TestAbandonedCoordChecker extends 
XDataTestCase {
     public void testKill() throws Exception {
         Date start = DateUtils.addMonths(new Date(), -1);
         Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs
-        CoordinatorJobBean job1 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, 
false, 6);
+        Date createdTime = start;
+
+        CoordinatorJobBean job1 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, createdTime,
+                true, false, 6);
         addRecordToCoordActionTable(job1.getId(), 6, 
CoordinatorAction.Status.FAILED);
-        CoordinatorJobBean job2 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, 
false, 4);
+        CoordinatorJobBean job2 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, createdTime,
+                true, false, 4);
         addRecordToCoordActionTable(job2.getId(), 4, 
CoordinatorAction.Status.FAILED);
         new AbandonedCoordCheckerRunnable(5, true).run();
         
assertEquals(CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB,
 job1.getId()).getStatus(),
@@ -154,19 +163,49 @@ public class TestAbandonedCoordChecker extends 
XDataTestCase {
     }
 
     public void testStartTime() throws Exception {
-        Date start = new Date();
-        Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs
-        CoordinatorJobBean job1 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, 
false, 6);
+        Date start = DateUtils.addDays(new Date(), 1);
+        Date end = DateUtils.addDays(new Date(), 6);
+        Date createdTime = new Date();
+
+        CoordinatorJobBean job1 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, createdTime,
+                true, false, 6);
         addRecordToCoordActionTable(job1.getId(), 6, 
CoordinatorAction.Status.FAILED);
-        CoordinatorJobBean job2 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, 
false, 4);
+
+        start = DateUtils.addDays(new Date(), -3);
+        createdTime = DateUtils.addDays(new Date(), -4);
+        CoordinatorJobBean job2 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, createdTime,
+                true, false, 4);
         addRecordToCoordActionTable(job2.getId(), 10, 
CoordinatorAction.Status.FAILED);
         new AbandonedCoordCheckerRunnable(5, true).run();
 
-        // Both job should be running as starttime > 2 days buffer
+        // job1 should be RUNNING as starttime > 2 days buffer
         
assertEquals(CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB,
 job1.getId()).getStatus(),
                 CoordinatorJob.Status.RUNNING);
         
assertEquals(CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB,
 job2.getId()).getStatus(),
+                CoordinatorJob.Status.KILLED);
+    }
+
+    public void testCatchupJob() throws Exception {
+        Date start = DateUtils.addMonths(new Date(), -1);
+        Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs
+        Date createdTime = DateUtils.addDays(new Date(), -1);
+
+        CoordinatorJobBean job1 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, createdTime,
+                true, false, 6);
+        addRecordToCoordActionTable(job1.getId(), 6, 
CoordinatorAction.Status.FAILED);
+
+        createdTime = DateUtils.addDays(new Date(), -3);
+
+        CoordinatorJobBean job2 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, createdTime,
+                true, false, 4);
+        addRecordToCoordActionTable(job2.getId(), 10, 
CoordinatorAction.Status.FAILED);
+        new AbandonedCoordCheckerRunnable(5, true).run();
+
+        // Only one job should be running.
+        
assertEquals(CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB,
 job1.getId()).getStatus(),
                 CoordinatorJob.Status.RUNNING);
+        
assertEquals(CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB,
 job2.getId()).getStatus(),
+                CoordinatorJob.Status.KILLED);
     }
 
     private void addRecordToCoordActionTable(String jobId, int count, 
CoordinatorAction.Status... status)

http://git-wip-us.apache.org/repos/asf/oozie/blob/d1d14f99/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java 
b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
index c149d16..f076cb0 100644
--- a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
@@ -210,6 +210,7 @@ public abstract class XDataTestCase extends XHCatTestCase {
      * @param status coord job status
      * @param start start time
      * @param end end time
+     * @see #addRecordToCoordJobTable(CoordinatorJob.Status, Date, Date, Date, boolean, boolean, int)
      * @param pending true if pending is true
      * @param doneMatd true if doneMaterialization is true
      * @param lastActionNum last action number
@@ -218,8 +219,25 @@ public abstract class XDataTestCase extends XHCatTestCase {
      */
     protected CoordinatorJobBean 
addRecordToCoordJobTable(CoordinatorJob.Status status, Date start, Date end,
             boolean pending, boolean doneMatd, int lastActionNum) throws 
Exception {
-        CoordinatorJobBean coordJob = createCoordJob(status, start, end, 
pending, doneMatd, lastActionNum);
+        return addRecordToCoordJobTable(status, start, end, new Date(), 
pending, doneMatd, lastActionNum);
+    }
 
+    /**
+     * Insert coord job for testing.
+     *
+     * @param status coord job status
+     * @param start start time
+     * @param end end time
+     * @param createdTime created time to set on the coord job
+     * @param pending true if pending is true
+     * @param doneMatd true if doneMaterialization is true
+     * @param lastActionNum last action number
+     * @return coord job bean
+     * @throws Exception
+     */
+    protected CoordinatorJobBean 
addRecordToCoordJobTable(CoordinatorJob.Status status, Date start, Date end,
+            Date createdTime, boolean pending, boolean doneMatd, int 
lastActionNum) throws Exception {
+        CoordinatorJobBean coordJob = createCoordJob(status, start, end, 
createdTime, pending, doneMatd, lastActionNum);
         try {
             JPAService jpaService = Services.get().get(JPAService.class);
             assertNotNull(jpaService);
@@ -383,6 +401,26 @@ public abstract class XDataTestCase extends XHCatTestCase {
 
         return createCoordBean(appPath, appXml, status, start, end, pending, 
doneMatd, lastActionNum);
     }
+    /**
+     * Create coord job bean with an explicit created time.
+     *
+     * @param status coord job status
+     * @param start start time
+     * @param end end time
+     * @param createdTime created time to set on the coord job
+     * @param pending true if pending is true
+     * @param doneMatd true if doneMaterialization is true
+     * @param lastActionNum last action number
+     * @return coord job bean
+     * @throws Exception propagated from writing the coord app XML or building the bean
+     */
+    protected CoordinatorJobBean createCoordJob(CoordinatorJob.Status status, Date start, Date end, Date createdTime, boolean pending,
+            boolean doneMatd, int lastActionNum) throws Exception {
+        Path appPath = new Path(getFsTestCaseDir(), "coord");
+        String appXml = writeCoordXml(appPath, start, end);
+
+        return createCoordBean(appPath, appXml, status, start, end, createdTime, pending, doneMatd, lastActionNum);
+    }
 
     /**
      * Create coord job bean
@@ -407,13 +445,18 @@ public abstract class XDataTestCase extends XHCatTestCase 
{
 
     private CoordinatorJobBean createCoordBean(Path appPath, String appXml, 
CoordinatorJob.Status status, Date start,
             Date end, boolean pending, boolean doneMatd, int lastActionNum) 
throws Exception {
+        return createCoordBean(appPath, appXml, status, start, end, new 
Date(), pending, doneMatd, lastActionNum);
+    }
+
+    private CoordinatorJobBean createCoordBean(Path appPath, String appXml, 
CoordinatorJob.Status status, Date start,
+            Date end, Date createdTime, boolean pending, boolean doneMatd, int 
lastActionNum) throws Exception {
         CoordinatorJobBean coordJob = new CoordinatorJobBean();
         
coordJob.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR));
         coordJob.setAppName("COORD-TEST");
         coordJob.setAppPath(appPath.toString());
         coordJob.setStatus(status);
         coordJob.setTimeZone("America/Los_Angeles");
-        coordJob.setCreatedTime(new Date());
+        coordJob.setCreatedTime(createdTime);
         coordJob.setLastModifiedTime(new Date());
         coordJob.setUser(getTestUser());
         coordJob.setGroup(getTestGroup());

Reply via email to