Updated Branches:
refs/heads/master 6f65a5bbe -> 2ecf9e329
CLOUDSTACK-2680: Async job expunge thread - expunge only:
1) Unfinished jobs that are yet to be processed.
2) Completed jobs.
Jobs that are currently in progress will be skipped by the expunge thread.
Conflicts:
server/src/com/cloud/async/dao/AsyncJobDao.java
server/src/com/cloud/async/dao/AsyncJobDaoImpl.java
server/src/com/cloud/storage/snapshot/SnapshotManagerImpl.java
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/2ecf9e32
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/2ecf9e32
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/2ecf9e32
Branch: refs/heads/master
Commit: 2ecf9e3293d9b5f1ccffe0736bc8ef0cbf3b1529
Parents: 6f65a5b
Author: Alena Prokharchyk <[email protected]>
Authored: Fri May 24 14:08:28 2013 -0700
Committer: Alena Prokharchyk <[email protected]>
Committed: Fri May 24 15:01:12 2013 -0700
----------------------------------------------------------------------
.../src/com/cloud/async/AsyncJobManagerImpl.java | 17 ++-
server/src/com/cloud/async/dao/AsyncJobDao.java | 5 +-
.../src/com/cloud/async/dao/AsyncJobDaoImpl.java | 88 +++++++++------
3 files changed, 70 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2ecf9e32/server/src/com/cloud/async/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java
b/server/src/com/cloud/async/AsyncJobManagerImpl.java
index 47d793f..0101a8a 100644
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@ -621,11 +621,18 @@ public class AsyncJobManagerImpl extends ManagerBase
implements AsyncJobManager,
// limit to 100 jobs per turn, this gives cleanup
throughput as 600 jobs per minute
// hopefully this will be fast enough to balance potential
growth of job table
- List<AsyncJobVO> l = _jobDao.getExpiredJobs(cutTime, 100);
- if(l != null && l.size() > 0) {
- for(AsyncJobVO job : l) {
- expungeAsyncJob(job);
- }
+ //1) Expire unfinished jobs that weren't processed yet
+ List<AsyncJobVO> l =
_jobDao.getExpiredUnfinishedJobs(cutTime, 100);
+ for(AsyncJobVO job : l) {
+ s_logger.trace("Expunging unfinished job " + job);
+ expungeAsyncJob(job);
+ }
+
+ //2) Expunge finished jobs
+ List<AsyncJobVO> completedJobs =
_jobDao.getExpiredCompletedJobs(cutTime, 100);
+ for(AsyncJobVO job : completedJobs) {
+ s_logger.trace("Expunging completed job " + job);
+ expungeAsyncJob(job);
}
// forcefully cancel blocking queue items if they've been
staying there for too long
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2ecf9e32/server/src/com/cloud/async/dao/AsyncJobDao.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/dao/AsyncJobDao.java
b/server/src/com/cloud/async/dao/AsyncJobDao.java
index 9d20759..9ab9b22 100644
--- a/server/src/com/cloud/async/dao/AsyncJobDao.java
+++ b/server/src/com/cloud/async/dao/AsyncJobDao.java
@@ -26,6 +26,7 @@ import com.cloud.utils.db.GenericDao;
public interface AsyncJobDao extends GenericDao<AsyncJobVO, Long> {
AsyncJobVO findInstancePendingAsyncJob(String instanceType, long
instanceId);
List<AsyncJobVO> findInstancePendingAsyncJobs(AsyncJob.Type
instanceType, Long accountId);
- List<AsyncJobVO> getExpiredJobs(Date cutTime, int limit);
+ List<AsyncJobVO> getExpiredUnfinishedJobs(Date cutTime, int limit);
void resetJobProcess(long msid, int jobResultCode, String
jobResultMessage);
-}
+ List<AsyncJobVO> getExpiredCompletedJobs(Date cutTime, int limit);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2ecf9e32/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java
b/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java
index 4793a6e..b2c0d9c 100644
--- a/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java
+++ b/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java
@@ -42,17 +42,19 @@ public class AsyncJobDaoImpl extends
GenericDaoBase<AsyncJobVO, Long> implements
private static final Logger s_logger =
Logger.getLogger(AsyncJobDaoImpl.class.getName());
private final SearchBuilder<AsyncJobVO> pendingAsyncJobSearch;
- private final SearchBuilder<AsyncJobVO> pendingAsyncJobsSearch;
- private final SearchBuilder<AsyncJobVO> expiringAsyncJobSearch;
-
- public AsyncJobDaoImpl() {
- pendingAsyncJobSearch = createSearchBuilder();
- pendingAsyncJobSearch.and("instanceType",
pendingAsyncJobSearch.entity().getInstanceType(),
- SearchCriteria.Op.EQ);
- pendingAsyncJobSearch.and("instanceId",
pendingAsyncJobSearch.entity().getInstanceId(),
- SearchCriteria.Op.EQ);
- pendingAsyncJobSearch.and("status",
pendingAsyncJobSearch.entity().getStatus(),
- SearchCriteria.Op.EQ);
+ private final SearchBuilder<AsyncJobVO> pendingAsyncJobsSearch;
+ private final SearchBuilder<AsyncJobVO>
expiringUnfinishedAsyncJobSearch;
+ private final SearchBuilder<AsyncJobVO> expiringCompletedAsyncJobSearch;
+
+
+ public AsyncJobDaoImpl() {
+ pendingAsyncJobSearch = createSearchBuilder();
+ pendingAsyncJobSearch.and("instanceType",
pendingAsyncJobSearch.entity().getInstanceType(),
+ SearchCriteria.Op.EQ);
+ pendingAsyncJobSearch.and("instanceId",
pendingAsyncJobSearch.entity().getInstanceId(),
+ SearchCriteria.Op.EQ);
+ pendingAsyncJobSearch.and("status",
pendingAsyncJobSearch.entity().getStatus(),
+ SearchCriteria.Op.EQ);
pendingAsyncJobSearch.done();
pendingAsyncJobsSearch = createSearchBuilder();
@@ -64,27 +66,36 @@ public class AsyncJobDaoImpl extends
GenericDaoBase<AsyncJobVO, Long> implements
SearchCriteria.Op.EQ);
pendingAsyncJobsSearch.done();
- expiringAsyncJobSearch = createSearchBuilder();
- expiringAsyncJobSearch.and("created",
expiringAsyncJobSearch.entity().getCreated(),
+ expiringUnfinishedAsyncJobSearch = createSearchBuilder();
+ expiringUnfinishedAsyncJobSearch.and("created",
expiringUnfinishedAsyncJobSearch.entity().getCreated(),
SearchCriteria.Op.LTEQ);
- expiringAsyncJobSearch.done();
- }
-
- public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long
instanceId) {
- SearchCriteria<AsyncJobVO> sc = pendingAsyncJobSearch.create();
- sc.setParameters("instanceType", instanceType);
- sc.setParameters("instanceId", instanceId);
- sc.setParameters("status", AsyncJobResult.STATUS_IN_PROGRESS);
-
- List<AsyncJobVO> l = listIncludingRemovedBy(sc);
- if(l != null && l.size() > 0) {
- if(l.size() > 1) {
- s_logger.warn("Instance " + instanceType + "-" +
instanceId + " has multiple pending async-job");
- }
-
- return l.get(0);
- }
- return null;
+ expiringUnfinishedAsyncJobSearch.and("completeMsId",
expiringUnfinishedAsyncJobSearch.entity().getCompleteMsid(),
SearchCriteria.Op.NULL);
+ expiringUnfinishedAsyncJobSearch.and("jobStatus",
expiringUnfinishedAsyncJobSearch.entity().getStatus(), SearchCriteria.Op.EQ);
+ expiringUnfinishedAsyncJobSearch.done();
+
+ expiringCompletedAsyncJobSearch = createSearchBuilder();
+ expiringCompletedAsyncJobSearch.and("created",
expiringCompletedAsyncJobSearch.entity().getCreated(),
+ SearchCriteria.Op.LTEQ);
+ expiringCompletedAsyncJobSearch.and("completeMsId",
expiringCompletedAsyncJobSearch.entity().getCompleteMsid(),
SearchCriteria.Op.NNULL);
+ expiringCompletedAsyncJobSearch.and("jobStatus",
expiringCompletedAsyncJobSearch.entity().getStatus(), SearchCriteria.Op.NEQ);
+ expiringCompletedAsyncJobSearch.done();
+ }
+
+ public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long
instanceId) {
+ SearchCriteria<AsyncJobVO> sc = pendingAsyncJobSearch.create();
+ sc.setParameters("instanceType", instanceType);
+ sc.setParameters("instanceId", instanceId);
+ sc.setParameters("status", AsyncJobResult.STATUS_IN_PROGRESS);
+
+ List<AsyncJobVO> l = listIncludingRemovedBy(sc);
+ if(l != null && l.size() > 0) {
+ if(l.size() > 1) {
+ s_logger.warn("Instance " + instanceType + "-" +
instanceId + " has multiple pending async-job");
+ }
+
+ return l.get(0);
+ }
+ return null;
}
public List<AsyncJobVO> findInstancePendingAsyncJobs(AsyncJob.Type
instanceType, Long accountId) {
@@ -99,9 +110,20 @@ public class AsyncJobDaoImpl extends
GenericDaoBase<AsyncJobVO, Long> implements
return listBy(sc);
}
- public List<AsyncJobVO> getExpiredJobs(Date cutTime, int limit) {
- SearchCriteria<AsyncJobVO> sc = expiringAsyncJobSearch.create();
+ @Override
+ public List<AsyncJobVO> getExpiredUnfinishedJobs(Date cutTime, int
limit) {
+ SearchCriteria<AsyncJobVO> sc =
expiringUnfinishedAsyncJobSearch.create();
+ sc.setParameters("created", cutTime);
+ sc.setParameters("jobStatus", 0);
+ Filter filter = new Filter(AsyncJobVO.class, "created", true,
0L, (long)limit);
+ return listIncludingRemovedBy(sc, filter);
+ }
+
+ @Override
+ public List<AsyncJobVO> getExpiredCompletedJobs(Date cutTime, int
limit) {
+ SearchCriteria<AsyncJobVO> sc =
expiringCompletedAsyncJobSearch.create();
sc.setParameters("created", cutTime);
+ sc.setParameters("jobStatus", 0);
Filter filter = new Filter(AsyncJobVO.class, "created", true,
0L, (long)limit);
return listIncludingRemovedBy(sc, filter);
}