OOZIE-1803 Improvement in Purge service (jaydeepvishwakarma via shwethags)

Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/0d7b94af
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/0d7b94af
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/0d7b94af

Branch: refs/heads/master
Commit: 0d7b94af4999577f83a192c6b044f600141f9f6a
Parents: aa7bba5
Author: Shwetha GS <[email protected]>
Authored: Tue Nov 11 11:44:51 2014 +0530
Committer: Shwetha GS <[email protected]>
Committed: Tue Nov 11 11:44:51 2014 +0530

----------------------------------------------------------------------
 .../java/org/apache/oozie/BundleActionBean.java |   2 +-
 .../java/org/apache/oozie/BundleJobBean.java    |   2 +-
 .../org/apache/oozie/CoordinatorActionBean.java |  43 +++----
 .../org/apache/oozie/CoordinatorJobBean.java    |  44 +++----
 .../org/apache/oozie/WorkflowActionBean.java    |   4 +-
 .../java/org/apache/oozie/WorkflowJobBean.java  |  47 ++++---
 .../org/apache/oozie/command/PurgeXCommand.java | 129 ++++++++++++-------
 .../jpa/BundleJobsDeleteJPAExecutor.java        |  29 ++---
 .../jpa/CoordActionsDeleteJPAExecutor.java      |  24 ++--
 ...oordActionsGetFromCoordJobIdJPAExecutor.java |  59 +++++++++
 .../jpa/CoordJobsDeleteJPAExecutor.java         |  26 ++--
 ...bsBasicInfoFromCoordParentIdJPAExecutor.java |  85 ++++++++++++
 ...asicInfoFromWorkflowParentIdJPAExecutor.java |  87 +++++++++++++
 ...NotForPurgeFromCoordParentIdJPAExecutor.java |  64 ---------
 ...ForPurgeFromWorkflowParentIdJPAExecutor.java |  64 ---------
 .../jpa/WorkflowJobsDeleteJPAExecutor.java      |  30 ++---
 .../jpa/TestCoordJobsDeleteJPAExecutor.java     |  46 ++++---
 ...bsBasicInfoFromCoordParentIdJPAExecutor.java | 117 +++++++++++++++++
 ...asicInfoFromWorkflowParentIdJPAExecutor.java | 120 +++++++++++++++++
 ...NotForPurgeFromCoordParentIdJPAExecutor.java | 116 -----------------
 ...ForPurgeFromWorkflowParentIdJPAExecutor.java | 101 ---------------
 release-log.txt                                 |   1 +
 22 files changed, 695 insertions(+), 545 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/BundleActionBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/BundleActionBean.java 
b/core/src/main/java/org/apache/oozie/BundleActionBean.java
index 65bfe8c..56e3672 100644
--- a/core/src/main/java/org/apache/oozie/BundleActionBean.java
+++ b/core/src/main/java/org/apache/oozie/BundleActionBean.java
@@ -77,7 +77,7 @@ import org.json.simple.JSONObject;
 
         @NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_BUNDLE", query = 
"delete from BundleActionBean a where a.bundleId = :bundleId and (a.statusStr = 
'SUCCEEDED' OR a.statusStr = 'FAILED' OR a.statusStr= 'KILLED' OR a.statusStr = 
'DONEWITHERROR')"),
 
