OOZIE-1803 Improvement in Purge service (jaydeepvishwakarma via shwethags)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/0d7b94af Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/0d7b94af Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/0d7b94af Branch: refs/heads/master Commit: 0d7b94af4999577f83a192c6b044f600141f9f6a Parents: aa7bba5 Author: Shwetha GS <[email protected]> Authored: Tue Nov 11 11:44:51 2014 +0530 Committer: Shwetha GS <[email protected]> Committed: Tue Nov 11 11:44:51 2014 +0530 ---------------------------------------------------------------------- .../java/org/apache/oozie/BundleActionBean.java | 2 +- .../java/org/apache/oozie/BundleJobBean.java | 2 +- .../org/apache/oozie/CoordinatorActionBean.java | 43 +++---- .../org/apache/oozie/CoordinatorJobBean.java | 44 +++---- .../org/apache/oozie/WorkflowActionBean.java | 4 +- .../java/org/apache/oozie/WorkflowJobBean.java | 47 ++++--- .../org/apache/oozie/command/PurgeXCommand.java | 129 ++++++++++++------- .../jpa/BundleJobsDeleteJPAExecutor.java | 29 ++--- .../jpa/CoordActionsDeleteJPAExecutor.java | 24 ++-- ...oordActionsGetFromCoordJobIdJPAExecutor.java | 59 +++++++++ .../jpa/CoordJobsDeleteJPAExecutor.java | 26 ++-- ...bsBasicInfoFromCoordParentIdJPAExecutor.java | 85 ++++++++++++ ...asicInfoFromWorkflowParentIdJPAExecutor.java | 87 +++++++++++++ ...NotForPurgeFromCoordParentIdJPAExecutor.java | 64 --------- ...ForPurgeFromWorkflowParentIdJPAExecutor.java | 64 --------- .../jpa/WorkflowJobsDeleteJPAExecutor.java | 30 ++--- .../jpa/TestCoordJobsDeleteJPAExecutor.java | 46 ++++--- ...bsBasicInfoFromCoordParentIdJPAExecutor.java | 117 +++++++++++++++++ ...asicInfoFromWorkflowParentIdJPAExecutor.java | 120 +++++++++++++++++ ...NotForPurgeFromCoordParentIdJPAExecutor.java | 116 ----------------- ...ForPurgeFromWorkflowParentIdJPAExecutor.java | 101 --------------- release-log.txt | 1 + 22 files changed, 695 insertions(+), 545 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/BundleActionBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/BundleActionBean.java b/core/src/main/java/org/apache/oozie/BundleActionBean.java index 65bfe8c..56e3672 100644 --- a/core/src/main/java/org/apache/oozie/BundleActionBean.java +++ b/core/src/main/java/org/apache/oozie/BundleActionBean.java @@ -77,7 +77,7 @@ import org.json.simple.JSONObject; @NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_BUNDLE", query = "delete from BundleActionBean a where a.bundleId = :bundleId and (a.statusStr = 'SUCCEEDED' OR a.statusStr = 'FAILED' OR a.statusStr= 'KILLED' OR a.statusStr = 'DONEWITHERROR')"), - @NamedQuery(name = "DELETE_ACTIONS_FOR_BUNDLE", query = "delete from BundleActionBean a where a.bundleId = :bundleId")}) + @NamedQuery(name = "DELETE_ACTIONS_FOR_BUNDLE", query = "delete from BundleActionBean a where a.bundleId IN (:bundleId)")}) public class BundleActionBean implements Writable, JsonBean { @Id http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/BundleJobBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/BundleJobBean.java b/core/src/main/java/org/apache/oozie/BundleJobBean.java index 4dbffc3..e45791e 100644 --- a/core/src/main/java/org/apache/oozie/BundleJobBean.java +++ b/core/src/main/java/org/apache/oozie/BundleJobBean.java @@ -67,7 +67,7 @@ import org.json.simple.JSONObject; @NamedQuery(name = "UPDATE_BUNDLE_JOB_PAUSE_KICKOFF", query = "update BundleJobBean w set w.kickoffTimestamp = :kickoffTime, w.pauseTimestamp = :pauseTime where w.id = :id"), - @NamedQuery(name = "DELETE_BUNDLE_JOB", query = "delete from BundleJobBean w where w.id = :id"), + @NamedQuery(name = "DELETE_BUNDLE_JOB", query = "delete from BundleJobBean w where w.id IN (:id)"), @NamedQuery(name = "GET_BUNDLE_JOBS", query = "select OBJECT(w) from BundleJobBean w"), http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java index 759e643..188b70e 100644 --- a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java +++ b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java @@ -18,13 +18,17 @@ package org.apache.oozie; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.sql.Timestamp; -import java.text.MessageFormat; -import java.util.Date; -import java.util.List; +import org.apache.hadoop.io.Writable; +import org.apache.oozie.client.CoordinatorAction; +import org.apache.oozie.client.rest.JsonBean; +import org.apache.oozie.client.rest.JsonTags; +import org.apache.oozie.client.rest.JsonUtils; +import org.apache.oozie.util.DateUtils; +import org.apache.oozie.util.WritableUtils; +import org.apache.openjpa.persistence.jdbc.Index; +import org.apache.openjpa.persistence.jdbc.Strategy; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; import javax.persistence.Basic; import javax.persistence.Column; @@ -38,18 +42,13 @@ import javax.persistence.NamedQueries; import javax.persistence.NamedQuery; import javax.persistence.SqlResultSetMapping; import javax.persistence.Table; - -import org.apache.hadoop.io.Writable; -import org.apache.oozie.client.CoordinatorAction; -import org.apache.oozie.client.rest.JsonBean; -import org.apache.oozie.client.rest.JsonTags; -import org.apache.oozie.client.rest.JsonUtils; -import org.apache.oozie.util.DateUtils; -import org.apache.oozie.util.WritableUtils; -import org.apache.openjpa.persistence.jdbc.Index; -import org.apache.openjpa.persistence.jdbc.Strategy; -import org.json.simple.JSONArray; -import org.json.simple.JSONObject; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.sql.Timestamp; +import java.text.MessageFormat; +import java.util.Date; +import java.util.List; @SqlResultSetMapping( name = "CoordActionJobIdLmt", @@ -79,12 +78,12 @@ import org.json.simple.JSONObject; @NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR", query = "delete from CoordinatorActionBean a where a.jobId = :jobId and (a.statusStr = 'SUCCEEDED' OR a.statusStr = 'FAILED' OR a.statusStr= 'KILLED')"), - @NamedQuery(name = "DELETE_ACTIONS_FOR_COORDINATOR", query = "delete from CoordinatorActionBean a where a.jobId = :jobId"), - - @NamedQuery(name = "DELETE_ACTIONS_FOR_LONG_RUNNING_COORDINATOR", query = "delete from CoordinatorActionBean a where a.id = :actionId"), + @NamedQuery(name = "DELETE_ACTIONS_FOR_LONG_RUNNING_COORDINATOR", query = "delete from CoordinatorActionBean a where a.id IN (:actionId)"), @NamedQuery(name = "DELETE_UNSCHEDULED_ACTION", query = "delete from CoordinatorActionBean a where a.id = :id and (a.statusStr = 'WAITING' OR a.statusStr = 'READY')"), + @NamedQuery(name = "GET_COORD_ACTIONS_FOR_COORDINATOR", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId"), + // Query used by XTestcase to setup tables @NamedQuery(name = "GET_COORD_ACTIONS", query = "select OBJECT(w) from CoordinatorActionBean w"), // Select query used only by test cases http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java index 2362084..4d6b970 100644 --- a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java +++ b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java @@ -18,27 +18,6 @@ package org.apache.oozie; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.sql.Timestamp; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; - -import javax.persistence.Basic; -import javax.persistence.Column; -import javax.persistence.Entity; -import javax.persistence.Id; -import javax.persistence.Lob; -import javax.persistence.NamedNativeQueries; -import javax.persistence.NamedNativeQuery; -import javax.persistence.NamedQueries; -import javax.persistence.NamedQuery; -import javax.persistence.Table; -import javax.persistence.Transient; - import org.apache.hadoop.io.Writable; import org.apache.oozie.client.CoordinatorAction; import org.apache.oozie.client.CoordinatorJob; @@ -52,6 +31,26 @@ import org.apache.openjpa.persistence.jdbc.Strategy; import org.json.simple.JSONArray; import org.json.simple.JSONObject; +import javax.persistence.Basic; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.Lob; +import javax.persistence.NamedNativeQueries; +import javax.persistence.NamedNativeQuery; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; +import javax.persistence.Table; +import javax.persistence.Transient; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.sql.Timestamp; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + @Entity @NamedQueries( { @NamedQuery(name = "UPDATE_COORD_JOB", query = "update CoordinatorJobBean w set w.appName = :appName, w.appPath = :appPath,w.concurrency = :concurrency, w.conf = :conf, w.externalId = :externalId, w.frequency = :frequency, w.lastActionNumber = :lastActionNumber, w.timeOut = :timeOut, w.timeZone = :timeZone, w.createdTimestamp = :createdTime, w.endTimestamp = :endTime, w.execution = :execution, w.jobXml = :jobXml, w.lastActionTimestamp = :lastAction, w.lastModifiedTimestamp = :lastModifiedTime, w.nextMaterializedTimestamp = :nextMaterializedTime, w.origJobXml = :origJobXml, w.slaXml=:slaXml, w.startTimestamp = :startTime, w.statusStr = :status, w.timeUnitStr = :timeUnit, w.appNamespace = :appNamespace, w.bundleId = :bundleId, w.matThrottling = :matThrottling where w.id = :id"), @@ -80,7 +79,7 @@ import org.json.simple.JSONObject; @NamedQuery(name = "UPDATE_COORD_JOB_CHANGE", query = "update CoordinatorJobBean w set w.endTimestamp = :endTime, w.statusStr = :status, w.pending = :pending, w.doneMaterialization = :doneMaterialization, w.concurrency = :concurrency, w.pauseTimestamp = :pauseTime, w.lastActionNumber = :lastActionNumber, w.lastActionTimestamp = :lastActionTime, w.nextMaterializedTimestamp = :nextMatdTime, w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"), - @NamedQuery(name = "DELETE_COORD_JOB", query = "delete from CoordinatorJobBean w where w.id = :id"), + @NamedQuery(name = "DELETE_COORD_JOB", query = "delete from CoordinatorJobBean w where w.id IN (:id)"), @NamedQuery(name = "GET_COORD_JOBS", query = "select OBJECT(w) from CoordinatorJobBean w"), @@ -222,6 +221,7 @@ public class CoordinatorJobBean implements Writable, CoordinatorJob, JsonBean { private java.sql.Timestamp startTimestamp = null; @Basic + @Index @Column(name = "end_time") private java.sql.Timestamp endTimestamp = null; http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/WorkflowActionBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/WorkflowActionBean.java b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java index 00ce99d..d27f59b 100644 --- a/core/src/main/java/org/apache/oozie/WorkflowActionBean.java +++ b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java @@ -76,9 +76,9 @@ import org.json.simple.JSONObject; @NamedQuery(name = "UPDATE_ACTION_PENDING_TRANS_ERROR", query = "update WorkflowActionBean a set a.pending = :pending, a.pendingAgeTimestamp = :pendingAge, a.transition = :transition, a.errorCode = :errorCode, a.errorMessage = :errorMessage where a.id = :id"), - @NamedQuery(name = "DELETE_ACTION", query = "delete from WorkflowActionBean a where a.id = :id"), + @NamedQuery(name = "DELETE_ACTION", query = "delete from WorkflowActionBean a where a.id IN (:id)"), - @NamedQuery(name = "DELETE_ACTIONS_FOR_WORKFLOW", query = "delete from WorkflowActionBean a where a.wfId = :wfId"), + @NamedQuery(name = "DELETE_ACTIONS_FOR_WORKFLOW", query = "delete from WorkflowActionBean a where a.wfId IN (:wfId)"), @NamedQuery(name = "GET_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a"), http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/WorkflowJobBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/WorkflowJobBean.java b/core/src/main/java/org/apache/oozie/WorkflowJobBean.java index 66a0f61..ef1f452 100644 --- a/core/src/main/java/org/apache/oozie/WorkflowJobBean.java +++ b/core/src/main/java/org/apache/oozie/WorkflowJobBean.java @@ -18,41 +18,38 @@ package org.apache.oozie; -import org.apache.oozie.workflow.WorkflowInstance; -import org.apache.oozie.workflow.lite.LiteWorkflowInstance; +import org.apache.hadoop.io.Writable; +import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.client.WorkflowJob; import org.apache.oozie.client.rest.JsonBean; import org.apache.oozie.client.rest.JsonTags; import org.apache.oozie.client.rest.JsonUtils; -import org.apache.oozie.client.WorkflowAction; -import org.apache.oozie.client.WorkflowJob; import org.apache.oozie.util.DateUtils; import org.apache.oozie.util.WritableUtils; -import org.apache.hadoop.io.Writable; - -import java.io.DataInput; -import java.io.IOException; -import java.io.DataOutput; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; +import org.apache.oozie.workflow.WorkflowInstance; +import org.apache.oozie.workflow.lite.LiteWorkflowInstance; +import org.apache.openjpa.persistence.jdbc.Index; +import org.apache.openjpa.persistence.jdbc.Strategy; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; -import javax.persistence.Entity; +import javax.persistence.Basic; import javax.persistence.Column; +import javax.persistence.Entity; import javax.persistence.Id; +import javax.persistence.Lob; import javax.persistence.NamedQueries; import javax.persistence.NamedQuery; -import javax.persistence.Basic; -import javax.persistence.Lob; import javax.persistence.Table; import javax.persistence.Transient; - +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.sql.Timestamp; - -import org.apache.openjpa.persistence.jdbc.Index; -import org.apache.openjpa.persistence.jdbc.Strategy; -import org.json.simple.JSONArray; -import org.json.simple.JSONObject; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; @Entity @@ -74,7 +71,7 @@ import org.json.simple.JSONObject; @NamedQuery(name = "UPDATE_WORKFLOW_RERUN", query = "update WorkflowJobBean w set w.appName = :appName, w.protoActionConf = :protoActionConf, w.appPath = :appPath, w.conf = :conf, w.logToken = :logToken, w.user = :user, w.group = :group, w.externalId = :externalId, w.endTimestamp = :endTime, w.run = :run, w.statusStr = :status, w.wfInstance = :wfInstance, w.lastModifiedTimestamp = :lastModTime where w.id = :id"), - @NamedQuery(name = "DELETE_WORKFLOW", query = "delete from WorkflowJobBean w where w.id = :id"), + @NamedQuery(name = "DELETE_WORKFLOW", query = "delete from WorkflowJobBean w where w.id IN (:id)"), @NamedQuery(name = "GET_WORKFLOWS", query = "select OBJECT(w) from WorkflowJobBean w order by w.startTimestamp desc"), @@ -122,9 +119,9 @@ import org.json.simple.JSONObject; @NamedQuery(name = "GET_WORKFLOWS_WITH_COORD_PARENT_ID", query = "select w.id from WorkflowJobBean w where w.parentId like :parentId"), // when setting parentId parameter, make sure to append a '%' (percent symbol) at the end (e.g. 0000004-130709155224435-oozie-rkan-C%") - @NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_WORKFLOW_PARENT_ID_NOT_READY_FOR_PURGE", query = "select count(w) from WorkflowJobBean w where w.parentId = :parentId and (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' OR w.statusStr = 'SUSPENDED' OR w.endTimestamp >= :endTime)"), + @NamedQuery(name = "GET_WORKFLOWS_BASIC_INFO_BY_PARENT_ID", query = "select w.id, w.statusStr, w.endTimestamp from WorkflowJobBean w where w.parentId = :parentId"), - @NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_COORD_PARENT_ID_NOT_READY_FOR_PURGE", query = "select count(w) from WorkflowJobBean w where w.parentId like :parentId and (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' OR w.statusStr = 'SUSPENDED' OR w.endTimestamp >= :endTime)"), // when setting parentId parameter, make sure to append a '%' (percent symbol) at the end (e.g. 0000004-130709155224435-oozie-rkan-C%") + @NamedQuery(name = "GET_WORKFLOWS_BASIC_INFO_BY_COORD_PARENT_ID", query = "select w.id, w.statusStr, w.endTimestamp from WorkflowJobBean w where w.parentId like :parentId"), @NamedQuery(name = "GET_WORKFLOW_FOR_USER", query = "select w.user from WorkflowJobBean w where w.id = :id"), http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java b/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java index 53a1e7a..ab06fdf 100644 --- a/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java @@ -18,33 +18,33 @@ package org.apache.oozie.command; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; - -import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.ErrorCode; +import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.XException; import org.apache.oozie.executor.jpa.BundleJobsDeleteJPAExecutor; import org.apache.oozie.executor.jpa.BundleJobsGetForPurgeJPAExecutor; import org.apache.oozie.executor.jpa.CoordActionsDeleteJPAExecutor; -import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; -import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; +import org.apache.oozie.executor.jpa.CoordActionsGetFromCoordJobIdJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobsCountNotForPurgeFromParentIdJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobsDeleteJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobsGetForPurgeJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobsGetFromParentIdJPAExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; -import org.apache.oozie.executor.jpa.WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor; -import org.apache.oozie.executor.jpa.WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; +import org.apache.oozie.executor.jpa.WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor; +import org.apache.oozie.executor.jpa.WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor; import org.apache.oozie.executor.jpa.WorkflowJobsDeleteJPAExecutor; -import org.apache.oozie.executor.jpa.WorkflowJobsGetFromWorkflowParentIdJPAExecutor; import org.apache.oozie.executor.jpa.WorkflowJobsGetForPurgeJPAExecutor; -import org.apache.oozie.executor.jpa.WorkflowJobsGetFromCoordParentIdJPAExecutor; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.Services; +import org.eclipse.jgit.util.StringUtils; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; /** * This class is used to purge workflows, coordinators, and bundles. It takes into account the relationships between workflows and * coordinators, and coordinators and bundles. It also only acts on 'limit' number of items at a time to not overtax the DB and in @@ -65,6 +65,7 @@ public class PurgeXCommand extends XCommand<Void> { private int coordDel; private int coordActionDel; private int bundleDel; + private static final long DAY_IN_MS = 24 * 60 * 60 * 1000; public PurgeXCommand(int wfOlderThan, int coordOlderThan, int bundleOlderThan, int limit) { this(wfOlderThan, coordOlderThan, bundleOlderThan, limit, false); @@ -196,9 +197,6 @@ public class PurgeXCommand extends XCommand<Void> { */ private void processWorkflows(List<String> wfs) throws JPAExecutorException { List<String> wfsToPurge = processWorkflowsHelper(wfs); - for (String id: wfsToPurge) { - LOG.debug("Purging workflow " + id); - } purgeWorkflows(wfsToPurge); } @@ -217,20 +215,21 @@ public class PurgeXCommand extends XCommand<Void> { List<String> subwfs = new ArrayList<String>(); List<String> wfsToPurge = new ArrayList<String>(); for (String wfId : wfs) { - // We only purge the workflow and its children if they are all ready to be purged - long numChildrenNotReady = jpaService.execute( - new WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(wfOlderThan, wfId)); - if (numChildrenNotReady == 0) { - wfsToPurge.add(wfId); - // Get all of the direct children for this workflow - List<String> children = new ArrayList<String>(); - int size; - do { - size = children.size(); - children.addAll(jpaService.execute( - new WorkflowJobsGetFromWorkflowParentIdJPAExecutor(wfId, children.size(), limit))); - } while (size != children.size()); + int size; + List<WorkflowJobBean> swfBeanList = new ArrayList<WorkflowJobBean>(); + do { + size = swfBeanList.size(); + swfBeanList.addAll(jpaService.execute( + new WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor(wfId, swfBeanList.size(), limit))); + } while (size != swfBeanList.size()); + + // Checking if sub workflow is ready to purge + List<String> children = fetchTerminatedWorkflow(swfBeanList); + + // if all sub workflow ready to purge add them all and add current workflow + if(children.size() == swfBeanList.size()) { subwfs.addAll(children); + wfsToPurge.add(wfId); } } // Recurse on the children we just found to process their children @@ -239,6 +238,22 @@ public class PurgeXCommand extends XCommand<Void> { } /** + * This method will return all terminate workflow ids from wfBeanlist for purge. + * @param wfBeanList + * @return workflows to purge + */ + private List<String> fetchTerminatedWorkflow(List<WorkflowJobBean> wfBeanList) { + List<String> children = new ArrayList<String>(); + long wfOlderThanMS = System.currentTimeMillis() - (wfOlderThan * DAY_IN_MS); + for (WorkflowJobBean wfjBean : wfBeanList) { + if (wfjBean.inTerminalState() && wfjBean.getEndTime().getTime() < wfOlderThanMS) { + children.add(wfjBean.getId()); + } + } + return children; + } + + /** * Process coordinators to purge them and their children. * * @param coords List of coordinators to process @@ -246,27 +261,40 @@ public class PurgeXCommand extends XCommand<Void> { */ private void processCoordinators(List<String> coords) throws JPAExecutorException { List<String> wfsToPurge = new ArrayList<String>(); + List<String> actionsToPurge = new ArrayList<String>(); List<String> coordsToPurge = new ArrayList<String>(); for (String coordId : coords) { - // We only purge the coord and its children if they are all ready to be purged - long numChildrenNotReady = jpaService.execute( - new WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(wfOlderThan, coordId)); - if (numChildrenNotReady == 0) { - coordsToPurge.add(coordId); + // Get all of the direct workflowChildren for this coord + List<WorkflowJobBean> wfjBeanList = new ArrayList<WorkflowJobBean>(); + int size; + do { + size = wfjBeanList.size(); + wfjBeanList.addAll(jpaService.execute( + new WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor(coordId, wfjBeanList.size(), limit))); + } while (size != wfjBeanList.size()); + + // Checking if workflow is ready to purge + List<String> workflowChildren = fetchTerminatedWorkflow(wfjBeanList); + + // if all workflow are ready to purge add them and add the coordinator and their actions + if(workflowChildren.size() == wfjBeanList.size()) { LOG.debug("Purging coordinator " + coordId); - // Get all of the direct children for this coord - List<String> children = new ArrayList<String>(); - int size; + wfsToPurge.addAll(workflowChildren); + coordsToPurge.add(coordId); + // Get all of the direct actionChildren for this coord + List<String> actionChildren = new ArrayList<String>(); do { - size = children.size(); - children.addAll(jpaService.execute( - new WorkflowJobsGetFromCoordParentIdJPAExecutor(coordId, children.size(), limit))); - } while (size != children.size()); - wfsToPurge.addAll(children); + size = actionChildren.size(); + actionChildren.addAll(jpaService.execute( + new CoordActionsGetFromCoordJobIdJPAExecutor(coordId, actionChildren.size(), limit))); + } while (size != actionChildren.size()); + actionsToPurge.addAll(actionChildren); } } - // Process the children + // Process the children workflow processWorkflows(wfsToPurge); + // Process the children action + purgeCoordActions(actionsToPurge); // Now that all children have been purged, we can purge the coordinators purgeCoordinators(coordsToPurge); } @@ -313,10 +341,13 @@ public class PurgeXCommand extends XCommand<Void> { */ private void purgeWorkflows(List<String> wfs) throws JPAExecutorException { wfDel += wfs.size(); + //To delete sub-workflows before deleting parent workflows Collections.reverse(wfs); for (int startIndex = 0; startIndex < wfs.size(); ) { int endIndex = (startIndex + limit < wfs.size()) ? (startIndex + limit) : wfs.size(); - jpaService.execute(new WorkflowJobsDeleteJPAExecutor(wfs.subList(startIndex, endIndex))); + List<String> wfsForDelete = wfs.subList(startIndex, endIndex); + LOG.debug("Deleting workflows: " + StringUtils.join(wfsForDelete, ",")); + jpaService.execute(new WorkflowJobsDeleteJPAExecutor(wfsForDelete)); startIndex = endIndex; } } @@ -331,7 +362,9 @@ public class PurgeXCommand extends XCommand<Void> { coordActionDel = coordActions.size(); for (int startIndex = 0; startIndex < coordActions.size(); ) { int endIndex = (startIndex + limit < coordActions.size()) ? (startIndex + limit) : coordActions.size(); - jpaService.execute(new CoordActionsDeleteJPAExecutor(coordActions.subList(startIndex, endIndex))); + List<String> coordActionsForDelete = coordActions.subList(startIndex, endIndex); + LOG.debug("Deleting coordinator actions: " + StringUtils.join(coordActionsForDelete, ",")); + jpaService.execute(new CoordActionsDeleteJPAExecutor(coordActionsForDelete)); startIndex = endIndex; } } @@ -345,7 +378,9 @@ public class PurgeXCommand extends XCommand<Void> { coordDel += coords.size(); for (int startIndex = 0; startIndex < coords.size(); ) { int endIndex = (startIndex + limit < coords.size()) ? (startIndex + limit) : coords.size(); - jpaService.execute(new CoordJobsDeleteJPAExecutor(coords.subList(startIndex, endIndex))); + List<String> coordsForDelete = coords.subList(startIndex, endIndex); + LOG.debug("Deleting coordinators: " + StringUtils.join(coordsForDelete, ",")); + jpaService.execute(new CoordJobsDeleteJPAExecutor(coordsForDelete)); startIndex = endIndex; } } @@ -360,7 +395,9 @@ public class PurgeXCommand extends XCommand<Void> { bundleDel += bundles.size(); for (int startIndex = 0; startIndex < bundles.size(); ) { int endIndex = (startIndex + limit < bundles.size()) ? (startIndex + limit) : bundles.size(); - jpaService.execute(new BundleJobsDeleteJPAExecutor(bundles.subList(startIndex, endIndex))); + Collection<String> bundlesForDelete = bundles.subList(startIndex, endIndex); + LOG.debug("Deleting bundles: " + StringUtils.join(bundlesForDelete, ",")); + jpaService.execute(new BundleJobsDeleteJPAExecutor(bundlesForDelete)); startIndex = endIndex; } } http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsDeleteJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsDeleteJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsDeleteJPAExecutor.java index 9ca066c..9f173e0 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsDeleteJPAExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobsDeleteJPAExecutor.java @@ -18,13 +18,12 @@ package org.apache.oozie.executor.jpa; -import java.util.Collection; -import javax.persistence.EntityManager; -import javax.persistence.Query; - import org.apache.oozie.ErrorCode; import org.apache.oozie.FaultInjection; -import org.apache.oozie.util.ParamChecker; + +import javax.persistence.EntityManager; +import javax.persistence.Query; +import java.util.Collection; /** * Delete Bundle job, its list of actions and return the number of actions that were deleted. @@ -75,18 +74,14 @@ public class BundleJobsDeleteJPAExecutor implements JPAExecutor<Integer> { try { // Only used by test cases to check for rollback of transaction FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection"); - if (deleteList != null) { - for (String id : deleteList) { - ParamChecker.notNull(id, "Bundle Job Id"); - // Delete the coord job - Query q = em.createNamedQuery("DELETE_BUNDLE_JOB"); - q.setParameter("id", id); - q.executeUpdate(); - // Delete the actions for this coord job - Query g = em.createNamedQuery("DELETE_ACTIONS_FOR_BUNDLE"); - g.setParameter("bundleId", id); - actionsDeleted = g.executeUpdate(); - } + if (deleteList != null && !deleteList.isEmpty()) { + Query q = em.createNamedQuery("DELETE_BUNDLE_JOB"); + q.setParameter("id", deleteList); + q.executeUpdate(); + // Delete the actions for this coord job + Query g = em.createNamedQuery("DELETE_ACTIONS_FOR_BUNDLE"); + g.setParameter("bundleId", deleteList); + actionsDeleted = g.executeUpdate(); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsDeleteJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsDeleteJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsDeleteJPAExecutor.java index 703ebce..8dd20cc 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsDeleteJPAExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsDeleteJPAExecutor.java @@ -18,13 +18,12 @@ package org.apache.oozie.executor.jpa; -import java.util.Collection; -import javax.persistence.EntityManager; -import javax.persistence.Query; - import org.apache.oozie.ErrorCode; import org.apache.oozie.FaultInjection; -import org.apache.oozie.util.ParamChecker; + +import javax.persistence.EntityManager; +import javax.persistence.Query; +import java.util.Collection; /** * Delete Coord actions of long running coordinators, return the number of actions that were deleted. */ @@ -73,16 +72,11 @@ public class CoordActionsDeleteJPAExecutor implements JPAExecutor<Integer> { try { // Only used by test cases to check for rollback of transaction FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection"); - if (deleteList != null) { - for (String id : deleteList) { - ParamChecker.notNull(id, "Coordinator Action Id"); - - // Delete coordAction - Query g = em.createNamedQuery("DELETE_ACTIONS_FOR_LONG_RUNNING_COORDINATOR"); - g.setParameter("actionId", id); - g.executeUpdate(); - actionsDeleted++; - } + if (deleteList != null && !deleteList.isEmpty()) { + // Delete coordActions + Query g = em.createNamedQuery("DELETE_ACTIONS_FOR_LONG_RUNNING_COORDINATOR"); + g.setParameter("actionId", deleteList); + actionsDeleted = g.executeUpdate(); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetFromCoordJobIdJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetFromCoordJobIdJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetFromCoordJobIdJPAExecutor.java new file mode 100644 index 0000000..fa9a6fe --- /dev/null +++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetFromCoordJobIdJPAExecutor.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.executor.jpa; + +import org.apache.oozie.ErrorCode; + +import javax.persistence.EntityManager; +import javax.persistence.Query; +import java.util.List; + +public class CoordActionsGetFromCoordJobIdJPAExecutor implements JPAExecutor<List<String>> { + + private String coordId; + private int limit; + private int offset; + + public CoordActionsGetFromCoordJobIdJPAExecutor(String coordId, int offset, int limit) { + this.coordId = coordId; + this.offset = offset; + this.limit = limit; + } + + @Override + public String getName() { + return "CoordActionsGetFromCoordJobIdJPAExecutor"; + } + + @Override + public List<String> execute(EntityManager em) throws JPAExecutorException { + List<String> actions = null; + try { + Query jobQ = em.createNamedQuery("GET_COORD_ACTIONS_FOR_COORDINATOR"); + jobQ.setParameter("jobId", coordId); + jobQ.setMaxResults(limit); + jobQ.setFirstResult(offset); + actions = jobQ.getResultList(); + } + catch (Exception e) { + throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e); + } + return actions; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsDeleteJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsDeleteJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsDeleteJPAExecutor.java index 2836a44..cbebef3 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsDeleteJPAExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsDeleteJPAExecutor.java @@ -18,13 +18,12 @@ package org.apache.oozie.executor.jpa; -import java.util.Collection; -import javax.persistence.EntityManager; -import javax.persistence.Query; - import org.apache.oozie.ErrorCode; import org.apache.oozie.FaultInjection; -import org.apache.oozie.util.ParamChecker; + +import javax.persistence.EntityManager; +import javax.persistence.Query; +import java.util.Collection; /** * Delete Coord job, its list of actions and return the number of actions that were deleted. @@ -75,18 +74,11 @@ public class CoordJobsDeleteJPAExecutor implements JPAExecutor<Integer> { try { // Only used by test cases to check for rollback of transaction FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection"); - if (deleteList != null) { - for (String id : deleteList) { - ParamChecker.notNull(id, "Coordinator Job Id"); - // Delete the coord job - Query q = em.createNamedQuery("DELETE_COORD_JOB"); - q.setParameter("id", id); - q.executeUpdate(); - // Delete the actions for this coord job - Query g = em.createNamedQuery("DELETE_ACTIONS_FOR_COORDINATOR"); - g.setParameter("jobId", id); - actionsDeleted = g.executeUpdate(); - } + if (deleteList != null && !deleteList.isEmpty()) { + // Delete the coord job list + Query q = em.createNamedQuery("DELETE_COORD_JOB"); + q.setParameter("id", deleteList); + actionsDeleted = q.executeUpdate(); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor.java new file mode 100644 index 0000000..2e1ab5f --- /dev/null +++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.executor.jpa; + +import org.apache.oozie.ErrorCode; +import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.client.WorkflowJob; +import org.apache.oozie.util.DateUtils; + +import javax.persistence.EntityManager; +import javax.persistence.Query; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; + +public class WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor implements JPAExecutor<List<WorkflowJobBean>> { + private String parentId; + private int limit; + private int offset; + + public WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor(String parentId, int limit) { + this(parentId, 0, limit); + } + + public WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor(String parentId, int offset, int limit) { + this.parentId = parentId; + this.offset = offset; + this.limit = limit; + } + + @Override + public String getName() { + return "WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor"; + } + + @Override + @SuppressWarnings("unchecked") + public List<WorkflowJobBean> execute(EntityManager em) throws JPAExecutorException { + try { + Query jobQ = em.createNamedQuery("GET_WORKFLOWS_BASIC_INFO_BY_COORD_PARENT_ID"); + jobQ.setParameter("parentId", parentId + "%"); // The '%' is the wildcard + jobQ.setMaxResults(limit); + jobQ.setFirstResult(offset); + return getBeanFromArray(jobQ.getResultList()); + } + catch (Exception e) { + throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e); + } + } + + private List<WorkflowJobBean> getBeanFromArray(List resultList) { + List<WorkflowJobBean> wfActionBeanList = new ArrayList<WorkflowJobBean>(); + for (Object element : resultList) { + WorkflowJobBean wfBean = new WorkflowJobBean(); + Object[] arr = (Object[])element; + if(arr[0] != null) { + wfBean.setId((String) arr[0]); + } + if(arr[1] != null) { + wfBean.setStatus(WorkflowJob.Status.valueOf((String) arr[1])); + } + if(arr[2] != null) { + wfBean.setEndTime(DateUtils.toDate((Timestamp) arr[2])); + } + wfActionBeanList.add(wfBean); + } + return wfActionBeanList; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor.java new file mode 100644 index 0000000..8205334 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.executor.jpa; + +import org.apache.oozie.ErrorCode; +import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.client.WorkflowJob; +import org.apache.oozie.util.DateUtils; + +import javax.persistence.EntityManager; +import javax.persistence.Query; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; + +public class WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor implements JPAExecutor<List<WorkflowJobBean>> { + + private String parentId; + private int limit; + private int offset; + + public WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor(String parentId, int limit) { + this(parentId, 0, limit); + } + + public WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor(String parentId, int offset, int limit) { + this.parentId = parentId; + this.offset = offset; + this.limit = limit; + } + + @Override + public String getName() { + return "WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor"; + } + + @Override + @SuppressWarnings("unchecked") + public List<WorkflowJobBean> execute(EntityManager em) throws JPAExecutorException { + try { + Query jobQ = em.createNamedQuery("GET_WORKFLOWS_BASIC_INFO_BY_PARENT_ID"); + jobQ.setParameter("parentId", parentId); + jobQ.setMaxResults(limit); + jobQ.setFirstResult(offset); + return getBeanFromArray(jobQ.getResultList()); + } + catch (Exception e) { + throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e); + } + } + + private List<WorkflowJobBean> getBeanFromArray(List resultList) { + List<WorkflowJobBean> wfActionBeanList = new ArrayList<WorkflowJobBean>(); + for (Object element : resultList) { + WorkflowJobBean wfBean = new WorkflowJobBean(); + Object[] arr = (Object[])element; + if(arr[0] != null) { + wfBean.setId((String) arr[0]); + } + if(arr[1] != null) { + wfBean.setStatus(WorkflowJob.Status.valueOf((String) arr[1])); + } + if(arr[2] != null) { + wfBean.setEndTime(DateUtils.toDate((Timestamp) arr[2])); + } + wfActionBeanList.add(wfBean); + } + + return wfActionBeanList; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java deleted file mode 100644 index 4eca836..0000000 --- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.oozie.executor.jpa; - -import java.sql.Timestamp; - -import javax.persistence.EntityManager; -import javax.persistence.Query; - -import org.apache.oozie.ErrorCode; - -/** - * Count the number of Workflow children of a parent Coordinator that are not ready to be purged - */ -public class WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor implements JPAExecutor<Long> { - - private static final long DAY_IN_MS = 24 * 60 * 60 * 1000; - private long olderThanDays; - private String parentId; - - public WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(long olderThanDays, String parentId) { - this.olderThanDays = olderThanDays; - this.parentId = parentId; - } - - @Override - public String getName() { - return "WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor"; - } - - @Override - @SuppressWarnings("unchecked") - public Long execute(EntityManager em) throws JPAExecutorException { - Long count = 0L; - try { - Timestamp maxEndTime = new Timestamp(System.currentTimeMillis() - (olderThanDays * DAY_IN_MS)); - Query jobQ = em.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_COORD_PARENT_ID_NOT_READY_FOR_PURGE"); - jobQ.setParameter("parentId", parentId + "%"); // The '%' is the wildcard - jobQ.setParameter("endTime", maxEndTime); - count = (Long) jobQ.getSingleResult(); - } - catch (Exception e) { - throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e); - } - return count; - } - -} http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java deleted file mode 100644 index 42356d2..0000000 --- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.oozie.executor.jpa; - -import java.sql.Timestamp; - -import javax.persistence.EntityManager; -import javax.persistence.Query; - -import org.apache.oozie.ErrorCode; - -/** - * Count the number of Workflow children of a parent Workflow that are not ready to be purged - */ -public class WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor implements JPAExecutor<Long> { - - private static final long DAY_IN_MS = 24 * 60 * 60 * 1000; - private long olderThanDays; - private String parentId; - - public WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(long olderThanDays, String parentId) { - this.olderThanDays = olderThanDays; - this.parentId = parentId; - } - - @Override - public String getName() { - return "WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor"; - } - - @Override - @SuppressWarnings("unchecked") - public Long execute(EntityManager em) throws JPAExecutorException { - Long count = 0L; - try { - Timestamp maxEndTime = new Timestamp(System.currentTimeMillis() - (olderThanDays * DAY_IN_MS)); - Query jobQ = em.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_WORKFLOW_PARENT_ID_NOT_READY_FOR_PURGE"); - jobQ.setParameter("parentId", parentId); - jobQ.setParameter("endTime", maxEndTime); - count = (Long) jobQ.getSingleResult(); - } - catch (Exception e) { - throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e); - } - return count; - } - -} http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsDeleteJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsDeleteJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsDeleteJPAExecutor.java index f18e073..8674aa2 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsDeleteJPAExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobsDeleteJPAExecutor.java @@ -18,13 +18,12 @@ package org.apache.oozie.executor.jpa; -import java.util.Collection; -import javax.persistence.EntityManager; -import javax.persistence.Query; - import org.apache.oozie.ErrorCode; import org.apache.oozie.FaultInjection; -import org.apache.oozie.util.ParamChecker; + +import javax.persistence.EntityManager; +import javax.persistence.Query; +import java.util.Collection; /** * Delete WF job, its list of actions and return the number of actions that were deleted. @@ -75,18 +74,15 @@ public class WorkflowJobsDeleteJPAExecutor implements JPAExecutor<Integer> { try { // Only used by test cases to check for rollback of transaction FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection"); - if (deleteList != null) { - for (String id : deleteList) { - ParamChecker.notNull(id, "Workflow Job Id"); - // Delete the WF job - Query q = em.createNamedQuery("DELETE_WORKFLOW"); - q.setParameter("id", id); - q.executeUpdate(); - // Delete the actions for this WF job - Query g = em.createNamedQuery("DELETE_ACTIONS_FOR_WORKFLOW"); - g.setParameter("wfId", id); - actionsDeleted = g.executeUpdate(); - } + if (deleteList != null && !deleteList.isEmpty()) { + // Delete the WF job + Query q = em.createNamedQuery("DELETE_WORKFLOW"); + q.setParameter("id", deleteList); + q.executeUpdate(); + // Delete the actions for this WF job + Query g = em.createNamedQuery("DELETE_ACTIONS_FOR_WORKFLOW"); + g.setParameter("wfId", deleteList); + actionsDeleted = g.executeUpdate(); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsDeleteJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsDeleteJPAExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsDeleteJPAExecutor.java index 6e07d03..bff5836 100644 --- a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsDeleteJPAExecutor.java +++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsDeleteJPAExecutor.java @@ -18,11 +18,6 @@ package org.apache.oozie.executor.jpa; -import java.util.ArrayList; -import java.util.List; -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.fail; - import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.CoordinatorJobBean; import org.apache.oozie.ErrorCode; @@ -34,6 +29,9 @@ import org.apache.oozie.service.JPAService; import org.apache.oozie.service.Services; import org.apache.oozie.test.XDataTestCase; +import java.util.ArrayList; +import java.util.List; + public class TestCoordJobsDeleteJPAExecutor extends XDataTestCase { Services services; private String[] excludedServices = { "org.apache.oozie.service.StatusTransitService", @@ -76,11 +74,20 @@ public class TestCoordJobsDeleteJPAExecutor extends XDataTestCase { JPAService jpaService = Services.get().get(JPAService.class); assertNotNull(jpaService); - List<String> deleteList = new ArrayList<String>(); - deleteList.add(jobA.getId()); - deleteList.add(jobB.getId()); - deleteList.add(jobC.getId()); - jpaService.execute(new CoordJobsDeleteJPAExecutor(deleteList)); + List<String> deleteCoordlist = new ArrayList<String>(); + deleteCoordlist.add(jobA.getId()); + deleteCoordlist.add(jobB.getId()); + deleteCoordlist.add(jobC.getId()); + jpaService.execute(new CoordJobsDeleteJPAExecutor(deleteCoordlist)); + + List<String> deleteActionList = new ArrayList<String>(); + deleteActionList.add(actionA1.getId()); + deleteActionList.add(actionB1.getId()); + deleteActionList.add(actionC1.getId()); + deleteActionList.add(actionA2.getId()); + deleteActionList.add(actionB2.getId()); + deleteActionList.add(actionC2.getId()); + jpaService.execute(new CoordActionsDeleteJPAExecutor(deleteActionList)); try { jpaService.execute(new CoordJobGetJPAExecutor(jobA.getId())); @@ -182,13 +189,22 @@ public class TestCoordJobsDeleteJPAExecutor extends XDataTestCase { setSystemProperty(FaultInjection.FAULT_INJECTION, "true"); setSystemProperty(SkipCommitFaultInjection.ACTION_FAILOVER_FAULT_INJECTION, "true"); - List<String> deleteList = new ArrayList<String>(); - deleteList.add(jobA.getId()); - deleteList.add(jobB.getId()); - deleteList.add(jobC.getId()); + List<String> deleteCoordlist = new ArrayList<String>(); + deleteCoordlist.add(jobA.getId()); + deleteCoordlist.add(jobB.getId()); + deleteCoordlist.add(jobC.getId()); + + List<String> deleteActionList = new ArrayList<String>(); + deleteActionList.add(actionA1.getId()); + deleteActionList.add(actionB1.getId()); + deleteActionList.add(actionC1.getId()); + deleteActionList.add(actionA2.getId()); + deleteActionList.add(actionB2.getId()); + deleteActionList.add(actionC2.getId()); try { - jpaService.execute(new CoordJobsDeleteJPAExecutor(deleteList)); + jpaService.execute(new CoordJobsDeleteJPAExecutor(deleteCoordlist)); + jpaService.execute(new CoordActionsDeleteJPAExecutor(deleteActionList)); fail("Should have skipped commit for failover testing"); } catch (RuntimeException re) { http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsBasicInfoFromCoordParentIdJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsBasicInfoFromCoordParentIdJPAExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsBasicInfoFromCoordParentIdJPAExecutor.java new file mode 100644 index 0000000..e2f8b9a --- /dev/null +++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsBasicInfoFromCoordParentIdJPAExecutor.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.executor.jpa; + +import org.apache.oozie.CoordinatorJobBean; +import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.client.CoordinatorJob; +import org.apache.oozie.client.WorkflowJob; +import org.apache.oozie.service.JPAService; +import org.apache.oozie.service.Services; +import org.apache.oozie.test.XDataTestCase; +import org.apache.oozie.workflow.WorkflowInstance; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + + +public class TestWorkflowJobsBasicInfoFromCoordParentIdJPAExecutor extends XDataTestCase { + Services services; + private String[] excludedServices = { "org.apache.oozie.service.StatusTransitService", + "org.apache.oozie.service.PauseTransitService", "org.apache.oozie.service.PurgeService", + "org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" }; + + @Override + protected void setUp() throws Exception { + super.setUp(); + services = new Services(); + setClassesToBeExcluded(services.getConf(), excludedServices); + services.init(); + } + + @Override + protected void tearDown() throws Exception { + services.destroy(); + super.tearDown(); + } + + public void testGetCoordinatorParent() throws Exception { + JPAService jpaService = Services.get().get(JPAService.class); + assertNotNull(jpaService); + HashMap<String,WorkflowJobBean> wflist = new HashMap<String, WorkflowJobBean>(); + CoordinatorJobBean coordJobA = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false); + CoordinatorJobBean coordJobB = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false); + WorkflowJobBean wfJobA1 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED); + WorkflowJobBean wfJobA2 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED); + WorkflowJobBean wfJobB = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED); + + List<WorkflowJobBean> children = new ArrayList<WorkflowJobBean>(); + children.addAll(jpaService.execute(new WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor(coordJobA.getId(), 10))); + wflist.put(wfJobA1.getId(), wfJobA1); + wflist.put(wfJobA2.getId(), wfJobA2); + checkChildren(children, wflist); + + children = new ArrayList<WorkflowJobBean>(); + children.addAll(jpaService.execute(new WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor(coordJobB.getId(), 10))); + wflist.clear(); + wflist.put(wfJobB.getId(), wfJobB); + checkChildren(children, wflist); + } + + public void testGetWorkflowParentTooMany() throws Exception { + JPAService jpaService = Services.get().get(JPAService.class); + assertNotNull(jpaService); + HashMap<String,WorkflowJobBean> wflist = new HashMap<String, WorkflowJobBean>(); + CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false); + WorkflowJobBean wfJob1 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED, + coordJob.getId()); + WorkflowJobBean wfJob2 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED, + coordJob.getId()); + WorkflowJobBean wfJob3 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED, + coordJob.getId()); + WorkflowJobBean wfJob4 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED, + coordJob.getId()); + WorkflowJobBean wfJob5 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED, + coordJob.getId()); + + List<WorkflowJobBean> children = new ArrayList<WorkflowJobBean>(); + // Get the first 3 + children.addAll(jpaService.execute(new WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor(coordJob.getId(), 3))); + assertEquals(3, children.size()); + // Get the next 3 (though there's only 2 more) + children.addAll(jpaService.execute(new WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor(coordJob.getId(), 3, 3))); + assertEquals(5, children.size()); + wflist.put(wfJob1.getId(), wfJob1); + wflist.put(wfJob2.getId(), wfJob2); + wflist.put(wfJob3.getId(), wfJob3); + wflist.put(wfJob4.getId(), wfJob4); + wflist.put(wfJob5.getId(), wfJob5); + checkChildren(children, wflist); + } + + private void checkChildren(List<WorkflowJobBean> children, HashMap<String,WorkflowJobBean> wfJobBaselist) { + assertEquals(wfJobBaselist.size(), children.size()); + for (int i = 0; i < children.size(); i++) { + WorkflowJobBean wfJobBase = wfJobBaselist.get(children.get(i).getId()); + assertNotNull(wfJobBase); + assertEquals(wfJobBase.getStatus(),children.get(i).getStatus()); + } + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor.java new file mode 100644 index 0000000..e275520 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.executor.jpa; + +import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.client.WorkflowJob; +import org.apache.oozie.service.JPAService; +import org.apache.oozie.service.Services; +import org.apache.oozie.test.XDataTestCase; +import org.apache.oozie.workflow.WorkflowInstance; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +public class TestWorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor extends XDataTestCase { + Services services; + private String[] excludedServices = { "org.apache.oozie.service.StatusTransitService", + "org.apache.oozie.service.PauseTransitService", "org.apache.oozie.service.PurgeService", + "org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" }; + + @Override + protected void setUp() throws Exception { + super.setUp(); + services = new Services(); + setClassesToBeExcluded(services.getConf(), excludedServices); + services.init(); + } + + @Override + protected void tearDown() throws Exception { + services.destroy(); + super.tearDown(); + } + + public void testGetWorkflowParent() throws Exception { + JPAService jpaService = Services.get().get(JPAService.class); + assertNotNull(jpaService); + + HashMap<String,WorkflowJobBean> wflist = new HashMap<String, WorkflowJobBean>(); + + WorkflowJobBean wfJobA = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED); + WorkflowJobBean wfJobB = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED); + WorkflowJobBean subwfJobA1 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED, + wfJobA.getId()); + WorkflowJobBean subwfJobA2 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED, + wfJobA.getId()); + WorkflowJobBean subwfJobB = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED, + wfJobB.getId()); + + List<WorkflowJobBean> children = new ArrayList<WorkflowJobBean>(); + children.addAll(jpaService.execute(new WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor(wfJobA.getId(), 10))); + wflist.put(subwfJobA1.getId(), subwfJobA1); + wflist.put(subwfJobA2.getId(),subwfJobA2); + checkChildren(children, wflist); + + children = new ArrayList<WorkflowJobBean>(); + children.addAll(jpaService.execute(new WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor(wfJobB.getId(), 10))); + wflist.clear(); + wflist.put(subwfJobB.getId(),subwfJobB); + checkChildren(children, wflist); + } + + public void testGetCoordinatorParentTooMany() throws Exception { + JPAService jpaService = Services.get().get(JPAService.class); + assertNotNull(jpaService); + HashMap<String,WorkflowJobBean> wflist = new HashMap<String, WorkflowJobBean>(); + WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED); + WorkflowJobBean subwfJob1 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED, + wfJob.getId()); + WorkflowJobBean subwfJob2 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED, + wfJob.getId()); + WorkflowJobBean subwfJob3 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED, + wfJob.getId()); + WorkflowJobBean subwfJob4 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED, + wfJob.getId()); + WorkflowJobBean subwfJob5 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED, + wfJob.getId()); + + List<WorkflowJobBean> children = new ArrayList<WorkflowJobBean>(); + // Get the first 3 + children.addAll(jpaService.execute(new WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor(wfJob.getId(), 3))); + assertEquals(3, children.size()); + // Get the next 3 (though there's only 2 more) + children.addAll(jpaService.execute(new WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor(wfJob.getId(), 3, 3))); + assertEquals(5, children.size()); + wflist.put(subwfJob1.getId(), subwfJob1); + wflist.put(subwfJob2.getId(), subwfJob2); + wflist.put(subwfJob3.getId(), subwfJob3); + wflist.put(subwfJob4.getId(), subwfJob4); + wflist.put(subwfJob5.getId(), subwfJob5); + + checkChildren(children, wflist); + } + + private void checkChildren(List<WorkflowJobBean> children, HashMap<String,WorkflowJobBean> wfJobBaselist) { + assertEquals(wfJobBaselist.size(), children.size()); + for (int i = 0; i < children.size(); i++) { + WorkflowJobBean wfJobBase = wfJobBaselist.get(children.get(i).getId()); + assertNotNull(wfJobBase); + assertEquals(wfJobBase.getStatus(),children.get(i).getStatus()); + } + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java deleted file mode 100644 index 949c2ae..0000000 --- a/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor.java +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.oozie.executor.jpa; - -import org.apache.oozie.CoordinatorActionBean; -import org.apache.oozie.CoordinatorJobBean; -import org.apache.oozie.WorkflowJobBean; -import org.apache.oozie.client.CoordinatorAction; -import org.apache.oozie.client.CoordinatorJob; -import org.apache.oozie.client.WorkflowJob; -import org.apache.oozie.command.TestPurgeXCommand; -import org.apache.oozie.service.JPAService; -import org.apache.oozie.service.Services; -import org.apache.oozie.test.XDataTestCase; -import org.apache.oozie.workflow.WorkflowInstance; - -public class TestWorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor extends XDataTestCase { - Services services; - private String[] excludedServices = { "org.apache.oozie.service.StatusTransitService", - "org.apache.oozie.service.PauseTransitService", "org.apache.oozie.service.PurgeService", - "org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" }; - - @Override - protected void setUp() throws Exception { - super.setUp(); - services = new Services(); - setClassesToBeExcluded(services.getConf(), excludedServices); - services.init(); - } - - @Override - protected void tearDown() throws Exception { - services.destroy(); - super.tearDown(); - } - - public void testCount() throws Exception { - JPAService jpaService = Services.get().get(JPAService.class); - assertNotNull(jpaService); - - CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false); - String coordJobId = coordJob.getId(); - int days = 1; - assertEquals(0, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId))); - - WorkflowJobBean wfJob1 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED, coordJobId); - wfJob1 = TestPurgeXCommand.setEndTime(wfJob1, "2009-12-01T01:00Z"); - CoordinatorActionBean coordAction1 = addRecordToCoordActionTable(coordJobId, 1, CoordinatorAction.Status.SUCCEEDED, - "coord-action-get.xml", wfJob1.getId(), wfJob1.getStatusStr(), 0); - days = 1; - assertEquals(0, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId))); - days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob1.getEndTime()); - assertEquals(1, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId))); - - WorkflowJobBean wfJob2 = addRecordToWfJobTable(WorkflowJob.Status.FAILED, WorkflowInstance.Status.FAILED, coordJobId); - wfJob2 = TestPurgeXCommand.setEndTime(wfJob2, "2009-11-01T01:00Z"); - CoordinatorActionBean coordAction2 = addRecordToCoordActionTable(coordJobId, 2, CoordinatorAction.Status.FAILED, - "coord-action-get.xml", wfJob2.getId(), wfJob2.getStatusStr(), 0); - days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob1.getEndTime()); - assertEquals(1, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId))); - days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob2.getEndTime()); - assertEquals(2, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId))); - - WorkflowJobBean wfJob3 = addRecordToWfJobTable(WorkflowJob.Status.KILLED, WorkflowInstance.Status.KILLED, coordJobId); - wfJob3 = TestPurgeXCommand.setEndTime(wfJob3, "2009-10-01T01:00Z"); - CoordinatorActionBean coordAction3 = addRecordToCoordActionTable(coordJobId, 3, CoordinatorAction.Status.KILLED, - "coord-action-get.xml", wfJob3.getId(), wfJob3.getStatusStr(), 0); - days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob2.getEndTime()); - assertEquals(2, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId))); - days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob3.getEndTime()); - assertEquals(3, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId))); - - WorkflowJobBean wfJob4 = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP, coordJobId); - wfJob4 = TestPurgeXCommand.setEndTime(wfJob4, "2009-09-01T01:00Z"); - CoordinatorActionBean coordAction4 = addRecordToCoordActionTable(coordJobId, 4, CoordinatorAction.Status.RUNNING, - "coord-action-get.xml", wfJob4.getId(), wfJob4.getStatusStr(), 0); - days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob3.getEndTime()); - assertEquals(4, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId))); - days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob4.getEndTime()); - assertEquals(4, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId))); - - WorkflowJobBean wfJob5 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING, coordJobId); - wfJob5 = TestPurgeXCommand.setEndTime(wfJob5, "2009-08-01T01:00Z"); - CoordinatorActionBean coordAction5 = addRecordToCoordActionTable(coordJobId, 5, CoordinatorAction.Status.RUNNING, - "coord-action-get.xml", wfJob5.getId(), wfJob5.getStatusStr(), 0); - days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob4.getEndTime()); - assertEquals(5, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId))); - days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob5.getEndTime()); - assertEquals(5, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId))); - - WorkflowJobBean wfJob6 = addRecordToWfJobTable(WorkflowJob.Status.SUSPENDED, WorkflowInstance.Status.SUSPENDED, coordJobId); - wfJob6 = TestPurgeXCommand.setEndTime(wfJob6, "2009-07-01T01:00Z"); - CoordinatorActionBean coordAction6 = addRecordToCoordActionTable(coordJobId, 6, CoordinatorAction.Status.SUSPENDED, - "coord-action-get.xml", wfJob6.getId(), wfJob6.getStatusStr(), 0); - days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob5.getEndTime()); - assertEquals(6, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId))); - days = TestPurgeXCommand.getNumDaysToNotBePurged(wfJob6.getEndTime()); - assertEquals(6, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(days, coordJobId))); - } -} http://git-wip-us.apache.org/repos/asf/oozie/blob/0d7b94af/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java deleted file mode 100644 index 2004503..0000000 --- a/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.oozie.executor.jpa; - -import org.apache.oozie.WorkflowJobBean; -import org.apache.oozie.client.CoordinatorJob; -import org.apache.oozie.client.WorkflowJob; -import org.apache.oozie.command.TestPurgeXCommand; -import org.apache.oozie.service.JPAService; -import org.apache.oozie.service.Services; -import org.apache.oozie.test.XDataTestCase; -import org.apache.oozie.workflow.WorkflowInstance; - -public class TestWorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor extends XDataTestCase { - Services services; - private String[] excludedServices = { "org.apache.oozie.service.StatusTransitService", - "org.apache.oozie.service.PauseTransitService", "org.apache.oozie.service.PurgeService", - "org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" }; - - @Override - protected void setUp() throws Exception { - super.setUp(); - services = new Services(); - setClassesToBeExcluded(services.getConf(), excludedServices); - services.init(); - } - - @Override - protected void tearDown() throws Exception { - services.destroy(); - super.tearDown(); - } - - public void testCount() throws Exception { - JPAService jpaService = Services.get().get(JPAService.class); - assertNotNull(jpaService); - - WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED); - String wfJobId = wfJob.getId(); - int days = 1; - assertEquals(0, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId))); - - WorkflowJobBean subwfJob1 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED, wfJobId); - subwfJob1 = TestPurgeXCommand.setEndTime(subwfJob1, "2009-12-01T01:00Z"); - days = 1; - assertEquals(0, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId))); - days = TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob1.getEndTime()); - assertEquals(1, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId))); - - WorkflowJobBean subwfJob2 = addRecordToWfJobTable(WorkflowJob.Status.FAILED, WorkflowInstance.Status.FAILED, wfJobId); - subwfJob2 = TestPurgeXCommand.setEndTime(subwfJob2, "2009-11-01T01:00Z"); - days = TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob1.getEndTime()); - assertEquals(1, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId))); - days = TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob2.getEndTime()); - assertEquals(2, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId))); - - WorkflowJobBean subwfJob3 = addRecordToWfJobTable(WorkflowJob.Status.KILLED, WorkflowInstance.Status.KILLED, wfJobId); - subwfJob3 = TestPurgeXCommand.setEndTime(subwfJob3, "2009-10-01T01:00Z"); - days = TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob2.getEndTime()); - assertEquals(2, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId))); - days = TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob3.getEndTime()); - assertEquals(3, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId))); - - WorkflowJobBean subwfJob4 = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP, wfJobId); - subwfJob4 = TestPurgeXCommand.setEndTime(subwfJob4, "2009-09-01T01:00Z"); - days = TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob3.getEndTime()); - assertEquals(4, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId))); - days = TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob4.getEndTime()); - assertEquals(4, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId))); - - WorkflowJobBean subwfJob5 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING, wfJobId); - subwfJob5 = TestPurgeXCommand.setEndTime(subwfJob5, "2009-08-01T01:00Z"); - days = TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob4.getEndTime()); - assertEquals(5, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId))); - days = TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob5.getEndTime()); - assertEquals(5, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId))); - - WorkflowJobBean subwfJob6 = addRecordToWfJobTable(WorkflowJob.Status.SUSPENDED, WorkflowInstance.Status.SUSPENDED, wfJobId); - subwfJob6 = TestPurgeXCommand.setEndTime(subwfJob6, "2009-07-01T01:00Z"); - days = TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob5.getEndTime()); - assertEquals(6, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId))); - days = TestPurgeXCommand.getNumDaysToNotBePurged(subwfJob6.getEndTime()); - assertEquals(6, (long) jpaService.execute(new WorkflowJobsCountNotForPurgeFromWorkflowParentIdJPAExecutor(days, wfJobId))); - } -}