-        @NamedQuery(name = "DELETE_ACTIONS_FOR_BUNDLE", query = "delete from 
BundleActionBean a where a.bundleId = :bundleId")})
+        @NamedQuery(name = "DELETE_ACTIONS_FOR_BUNDLE", query = "delete from 
BundleActionBean a where a.bundleId  IN (:bundleId)")})
 public class BundleActionBean implements Writable, JsonBean {
 
     @Id

http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/BundleJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/BundleJobBean.java 
b/core/src/main/java/org/apache/oozie/BundleJobBean.java
index 4dbffc3..e45791e 100644
--- a/core/src/main/java/org/apache/oozie/BundleJobBean.java
+++ b/core/src/main/java/org/apache/oozie/BundleJobBean.java
@@ -67,7 +67,7 @@ import org.json.simple.JSONObject;
 
         @NamedQuery(name = "UPDATE_BUNDLE_JOB_PAUSE_KICKOFF", query = "update 
BundleJobBean w set w.kickoffTimestamp = :kickoffTime, w.pauseTimestamp = 
:pauseTime where w.id = :id"),
 
-        @NamedQuery(name = "DELETE_BUNDLE_JOB", query = "delete from 
BundleJobBean w where w.id = :id"),
+        @NamedQuery(name = "DELETE_BUNDLE_JOB", query = "delete from 
BundleJobBean w where w.id IN (:id)"),
 
         @NamedQuery(name = "GET_BUNDLE_JOBS", query = "select OBJECT(w) from 
BundleJobBean w"),
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java 
b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
index 759e643..188b70e 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
@@ -18,13 +18,17 @@
 
 package org.apache.oozie;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.sql.Timestamp;
-import java.text.MessageFormat;
-import java.util.Date;
-import java.util.List;
+import org.apache.hadoop.io.Writable;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.client.rest.JsonTags;
+import org.apache.oozie.client.rest.JsonUtils;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.WritableUtils;
+import org.apache.openjpa.persistence.jdbc.Index;
+import org.apache.openjpa.persistence.jdbc.Strategy;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
 
 import javax.persistence.Basic;
 import javax.persistence.Column;
@@ -38,18 +42,13 @@ import javax.persistence.NamedQueries;
 import javax.persistence.NamedQuery;
 import javax.persistence.SqlResultSetMapping;
 import javax.persistence.Table;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.rest.JsonBean;
-import org.apache.oozie.client.rest.JsonTags;
-import org.apache.oozie.client.rest.JsonUtils;
-import org.apache.oozie.util.DateUtils;
-import org.apache.oozie.util.WritableUtils;
-import org.apache.openjpa.persistence.jdbc.Index;
-import org.apache.openjpa.persistence.jdbc.Strategy;
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.text.MessageFormat;
+import java.util.Date;
+import java.util.List;
 
 @SqlResultSetMapping(
         name = "CoordActionJobIdLmt",
@@ -79,12 +78,12 @@ import org.json.simple.JSONObject;
 
         @NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR", query = 
"delete from CoordinatorActionBean a where a.jobId = :jobId and (a.statusStr = 
'SUCCEEDED' OR a.statusStr = 'FAILED' OR a.statusStr= 'KILLED')"),
 
-        @NamedQuery(name = "DELETE_ACTIONS_FOR_COORDINATOR", query = "delete 
from CoordinatorActionBean a where a.jobId = :jobId"),
-
-        @NamedQuery(name = "DELETE_ACTIONS_FOR_LONG_RUNNING_COORDINATOR", 
query = "delete from CoordinatorActionBean a where a.id = :actionId"),
+        @NamedQuery(name = "DELETE_ACTIONS_FOR_LONG_RUNNING_COORDINATOR", 
query = "delete from CoordinatorActionBean a where a.id IN (:actionId)"),
 
         @NamedQuery(name = "DELETE_UNSCHEDULED_ACTION", query = "delete from 
CoordinatorActionBean a where a.id = :id and (a.statusStr = 'WAITING' OR 
a.statusStr = 'READY')"),
 
+        @NamedQuery(name = "GET_COORD_ACTIONS_FOR_COORDINATOR", query = 
"select a.id from CoordinatorActionBean a where a.jobId = :jobId"),
+
         // Query used by XTestcase to setup tables
         @NamedQuery(name = "GET_COORD_ACTIONS", query = "select OBJECT(w) from 
CoordinatorActionBean w"),
         // Select query used only by test cases

http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java 
b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
index 2362084..4d6b970 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
@@ -18,27 +18,6 @@
 
 package org.apache.oozie;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.sql.Timestamp;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import javax.persistence.Basic;
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.Id;
-import javax.persistence.Lob;
-import javax.persistence.NamedNativeQueries;
-import javax.persistence.NamedNativeQuery;
-import javax.persistence.NamedQueries;
-import javax.persistence.NamedQuery;
-import javax.persistence.Table;
-import javax.persistence.Transient;
-
 import org.apache.hadoop.io.Writable;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
@@ -52,6 +31,26 @@ import org.apache.openjpa.persistence.jdbc.Strategy;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Lob;
+import javax.persistence.NamedNativeQueries;
+import javax.persistence.NamedNativeQuery;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.persistence.Transient;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
 @Entity
 @NamedQueries( {
         @NamedQuery(name = "UPDATE_COORD_JOB", query = "update 
CoordinatorJobBean w set w.appName = :appName, w.appPath = 
:appPath,w.concurrency = :concurrency, w.conf = :conf, w.externalId = 
:externalId, w.frequency = :frequency, w.lastActionNumber = :lastActionNumber, 
w.timeOut = :timeOut, w.timeZone = :timeZone, w.createdTimestamp = 
:createdTime, w.endTimestamp = :endTime, w.execution = :execution, w.jobXml = 
:jobXml, w.lastActionTimestamp = :lastAction, w.lastModifiedTimestamp = 
:lastModifiedTime, w.nextMaterializedTimestamp = :nextMaterializedTime, 
w.origJobXml = :origJobXml, w.slaXml=:slaXml, w.startTimestamp = :startTime, 
w.statusStr = :status, w.timeUnitStr = :timeUnit, w.appNamespace = 
:appNamespace, w.bundleId = :bundleId, w.matThrottling = :matThrottling  where 
w.id = :id"),
@@ -80,7 +79,7 @@ import org.json.simple.JSONObject;
 
         @NamedQuery(name = "UPDATE_COORD_JOB_CHANGE", query = "update 
CoordinatorJobBean w set w.endTimestamp = :endTime, w.statusStr = :status, 
w.pending = :pending, w.doneMaterialization = :doneMaterialization, 
w.concurrency = :concurrency, w.pauseTimestamp = :pauseTime, w.lastActionNumber 
= :lastActionNumber, w.lastActionTimestamp = :lastActionTime, 
w.nextMaterializedTimestamp = :nextMatdTime, w.lastModifiedTimestamp = 
:lastModifiedTime where w.id = :id"),
 
-        @NamedQuery(name = "DELETE_COORD_JOB", query = "delete from 
CoordinatorJobBean w where w.id = :id"),
+        @NamedQuery(name = "DELETE_COORD_JOB", query = "delete from 
CoordinatorJobBean w where w.id IN (:id)"),
 
         @NamedQuery(name = "GET_COORD_JOBS", query = "select OBJECT(w) from 
CoordinatorJobBean w"),
 
@@ -222,6 +221,7 @@ public class CoordinatorJobBean implements Writable, 
CoordinatorJob, JsonBean {
     private java.sql.Timestamp startTimestamp = null;
 
     @Basic
+    @Index
     @Column(name = "end_time")
     private java.sql.Timestamp endTimestamp = null;
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/WorkflowActionBean.java 
b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
index 00ce99d..d27f59b 100644
--- a/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
+++ b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
@@ -76,9 +76,9 @@ import org.json.simple.JSONObject;
 
     @NamedQuery(name = "UPDATE_ACTION_PENDING_TRANS_ERROR", query = "update 
WorkflowActionBean a set a.pending = :pending, a.pendingAgeTimestamp = 
:pendingAge, a.transition = :transition, a.errorCode = :errorCode, 
a.errorMessage = :errorMessage where a.id = :id"),
 
-    @NamedQuery(name = "DELETE_ACTION", query = "delete from 
WorkflowActionBean a where a.id = :id"),
+    @NamedQuery(name = "DELETE_ACTION", query = "delete from 
WorkflowActionBean a where a.id IN (:id)"),
 
-    @NamedQuery(name = "DELETE_ACTIONS_FOR_WORKFLOW", query = "delete from 
WorkflowActionBean a where a.wfId = :wfId"),
+    @NamedQuery(name = "DELETE_ACTIONS_FOR_WORKFLOW", query = "delete from 
WorkflowActionBean a where a.wfId IN (:wfId)"),
 
     @NamedQuery(name = "GET_ACTIONS", query = "select OBJECT(a) from 
WorkflowActionBean a"),
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/WorkflowJobBean.java 
b/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
index 66a0f61..ef1f452 100644
--- a/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
+++ b/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
@@ -18,41 +18,38 @@
 
 package org.apache.oozie;
 
-import org.apache.oozie.workflow.WorkflowInstance;
-import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
+import org.apache.hadoop.io.Writable;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.client.rest.JsonTags;
 import org.apache.oozie.client.rest.JsonUtils;
-import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.WritableUtils;
-import org.apache.hadoop.io.Writable;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.io.DataOutput;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
+import org.apache.oozie.workflow.WorkflowInstance;
+import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
+import org.apache.openjpa.persistence.jdbc.Index;
+import org.apache.openjpa.persistence.jdbc.Strategy;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
 
-import javax.persistence.Entity;
+import javax.persistence.Basic;
 import javax.persistence.Column;
+import javax.persistence.Entity;
 import javax.persistence.Id;
+import javax.persistence.Lob;
 import javax.persistence.NamedQueries;
 import javax.persistence.NamedQuery;
-import javax.persistence.Basic;
-import javax.persistence.Lob;
 import javax.persistence.Table;
 import javax.persistence.Transient;
-
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.sql.Timestamp;
-
-import org.apache.openjpa.persistence.jdbc.Index;
-import org.apache.openjpa.persistence.jdbc.Strategy;
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
 
 @Entity
 
@@ -74,7 +71,7 @@ import org.json.simple.JSONObject;
 
     @NamedQuery(name = "UPDATE_WORKFLOW_RERUN", query = "update 
WorkflowJobBean w set w.appName = :appName, w.protoActionConf = 
:protoActionConf, w.appPath = :appPath, w.conf = :conf, w.logToken = :logToken, 
w.user = :user, w.group = :group, w.externalId = :externalId, w.endTimestamp = 
:endTime, w.run = :run, w.statusStr = :status, w.wfInstance = :wfInstance, 
w.lastModifiedTimestamp = :lastModTime where w.id = :id"),
 
-    @NamedQuery(name = "DELETE_WORKFLOW", query = "delete from WorkflowJobBean 
w where w.id = :id"),
+    @NamedQuery(name = "DELETE_WORKFLOW", query = "delete from WorkflowJobBean 
w where w.id IN (:id)"),
 
     @NamedQuery(name = "GET_WORKFLOWS", query = "select OBJECT(w) from 
WorkflowJobBean w order by w.startTimestamp desc"),
 
@@ -122,9 +119,9 @@ import org.json.simple.JSONObject;
 
     @NamedQuery(name = "GET_WORKFLOWS_WITH_COORD_PARENT_ID", query = "select 
w.id from WorkflowJobBean w where w.parentId like :parentId"), // when setting 
parentId parameter, make sure to append a '%' (percent symbol) at the end (e.g. 
0000004-130709155224435-oozie-rkan-C%")
 
-    @NamedQuery(name = 
"GET_WORKFLOWS_COUNT_WITH_WORKFLOW_PARENT_ID_NOT_READY_FOR_PURGE", query = 
"select count(w) from WorkflowJobBean w where w.parentId = :parentId and 
(w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' OR w.statusStr = 'SUSPENDED' 
OR w.endTimestamp >= :endTime)"),
+    @NamedQuery(name = "GET_WORKFLOWS_BASIC_INFO_BY_PARENT_ID", query = 
"select w.id, w.statusStr, w.endTimestamp from WorkflowJobBean w where 
w.parentId = :parentId"),
 
-    @NamedQuery(name = 
"GET_WORKFLOWS_COUNT_WITH_COORD_PARENT_ID_NOT_READY_FOR_PURGE", query = "select 
count(w) from WorkflowJobBean w where w.parentId like :parentId and 
(w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' OR w.statusStr = 'SUSPENDED' 
OR w.endTimestamp >= :endTime)"), // when setting parentId parameter, make sure 
to append a '%' (percent symbol) at the end (e.g. 
0000004-130709155224435-oozie-rkan-C%")
+    @NamedQuery(name = "GET_WORKFLOWS_BASIC_INFO_BY_COORD_PARENT_ID", query = 
"select w.id,  w.statusStr, w.endTimestamp from WorkflowJobBean w where 
w.parentId like :parentId"),
 
     @NamedQuery(name = "GET_WORKFLOW_FOR_USER", query = "select w.user from 
WorkflowJobBean w where w.id = :id"),
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java 
b/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
index 53a1e7a..ab06fdf 100644
--- a/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
@@ -18,33 +18,33 @@
 
 package org.apache.oozie.command;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.ErrorCode;
+import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.XException;
 import org.apache.oozie.executor.jpa.BundleJobsDeleteJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleJobsGetForPurgeJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionsDeleteJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.executor.jpa.CoordActionsGetFromCoordJobIdJPAExecutor;
 import 
org.apache.oozie.executor.jpa.CoordJobsCountNotForPurgeFromParentIdJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobsDeleteJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobsGetForPurgeJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobsGetFromParentIdJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
-import 
org.apache.oozie.executor.jpa.WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor;
-import 
org.apache.oozie.executor.jpa.WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import 
org.apache.oozie.executor.jpa.WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor;
+import 
org.apache.oozie.executor.jpa.WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobsDeleteJPAExecutor;
-import 
org.apache.oozie.executor.jpa.WorkflowJobsGetFromWorkflowParentIdJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobsGetForPurgeJPAExecutor;
-import 
org.apache.oozie.executor.jpa.WorkflowJobsGetFromCoordParentIdJPAExecutor;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
+import org.eclipse.jgit.util.StringUtils;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
 /**
  * This class is used to purge workflows, coordinators, and bundles.  It takes 
into account the relationships between workflows and
  * coordinators, and coordinators and bundles.  It also only acts on 'limit' 
number of items at a time to not overtax the DB and in
@@ -65,6 +65,7 @@ public class PurgeXCommand extends XCommand<Void> {
     private int coordDel;
     private int coordActionDel;
     private int bundleDel;
+    private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;
 
     public PurgeXCommand(int wfOlderThan, int coordOlderThan, int 
bundleOlderThan, int limit) {
         this(wfOlderThan, coordOlderThan, bundleOlderThan, limit, false);
@@ -196,9 +197,6 @@ public class PurgeXCommand extends XCommand<Void> {
      */
     private void processWorkflows(List<String> wfs) throws 
JPAExecutorException {
         List<String> wfsToPurge = processWorkflowsHelper(wfs);
-        for (String id: wfsToPurge) {
-            LOG.debug("Purging workflow " + id);
-        }
         purgeWorkflows(wfsToPurge);
     }
 
@@ -217,20 +215,21 @@ public class PurgeXCommand extends XCommand<Void> {
         List<String> subwfs = new ArrayList<String>();
         List<String> wfsToPurge = new ArrayList<String>();
         for (String wfId : wfs) {
-            // We only purge the workflow and its children if they are all 
ready to be purged
-            long numChildrenNotReady = jpaService.execute(
-                    new 
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(wfOlderThan, wfId));
-            if (numChildrenNotReady == 0) {
-                wfsToPurge.add(wfId);
-                // Get all of the direct children for this workflow
-                List<String> children = new ArrayList<String>();
-                int size;
-                do {
-                    size = children.size();
-                    children.addAll(jpaService.execute(
-                            new 
WorkflowJobsGetFromWorkflowParentIdJPAExecutor(wfId, children.size(), limit)));
-                } while (size != children.size());
+            int size;
+            List<WorkflowJobBean> swfBeanList = new 
ArrayList<WorkflowJobBean>();
+            do {
+                size = swfBeanList.size();
+                swfBeanList.addAll(jpaService.execute(
+                        new 
WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor(wfId, swfBeanList.size(), 
limit)));
+            } while (size != swfBeanList.size());
+
+            // Checking if sub workflow is ready to purge
+            List<String> children = fetchTerminatedWorkflow(swfBeanList);
+
+            // if all sub workflow ready to purge add them all and add current 
workflow
+            if(children.size() == swfBeanList.size()) {
                 subwfs.addAll(children);
+                wfsToPurge.add(wfId);
             }
         }
         // Recurse on the children we just found to process their children
@@ -239,6 +238,22 @@ public class PurgeXCommand extends XCommand<Void> {
     }
 
     /**
+     * This method will return all terminate workflow ids from wfBeanlist for 
purge.
+     * @param wfBeanList
+     * @return workflows to purge
+     */
+    private List<String> fetchTerminatedWorkflow(List<WorkflowJobBean> 
wfBeanList) {
+        List<String> children = new ArrayList<String>();
+        long wfOlderThanMS = System.currentTimeMillis() - (wfOlderThan * 
DAY_IN_MS);
+        for (WorkflowJobBean wfjBean : wfBeanList) {
+            if (wfjBean.inTerminalState() && wfjBean.getEndTime().getTime() < 
wfOlderThanMS) {
+                children.add(wfjBean.getId());
+            }
+        }
+        return children;
+    }
+
+    /**
      * Process coordinators to purge them and their children.
      *
      * @param coords List of coordinators to process
@@ -246,27 +261,40 @@ public class PurgeXCommand extends XCommand<Void> {
      */
     private void processCoordinators(List<String> coords) throws 
JPAExecutorException {
         List<String> wfsToPurge = new ArrayList<String>();
+        List<String> actionsToPurge = new ArrayList<String>();
         List<String> coordsToPurge = new ArrayList<String>();
         for (String coordId : coords) {
-            // We only purge the coord and its children if they are all ready 
to be purged
-            long numChildrenNotReady = jpaService.execute(
-                    new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(wfOlderThan, coordId));
-            if (numChildrenNotReady == 0) {
-                coordsToPurge.add(coordId);
+            // Get all of the direct workflowChildren for this coord
+            List<WorkflowJobBean> wfjBeanList = new 
ArrayList<WorkflowJobBean>();
+            int size;
+            do {
+                size = wfjBeanList.size();
+                wfjBeanList.addAll(jpaService.execute(
+                        new 
WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor(coordId, wfjBeanList.size(), 
limit)));
+            } while (size != wfjBeanList.size());
+
+            // Checking if workflow is ready to purge
+            List<String> workflowChildren = 
fetchTerminatedWorkflow(wfjBeanList);
+
+            // if all workflow are ready to purge add them and add the 
coordinator and their actions
+            if(workflowChildren.size() == wfjBeanList.size()) {
                 LOG.debug("Purging coordinator " + coordId);
-                // Get all of the direct children for this coord
-                List<String> children = new ArrayList<String>();
-                int size;
+                wfsToPurge.addAll(workflowChildren);
+                coordsToPurge.add(coordId);
+                // Get all of the direct actionChildren for this coord
+                List<String> actionChildren = new ArrayList<String>();
                 do {
-                    size = children.size();
-                    children.addAll(jpaService.execute(
-                            new 
WorkflowJobsGetFromCoordParentIdJPAExecutor(coordId, children.size(), limit)));
-                } while (size != children.size());
-                wfsToPurge.addAll(children);
+                    size = actionChildren.size();
+                    actionChildren.addAll(jpaService.execute(
+                            new 
CoordActionsGetFromCoordJobIdJPAExecutor(coordId, actionChildren.size(), 
limit)));
+                } while (size != actionChildren.size());
+                actionsToPurge.addAll(actionChildren);
             }
         }
-        // Process the children
+        // Process the children workflow
         processWorkflows(wfsToPurge);
+        // Process the children action
+        purgeCoordActions(actionsToPurge);
         // Now that all children have been purged, we can purge the 
coordinators
         purgeCoordinators(coordsToPurge);
     }
@@ -313,10 +341,13 @@ public class PurgeXCommand extends XCommand<Void> {
      */
     private void purgeWorkflows(List<String> wfs) throws JPAExecutorException {
         wfDel += wfs.size();
+        //To delete sub-workflows before deleting parent workflows
         Collections.reverse(wfs);
         for (int startIndex = 0; startIndex < wfs.size(); ) {
             int endIndex = (startIndex + limit < wfs.size()) ? (startIndex + 
limit) : wfs.size();
-            jpaService.execute(new 
WorkflowJobsDeleteJPAExecutor(wfs.subList(startIndex, endIndex)));
+            List<String> wfsForDelete = wfs.subList(startIndex, endIndex);
+            LOG.debug("Deleting workflows: " + StringUtils.join(wfsForDelete, 
","));
+            jpaService.execute(new 
WorkflowJobsDeleteJPAExecutor(wfsForDelete));
             startIndex = endIndex;
         }
     }
@@ -331,7 +362,9 @@ public class PurgeXCommand extends XCommand<Void> {
         coordActionDel = coordActions.size();
         for (int startIndex = 0; startIndex < coordActions.size(); ) {
             int endIndex = (startIndex + limit < coordActions.size()) ? 
(startIndex + limit) : coordActions.size();
-            jpaService.execute(new 
CoordActionsDeleteJPAExecutor(coordActions.subList(startIndex, endIndex)));
+            List<String> coordActionsForDelete = 
coordActions.subList(startIndex, endIndex);
+            LOG.debug("Deleting coordinator actions: " + 
StringUtils.join(coordActionsForDelete, ","));
+            jpaService.execute(new 
CoordActionsDeleteJPAExecutor(coordActionsForDelete));
             startIndex = endIndex;
         }
     }
@@ -345,7 +378,9 @@ public class PurgeXCommand extends XCommand<Void> {
         coordDel += coords.size();
         for (int startIndex = 0; startIndex < coords.size(); ) {
             int endIndex = (startIndex + limit < coords.size()) ? (startIndex 
+ limit) : coords.size();
-            jpaService.execute(new 
CoordJobsDeleteJPAExecutor(coords.subList(startIndex, endIndex)));
+            List<String> coordsForDelete = coords.subList(startIndex, 
endIndex);
+            LOG.debug("Deleting coordinators: " + 
StringUtils.join(coordsForDelete, ","));
+            jpaService.execute(new 
CoordJobsDeleteJPAExecutor(coordsForDelete));
             startIndex = endIndex;
         }
     }
@@ -360,7 +395,9 @@ public class PurgeXCommand extends XCommand<Void> {
         bundleDel += bundles.size();
         for (int startIndex = 0; startIndex < bundles.size(); ) {
             int endIndex = (startIndex + limit < bundles.size()) ? (startIndex 
+ limit) : bundles.size();
-            jpaService.execute(new 
BundleJobsDeleteJPAExecutor(bundles.subList(startIndex, endIndex)));
+            Collection<String> bundlesForDelete = bundles.subList(startIndex, 
endIndex);
+            LOG.debug("Deleting bundles: " + 
StringUtils.join(bundlesForDelete, ","));
+            jpaService.execute(new 
BundleJobsDeleteJPAExecutor(bundlesForDelete));
             startIndex = endIndex;
         }
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsDeleteJPAExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsDeleteJPAExecutor.java
 
b/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsDeleteJPAExecutor.java
index 9ca066c..9f173e0 100644
--- 
a/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsDeleteJPAExecutor.java
+++ 
b/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsDeleteJPAExecutor.java
@@ -18,13 +18,12 @@
 
 package org.apache.oozie.executor.jpa;
 
-import java.util.Collection;
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.FaultInjection;
-import org.apache.oozie.util.ParamChecker;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import java.util.Collection;
 
 /**
  * Delete Bundle job, its list of actions and return the number of actions 
that were deleted.
@@ -75,18 +74,14 @@ public class BundleJobsDeleteJPAExecutor implements 
JPAExecutor<Integer> {
         try {
             // Only used by test cases to check for rollback of transaction
             
FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
-            if (deleteList != null) {
-                for (String id : deleteList) {
-                    ParamChecker.notNull(id, "Bundle Job Id");
-                    // Delete the coord job
-                    Query q = em.createNamedQuery("DELETE_BUNDLE_JOB");
-                    q.setParameter("id", id);
-                    q.executeUpdate();
-                    // Delete the actions for this coord job
-                    Query g = em.createNamedQuery("DELETE_ACTIONS_FOR_BUNDLE");
-                    g.setParameter("bundleId", id);
-                    actionsDeleted = g.executeUpdate();
-                }
+            if (deleteList != null && !deleteList.isEmpty()) {
+                Query q = em.createNamedQuery("DELETE_BUNDLE_JOB");
+                q.setParameter("id", deleteList);
+                q.executeUpdate();
+                // Delete the actions for this coord job
+                Query g = em.createNamedQuery("DELETE_ACTIONS_FOR_BUNDLE");
+                g.setParameter("bundleId", deleteList);
+                actionsDeleted = g.executeUpdate();
             }
         }
         catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsDeleteJPAExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsDeleteJPAExecutor.java
 
b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsDeleteJPAExecutor.java
index 703ebce..8dd20cc 100644
--- 
a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsDeleteJPAExecutor.java
+++ 
b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsDeleteJPAExecutor.java
@@ -18,13 +18,12 @@
 
 package org.apache.oozie.executor.jpa;
 
-import java.util.Collection;
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.FaultInjection;
-import org.apache.oozie.util.ParamChecker;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import java.util.Collection;
 /**
  * Delete Coord actions of long running coordinators, return the number of 
actions that were deleted.
  */
@@ -73,16 +72,11 @@ public class CoordActionsDeleteJPAExecutor implements 
JPAExecutor<Integer> {
         try {
             // Only used by test cases to check for rollback of transaction
             
FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
-            if (deleteList != null) {
-                for (String id : deleteList) {
-                    ParamChecker.notNull(id, "Coordinator Action Id");
-
-                    // Delete coordAction
-                    Query g = 
em.createNamedQuery("DELETE_ACTIONS_FOR_LONG_RUNNING_COORDINATOR");
-                    g.setParameter("actionId", id);
-                    g.executeUpdate();
-                    actionsDeleted++;
-                }
+            if (deleteList != null && !deleteList.isEmpty()) {
+                // Delete coordActions
+                Query g = 
em.createNamedQuery("DELETE_ACTIONS_FOR_LONG_RUNNING_COORDINATOR");
+                g.setParameter("actionId", deleteList);
+                actionsDeleted = g.executeUpdate();
             }
         }
         catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetFromCoordJobIdJPAExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetFromCoordJobIdJPAExecutor.java
 
b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetFromCoordJobIdJPAExecutor.java
new file mode 100644
index 0000000..fa9a6fe
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetFromCoordJobIdJPAExecutor.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.executor.jpa;
+
+import org.apache.oozie.ErrorCode;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import java.util.List;
+
+public class CoordActionsGetFromCoordJobIdJPAExecutor implements 
JPAExecutor<List<String>> {
+
+    private String coordId;
+    private int limit;
+    private int offset;
+
+    public CoordActionsGetFromCoordJobIdJPAExecutor(String coordId, int 
offset, int limit) {
+        this.coordId = coordId;
+        this.offset = offset;
+        this.limit = limit;
+    }
+
+    @Override
+    public String getName() {
+        return "CoordActionsGetFromCoordJobIdJPAExecutor";
+    }
+
+    @Override
+    public List<String> execute(EntityManager em) throws JPAExecutorException {
+        List<String> actions = null;
+        try {
+            Query jobQ = 
em.createNamedQuery("GET_COORD_ACTIONS_FOR_COORDINATOR");
+            jobQ.setParameter("jobId", coordId);
+            jobQ.setMaxResults(limit);
+            jobQ.setFirstResult(offset);
+            actions = jobQ.getResultList();
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+        }
+        return actions;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsDeleteJPAExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsDeleteJPAExecutor.java
 
b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsDeleteJPAExecutor.java
index 2836a44..cbebef3 100644
--- 
a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsDeleteJPAExecutor.java
+++ 
b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsDeleteJPAExecutor.java
@@ -18,13 +18,12 @@
 
 package org.apache.oozie.executor.jpa;
 
-import java.util.Collection;
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.FaultInjection;
-import org.apache.oozie.util.ParamChecker;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import java.util.Collection;
 
 /**
  * Delete Coord job, its list of actions and return the number of actions that 
were deleted.
@@ -75,18 +74,11 @@ public class CoordJobsDeleteJPAExecutor implements 
JPAExecutor<Integer> {
         try {
             // Only used by test cases to check for rollback of transaction
             
FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
-            if (deleteList != null) {
-                for (String id : deleteList) {
-                    ParamChecker.notNull(id, "Coordinator Job Id");
-                    // Delete the coord job
-                    Query q = em.createNamedQuery("DELETE_COORD_JOB");
-                    q.setParameter("id", id);
-                    q.executeUpdate();
-                    // Delete the actions for this coord job
-                    Query g = 
em.createNamedQuery("DELETE_ACTIONS_FOR_COORDINATOR");
-                    g.setParameter("jobId", id);
-                    actionsDeleted = g.executeUpdate();
-                }
+            if (deleteList != null && !deleteList.isEmpty()) {
+                // Delete the coord job list
+                Query q = em.createNamedQuery("DELETE_COORD_JOB");
+                q.setParameter("id", deleteList);
+                actionsDeleted = q.executeUpdate();
             }
         }
         catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor.java
 
b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor.java
new file mode 100644
index 0000000..2e1ab5f
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.executor.jpa;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.util.DateUtils;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+public class WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor  implements 
JPAExecutor<List<WorkflowJobBean>> {
+    private String parentId;
+    private int limit;
+    private int offset;
+
+    public WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor(String parentId, 
int limit) {
+        this(parentId, 0, limit);
+    }
+
+    public WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor(String parentId, 
int offset, int limit) {
+        this.parentId = parentId;
+        this.offset = offset;
+        this.limit = limit;
+    }
+
+    @Override
+    public String getName() {
+        return "WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor";
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public List<WorkflowJobBean> execute(EntityManager em) throws 
JPAExecutorException {
+        try {
+            Query jobQ = 
em.createNamedQuery("GET_WORKFLOWS_BASIC_INFO_BY_COORD_PARENT_ID");
+            jobQ.setParameter("parentId", parentId + "%");  // The '%' is the 
wildcard
+            jobQ.setMaxResults(limit);
+            jobQ.setFirstResult(offset);
+            return getBeanFromArray(jobQ.getResultList());
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+        }
+    }
+
+    private List<WorkflowJobBean> getBeanFromArray(List resultList) {
+        List<WorkflowJobBean> wfActionBeanList = new 
ArrayList<WorkflowJobBean>();
+        for (Object element : resultList) {
+            WorkflowJobBean wfBean = new WorkflowJobBean();
+            Object[] arr = (Object[])element;
+            if(arr[0] != null) {
+                wfBean.setId((String) arr[0]);
+            }
+            if(arr[1] != null) {
+                wfBean.setStatus(WorkflowJob.Status.valueOf((String) arr[1]));
+            }
+            if(arr[2] != null) {
+                wfBean.setEndTime(DateUtils.toDate((Timestamp) arr[2]));
+            }
+            wfActionBeanList.add(wfBean);
+        }
+        return wfActionBeanList;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor.java
 
b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor.java
new file mode 100644
index 0000000..8205334
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.executor.jpa;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.util.DateUtils;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+public class WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor implements 
JPAExecutor<List<WorkflowJobBean>> {
+
+    private String parentId;
+    private int limit;
+    private int offset;
+
+    public WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor(String 
parentId, int limit) {
+        this(parentId, 0, limit);
+    }
+
+    public WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor(String 
parentId, int offset, int limit) {
+        this.parentId = parentId;
+        this.offset = offset;
+        this.limit = limit;
+    }
+
+    @Override
+    public String getName() {
+        return "WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor";
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public List<WorkflowJobBean> execute(EntityManager em) throws 
JPAExecutorException {
+        try {
+            Query jobQ = 
em.createNamedQuery("GET_WORKFLOWS_BASIC_INFO_BY_PARENT_ID");
+            jobQ.setParameter("parentId", parentId);
+            jobQ.setMaxResults(limit);
+            jobQ.setFirstResult(offset);
+            return getBeanFromArray(jobQ.getResultList());
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+        }
+    }
+
+    private List<WorkflowJobBean> getBeanFromArray(List resultList) {
+        List<WorkflowJobBean> wfActionBeanList = new 
ArrayList<WorkflowJobBean>();
+        for (Object element : resultList) {
+            WorkflowJobBean wfBean = new WorkflowJobBean();
+            Object[] arr = (Object[])element;
+            if(arr[0] != null) {
+                wfBean.setId((String) arr[0]);
+            }
+            if(arr[1] != null) {
+                wfBean.setStatus(WorkflowJob.Status.valueOf((String) arr[1]));
+            }
+            if(arr[2] != null) {
+                wfBean.setEndTime(DateUtils.toDate((Timestamp) arr[2]));
+            }
+            wfActionBeanList.add(wfBean);
+        }
+
+        return wfActionBeanList;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java
 
b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java
deleted file mode 100644
index 4eca836..0000000
--- 
a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import java.sql.Timestamp;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.ErrorCode;
-
-/**
- * Count the number of Workflow children of a parent Coordinator that are not 
ready to be purged
- */
-public class WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor 
implements JPAExecutor<Long> {
-
-    private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;
-    private long olderThanDays;
-    private String parentId;
-
-    public WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(long 
olderThanDays, String parentId) {
-        this.olderThanDays = olderThanDays;
-        this.parentId = parentId;
-    }
-
-    @Override
-    public String getName() {
-        return "WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor";
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public Long execute(EntityManager em) throws JPAExecutorException {
-        Long count = 0L;
-        try {
-            Timestamp maxEndTime = new Timestamp(System.currentTimeMillis() - 
(olderThanDays * DAY_IN_MS));
-            Query jobQ = 
em.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_COORD_PARENT_ID_NOT_READY_FOR_PURGE");
-            jobQ.setParameter("parentId", parentId + "%");  // The '%' is the 
wildcard
-            jobQ.setParameter("endTime", maxEndTime);
-            count = (Long) jobQ.getSingleResult();
-        }
-        catch (Exception e) {
-            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
-        }
-        return count;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
 
b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
deleted file mode 100644
index 42356d2..0000000
--- 
a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import java.sql.Timestamp;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.ErrorCode;
-
-/**
- * Count the number of Workflow children of a parent Workflow that are not 
ready to be purged
- */
-public class WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor 
implements JPAExecutor<Long> {
-
-    private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;
-    private long olderThanDays;
-    private String parentId;
-
-    public WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(long 
olderThanDays, String parentId) {
-        this.olderThanDays = olderThanDays;
-        this.parentId = parentId;
-    }
-
-    @Override
-    public String getName() {
-        return "WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor";
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public Long execute(EntityManager em) throws JPAExecutorException {
-        Long count = 0L;
-        try {
-            Timestamp maxEndTime = new Timestamp(System.currentTimeMillis() - 
(olderThanDays * DAY_IN_MS));
-            Query jobQ = 
em.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_WORKFLOW_PARENT_ID_NOT_READY_FOR_PURGE");
-            jobQ.setParameter("parentId", parentId);
-            jobQ.setParameter("endTime", maxEndTime);
-            count = (Long) jobQ.getSingleResult();
-        }
-        catch (Exception e) {
-            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
-        }
-        return count;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsDeleteJPAExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsDeleteJPAExecutor.java
 
b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsDeleteJPAExecutor.java
index f18e073..8674aa2 100644
--- 
a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsDeleteJPAExecutor.java
+++ 
b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsDeleteJPAExecutor.java
@@ -18,13 +18,12 @@
 
 package org.apache.oozie.executor.jpa;
 
-import java.util.Collection;
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.FaultInjection;
-import org.apache.oozie.util.ParamChecker;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import java.util.Collection;
 
 /**
  * Delete WF job, its list of actions and return the number of actions that 
were deleted.
@@ -75,18 +74,15 @@ public class WorkflowJobsDeleteJPAExecutor implements 
JPAExecutor<Integer> {
         try {
             // Only used by test cases to check for rollback of transaction
             
FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
-            if (deleteList != null) {
-                for (String id : deleteList) {
-                    ParamChecker.notNull(id, "Workflow Job Id");
-                    // Delete the WF job
-                    Query q = em.createNamedQuery("DELETE_WORKFLOW");
-                    q.setParameter("id", id);
-                    q.executeUpdate();
-                    // Delete the actions for this WF job
-                    Query g = 
em.createNamedQuery("DELETE_ACTIONS_FOR_WORKFLOW");
-                    g.setParameter("wfId", id);
-                    actionsDeleted = g.executeUpdate();
-                }
+            if (deleteList != null && !deleteList.isEmpty()) {
+                // Delete the WF job
+                Query q = em.createNamedQuery("DELETE_WORKFLOW");
+                q.setParameter("id", deleteList);
+                q.executeUpdate();
+                // Delete the actions for this WF job
+                Query g = em.createNamedQuery("DELETE_ACTIONS_FOR_WORKFLOW");
+                g.setParameter("wfId", deleteList);
+                actionsDeleted = g.executeUpdate();
             }
         }
         catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsDeleteJPAExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsDeleteJPAExecutor.java
 
b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsDeleteJPAExecutor.java
index 6e07d03..bff5836 100644
--- 
a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsDeleteJPAExecutor.java
+++ 
b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsDeleteJPAExecutor.java
@@ -18,11 +18,6 @@
 
 package org.apache.oozie.executor.jpa;
 
-import java.util.ArrayList;
-import java.util.List;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.fail;
-
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
@@ -34,6 +29,9 @@ import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.test.XDataTestCase;
 
+import java.util.ArrayList;
+import java.util.List;
+
 public class TestCoordJobsDeleteJPAExecutor extends XDataTestCase {
     Services services;
     private String[] excludedServices = { 
"org.apache.oozie.service.StatusTransitService",
@@ -76,11 +74,20 @@ public class TestCoordJobsDeleteJPAExecutor extends 
XDataTestCase {
         JPAService jpaService = Services.get().get(JPAService.class);
         assertNotNull(jpaService);
 
-        List<String> deleteList = new ArrayList<String>();
-        deleteList.add(jobA.getId());
-        deleteList.add(jobB.getId());
-        deleteList.add(jobC.getId());
-        jpaService.execute(new CoordJobsDeleteJPAExecutor(deleteList));
+        List<String> deleteCoordlist = new ArrayList<String>();
+        deleteCoordlist.add(jobA.getId());
+        deleteCoordlist.add(jobB.getId());
+        deleteCoordlist.add(jobC.getId());
+        jpaService.execute(new CoordJobsDeleteJPAExecutor(deleteCoordlist));
+
+        List<String> deleteActionList = new ArrayList<String>();
+        deleteActionList.add(actionA1.getId());
+        deleteActionList.add(actionB1.getId());
+        deleteActionList.add(actionC1.getId());
+        deleteActionList.add(actionA2.getId());
+        deleteActionList.add(actionB2.getId());
+        deleteActionList.add(actionC2.getId());
+        jpaService.execute(new 
CoordActionsDeleteJPAExecutor(deleteActionList));
 
         try {
             jpaService.execute(new CoordJobGetJPAExecutor(jobA.getId()));
@@ -182,13 +189,22 @@ public class TestCoordJobsDeleteJPAExecutor extends 
XDataTestCase {
             setSystemProperty(FaultInjection.FAULT_INJECTION, "true");
             
setSystemProperty(SkipCommitFaultInjection.ACTION_FAILOVER_FAULT_INJECTION, 
"true");
 
-            List<String> deleteList = new ArrayList<String>();
-            deleteList.add(jobA.getId());
-            deleteList.add(jobB.getId());
-            deleteList.add(jobC.getId());
+            List<String> deleteCoordlist = new ArrayList<String>();
+            deleteCoordlist.add(jobA.getId());
+            deleteCoordlist.add(jobB.getId());
+            deleteCoordlist.add(jobC.getId());
+
+            List<String> deleteActionList = new ArrayList<String>();
+            deleteActionList.add(actionA1.getId());
+            deleteActionList.add(actionB1.getId());
+            deleteActionList.add(actionC1.getId());
+            deleteActionList.add(actionA2.getId());
+            deleteActionList.add(actionB2.getId());
+            deleteActionList.add(actionC2.getId());
 
             try {
-                jpaService.execute(new CoordJobsDeleteJPAExecutor(deleteList));
+                jpaService.execute(new 
CoordJobsDeleteJPAExecutor(deleteCoordlist));
+                jpaService.execute(new 
CoordActionsDeleteJPAExecutor(deleteActionList));
                 fail("Should have skipped commit for failover testing");
             }
             catch (RuntimeException re) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsBasicInfoFromCoordParentIdJPAExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsBasicInfoFromCoordParentIdJPAExecutor.java
 
b/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsBasicInfoFromCoordParentIdJPAExecutor.java
new file mode 100644
index 0000000..e2f8b9a
--- /dev/null
+++ 
b/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsBasicInfoFromCoordParentIdJPAExecutor.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.executor.jpa;
+
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+
+public class TestWorkflowJobsBasicInfoFromCoordParentIdJPAExecutor extends 
XDataTestCase {
+    Services services;
+    private String[] excludedServices = { 
"org.apache.oozie.service.StatusTransitService",
+            "org.apache.oozie.service.PauseTransitService", 
"org.apache.oozie.service.PurgeService",
+            "org.apache.oozie.service.CoordMaterializeTriggerService", 
"org.apache.oozie.service.RecoveryService" };
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        setClassesToBeExcluded(services.getConf(), excludedServices);
+        services.init();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    public void testGetCoordinatorParent() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        HashMap<String,WorkflowJobBean> wflist = new HashMap<String, 
WorkflowJobBean>();
+        CoordinatorJobBean coordJobA = 
addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        CoordinatorJobBean coordJobB = 
addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        WorkflowJobBean wfJobA1 = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED);
+        WorkflowJobBean wfJobA2 = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED);
+        WorkflowJobBean wfJobB = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED);
+
+        List<WorkflowJobBean> children = new ArrayList<WorkflowJobBean>();
+        children.addAll(jpaService.execute(new 
WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor(coordJobA.getId(), 10)));
+        wflist.put(wfJobA1.getId(), wfJobA1);
+        wflist.put(wfJobA2.getId(), wfJobA2);
+        checkChildren(children, wflist);
+
+        children = new ArrayList<WorkflowJobBean>();
+        children.addAll(jpaService.execute(new 
WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor(coordJobB.getId(), 10)));
+        wflist.clear();
+        wflist.put(wfJobB.getId(), wfJobB);
+        checkChildren(children, wflist);
+    }
+
+    public void testGetWorkflowParentTooMany() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        HashMap<String,WorkflowJobBean> wflist = new HashMap<String, 
WorkflowJobBean>();
+        CoordinatorJobBean coordJob = 
addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        WorkflowJobBean wfJob1 = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED,
+                coordJob.getId());
+        WorkflowJobBean wfJob2 = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED,
+                coordJob.getId());
+        WorkflowJobBean wfJob3 = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED,
+                coordJob.getId());
+        WorkflowJobBean wfJob4 = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED,
+                coordJob.getId());
+        WorkflowJobBean wfJob5 = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED,
+                coordJob.getId());
+
+        List<WorkflowJobBean> children = new ArrayList<WorkflowJobBean>();
+        // Get the first 3
+        children.addAll(jpaService.execute(new 
WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor(coordJob.getId(), 3)));
+        assertEquals(3, children.size());
+        // Get the next 3 (though there's only 2 more)
+        children.addAll(jpaService.execute(new 
WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor(coordJob.getId(), 3, 3)));
+        assertEquals(5, children.size());
+        wflist.put(wfJob1.getId(), wfJob1);
+        wflist.put(wfJob2.getId(), wfJob2);
+        wflist.put(wfJob3.getId(), wfJob3);
+        wflist.put(wfJob4.getId(), wfJob4);
+        wflist.put(wfJob5.getId(), wfJob5);
+        checkChildren(children, wflist);
+    }
+
+    private void checkChildren(List<WorkflowJobBean> children, 
HashMap<String,WorkflowJobBean> wfJobBaselist) {
+        assertEquals(wfJobBaselist.size(), children.size());
+        for (int i = 0; i < children.size(); i++) {
+            WorkflowJobBean wfJobBase = 
wfJobBaselist.get(children.get(i).getId());
+            assertNotNull(wfJobBase);
+            assertEquals(wfJobBase.getStatus(),children.get(i).getStatus());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor.java
 
b/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor.java
new file mode 100644
index 0000000..e275520
--- /dev/null
+++ 
b/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.executor.jpa;
+
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+public class TestWorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor extends 
XDataTestCase {
+    Services services;
+    private String[] excludedServices = { 
"org.apache.oozie.service.StatusTransitService",
+            "org.apache.oozie.service.PauseTransitService", 
"org.apache.oozie.service.PurgeService",
+            "org.apache.oozie.service.CoordMaterializeTriggerService", 
"org.apache.oozie.service.RecoveryService" };
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        setClassesToBeExcluded(services.getConf(), excludedServices);
+        services.init();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    public void testGetWorkflowParent() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        HashMap<String,WorkflowJobBean> wflist = new HashMap<String, 
WorkflowJobBean>();
+
+        WorkflowJobBean wfJobA = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED);
+        WorkflowJobBean wfJobB = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED);
+        WorkflowJobBean subwfJobA1 = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED,
+                wfJobA.getId());
+        WorkflowJobBean subwfJobA2 = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED,
+                wfJobA.getId());
+        WorkflowJobBean subwfJobB = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED,
+                wfJobB.getId());
+
+        List<WorkflowJobBean> children = new ArrayList<WorkflowJobBean>();
+        children.addAll(jpaService.execute(new 
WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor(wfJobA.getId(), 10)));
+        wflist.put(subwfJobA1.getId(), subwfJobA1);
+        wflist.put(subwfJobA2.getId(),subwfJobA2);
+        checkChildren(children, wflist);
+
+        children = new ArrayList<WorkflowJobBean>();
+        children.addAll(jpaService.execute(new 
WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor(wfJobB.getId(), 10)));
+        wflist.clear();
+        wflist.put(subwfJobB.getId(),subwfJobB);
+        checkChildren(children, wflist);
+    }
+
+    public void testGetCoordinatorParentTooMany() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        HashMap<String,WorkflowJobBean> wflist = new HashMap<String, 
WorkflowJobBean>();
+        WorkflowJobBean wfJob = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED);
+        WorkflowJobBean subwfJob1 = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED,
+                wfJob.getId());
+        WorkflowJobBean subwfJob2 = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED,
+                wfJob.getId());
+        WorkflowJobBean subwfJob3 = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED,
+                wfJob.getId());
+        WorkflowJobBean subwfJob4 = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED,
+                wfJob.getId());
+        WorkflowJobBean subwfJob5 = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED,
+                wfJob.getId());
+
+        List<WorkflowJobBean> children = new ArrayList<WorkflowJobBean>();
+        // Get the first 3
+        children.addAll(jpaService.execute(new 
WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor(wfJob.getId(), 3)));
+        assertEquals(3, children.size());
+        // Get the next 3 (though there's only 2 more)
+        children.addAll(jpaService.execute(new 
WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor(wfJob.getId(), 3, 3)));
+        assertEquals(5, children.size());
+        wflist.put(subwfJob1.getId(), subwfJob1);
+        wflist.put(subwfJob2.getId(), subwfJob2);
+        wflist.put(subwfJob3.getId(), subwfJob3);
+        wflist.put(subwfJob4.getId(), subwfJob4);
+        wflist.put(subwfJob5.getId(), subwfJob5);
+
+        checkChildren(children, wflist);
+    }
+
+    private void checkChildren(List<WorkflowJobBean> children, 
HashMap<String,WorkflowJobBean> wfJobBaselist) {
+        assertEquals(wfJobBaselist.size(), children.size());
+        for (int i = 0; i < children.size(); i++) {
+            WorkflowJobBean wfJobBase = 
wfJobBaselist.get(children.get(i).getId());
+            assertNotNull(wfJobBase);
+            assertEquals(wfJobBase.getStatus(),children.get(i).getStatus());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java
 
b/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java
deleted file mode 100644
index 949c2ae..0000000
--- 
a/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.CoordinatorJobBean;
-import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.command.TestPurgeXCommand;
-import org.apache.oozie.service.JPAService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.test.XDataTestCase;
-import org.apache.oozie.workflow.WorkflowInstance;
-
-public class TestWorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor 
extends XDataTestCase {
-    Services services;
-    private String[] excludedServices = { 
"org.apache.oozie.service.StatusTransitService",
-            "org.apache.oozie.service.PauseTransitService", 
"org.apache.oozie.service.PurgeService",
-            "org.apache.oozie.service.CoordMaterializeTriggerService", 
"org.apache.oozie.service.RecoveryService" };
-
-    @Override
-    protected void setUp() throws Exception {
-        super.setUp();
-        services = new Services();
-        setClassesToBeExcluded(services.getConf(), excludedServices);
-        services.init();
-    }
-
-    @Override
-    protected void tearDown() throws Exception {
-        services.destroy();
-        super.tearDown();
-    }
-
-    public void testCount() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-
-        CoordinatorJobBean coordJob = 
addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
-        String coordJobId = coordJob.getId();
-        int days = 1;
-        assertEquals(0, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
-
-        WorkflowJobBean wfJob1 = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED, coordJobId);
-        wfJob1 = TestPurgeXCommand.setEndTime(wfJob1, "2009-12-01T01:00Z");
-        CoordinatorActionBean coordAction1 = 
addRecordToCoordActionTable(coordJobId, 1, CoordinatorAction.Status.SUCCEEDED,
-                "coord-action-get.xml", wfJob1.getId(), wfJob1.getStatusStr(), 
0);
-        days = 1;
-        assertEquals(0, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
-        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob1.getEndTime());
-        assertEquals(1, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
-
-        WorkflowJobBean wfJob2 = 
addRecordToWfJobTable(WorkflowJob.Status.FAILED, 
WorkflowInstance.Status.FAILED, coordJobId);
-        wfJob2 = TestPurgeXCommand.setEndTime(wfJob2, "2009-11-01T01:00Z");
-        CoordinatorActionBean coordAction2 = 
addRecordToCoordActionTable(coordJobId, 2, CoordinatorAction.Status.FAILED,
-                "coord-action-get.xml", wfJob2.getId(), wfJob2.getStatusStr(), 
0);
-        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob1.getEndTime());
-        assertEquals(1, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
-        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob2.getEndTime());
-        assertEquals(2, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
-
-        WorkflowJobBean wfJob3 = 
addRecordToWfJobTable(WorkflowJob.Status.KILLED, 
WorkflowInstance.Status.KILLED, coordJobId);
-        wfJob3 = TestPurgeXCommand.setEndTime(wfJob3, "2009-10-01T01:00Z");
-        CoordinatorActionBean coordAction3 = 
addRecordToCoordActionTable(coordJobId, 3, CoordinatorAction.Status.KILLED,
-                "coord-action-get.xml", wfJob3.getId(), wfJob3.getStatusStr(), 
0);
-        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob2.getEndTime());
-        assertEquals(2, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
-        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob3.getEndTime());
-        assertEquals(3, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
-
-        WorkflowJobBean wfJob4 = 
addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP, 
coordJobId);
-        wfJob4 = TestPurgeXCommand.setEndTime(wfJob4, "2009-09-01T01:00Z");
-        CoordinatorActionBean coordAction4 = 
addRecordToCoordActionTable(coordJobId, 4, CoordinatorAction.Status.RUNNING,
-                "coord-action-get.xml", wfJob4.getId(), wfJob4.getStatusStr(), 
0);
-        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob3.getEndTime());
-        assertEquals(4, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
-        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob4.getEndTime());
-        assertEquals(4, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
-
-        WorkflowJobBean wfJob5 = 
addRecordToWfJobTable(WorkflowJob.Status.RUNNING, 
WorkflowInstance.Status.RUNNING, coordJobId);
-        wfJob5 = TestPurgeXCommand.setEndTime(wfJob5, "2009-08-01T01:00Z");
-        CoordinatorActionBean coordAction5 = 
addRecordToCoordActionTable(coordJobId, 5, CoordinatorAction.Status.RUNNING,
-                "coord-action-get.xml", wfJob5.getId(), wfJob5.getStatusStr(), 
0);
-        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob4.getEndTime());
-        assertEquals(5, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
-        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob5.getEndTime());
-        assertEquals(5, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
-
-        WorkflowJobBean wfJob6 = 
addRecordToWfJobTable(WorkflowJob.Status.SUSPENDED, 
WorkflowInstance.Status.SUSPENDED, coordJobId);
-        wfJob6 = TestPurgeXCommand.setEndTime(wfJob6, "2009-07-01T01:00Z");
-        CoordinatorActionBean coordAction6 = 
addRecordToCoordActionTable(coordJobId, 6, CoordinatorAction.Status.SUSPENDED,
-                "coord-action-get.xml", wfJob6.getId(), wfJob6.getStatusStr(), 
0);
-        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob5.getEndTime());
-        assertEquals(6, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
-        days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob6.getEndTime());
-        assertEquals(6, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId)));
-    }
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
 
b/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
deleted file mode 100644
index 2004503..0000000
--- 
a/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.executor.jpa;
-
-import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.command.TestPurgeXCommand;
-import org.apache.oozie.service.JPAService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.test.XDataTestCase;
-import org.apache.oozie.workflow.WorkflowInstance;
-
-public class TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor 
extends XDataTestCase {
-    Services services;
-    private String[] excludedServices = { 
"org.apache.oozie.service.StatusTransitService",
-            "org.apache.oozie.service.PauseTransitService", 
"org.apache.oozie.service.PurgeService",
-            "org.apache.oozie.service.CoordMaterializeTriggerService", 
"org.apache.oozie.service.RecoveryService" };
-
-    @Override
-    protected void setUp() throws Exception {
-        super.setUp();
-        services = new Services();
-        setClassesToBeExcluded(services.getConf(), excludedServices);
-        services.init();
-    }
-
-    @Override
-    protected void tearDown() throws Exception {
-        services.destroy();
-        super.tearDown();
-    }
-
-    public void testCount() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-
-        WorkflowJobBean wfJob = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED);
-        String wfJobId = wfJob.getId();
-        int days = 1;
-        assertEquals(0, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
-
-        WorkflowJobBean subwfJob1 = 
addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, 
WorkflowInstance.Status.SUCCEEDED, wfJobId);
-        subwfJob1 = TestPurgeXCommand.setEndTime(subwfJob1, 
"2009-12-01T01:00Z");
-        days = 1;
-        assertEquals(0, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
-        days = 
TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob1.getEndTime());
-        assertEquals(1, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
-
-        WorkflowJobBean subwfJob2 = 
addRecordToWfJobTable(WorkflowJob.Status.FAILED, 
WorkflowInstance.Status.FAILED, wfJobId);
-        subwfJob2 = TestPurgeXCommand.setEndTime(subwfJob2, 
"2009-11-01T01:00Z");
-        days = 
TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob1.getEndTime());
-        assertEquals(1, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
-        days = 
TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob2.getEndTime());
-        assertEquals(2, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
-
-        WorkflowJobBean subwfJob3 = 
addRecordToWfJobTable(WorkflowJob.Status.KILLED, 
WorkflowInstance.Status.KILLED, wfJobId);
-        subwfJob3 = TestPurgeXCommand.setEndTime(subwfJob3, 
"2009-10-01T01:00Z");
-        days = 
TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob2.getEndTime());
-        assertEquals(2, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
-        days = 
TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob3.getEndTime());
-        assertEquals(3, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
-
-        WorkflowJobBean subwfJob4 = 
addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP, 
wfJobId);
-        subwfJob4 = TestPurgeXCommand.setEndTime(subwfJob4, 
"2009-09-01T01:00Z");
-        days = 
TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob3.getEndTime());
-        assertEquals(4, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
-        days = 
TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob4.getEndTime());
-        assertEquals(4, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
-
-        WorkflowJobBean subwfJob5 = 
addRecordToWfJobTable(WorkflowJob.Status.RUNNING, 
WorkflowInstance.Status.RUNNING, wfJobId);
-        subwfJob5 = TestPurgeXCommand.setEndTime(subwfJob5, 
"2009-08-01T01:00Z");
-        days = 
TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob4.getEndTime());
-        assertEquals(5, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
-        days = 
TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob5.getEndTime());
-        assertEquals(5, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
-
-        WorkflowJobBean subwfJob6 = 
addRecordToWfJobTable(WorkflowJob.Status.SUSPENDED, 
WorkflowInstance.Status.SUSPENDED, wfJobId);
-        subwfJob6 = TestPurgeXCommand.setEndTime(subwfJob6, 
"2009-07-01T01:00Z");
-        days = 
TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob5.getEndTime());
-        assertEquals(6, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
-        days = 
TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob6.getEndTime());
-        assertEquals(6, (long) jpaService.execute(new 
WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId)));
-    }
-}

Reply via email to