Updated Branches:
  refs/heads/master 6f65a5bbe -> 2ecf9e329

CLOUDSTACK-2680: Async job expunge thread - expunge only:

1) Unfinished jobs that are yet to be processed.
2) Completed jobs.

Jobs that are still in progress will be skipped by the expunge thread.

Conflicts:
        server/src/com/cloud/async/dao/AsyncJobDao.java
        server/src/com/cloud/async/dao/AsyncJobDaoImpl.java
        server/src/com/cloud/storage/snapshot/SnapshotManagerImpl.java


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/2ecf9e32
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/2ecf9e32
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/2ecf9e32

Branch: refs/heads/master
Commit: 2ecf9e3293d9b5f1ccffe0736bc8ef0cbf3b1529
Parents: 6f65a5b
Author: Alena Prokharchyk <[email protected]>
Authored: Fri May 24 14:08:28 2013 -0700
Committer: Alena Prokharchyk <[email protected]>
Committed: Fri May 24 15:01:12 2013 -0700

----------------------------------------------------------------------
 .../src/com/cloud/async/AsyncJobManagerImpl.java   |   17 ++-
 server/src/com/cloud/async/dao/AsyncJobDao.java    |    5 +-
 .../src/com/cloud/async/dao/AsyncJobDaoImpl.java   |   88 +++++++++------
 3 files changed, 70 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2ecf9e32/server/src/com/cloud/async/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java 
b/server/src/com/cloud/async/AsyncJobManagerImpl.java
index 47d793f..0101a8a 100644
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@ -621,11 +621,18 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
 
                     // limit to 100 jobs per turn, this gives cleanup 
throughput as 600 jobs per minute
                     // hopefully this will be fast enough to balance potential 
growth of job table
-                    List<AsyncJobVO> l = _jobDao.getExpiredJobs(cutTime, 100);
-                    if(l != null && l.size() > 0) {
-                        for(AsyncJobVO job : l) {
-                            expungeAsyncJob(job);
-                        }
+                    //1) Expire unfinished jobs that weren't processed yet
+                    List<AsyncJobVO> l = 
_jobDao.getExpiredUnfinishedJobs(cutTime, 100);
+                    for(AsyncJobVO job : l) {
+                       s_logger.trace("Expunging unfinished job " + job);
+                        expungeAsyncJob(job);
+                    }       
+                    
+                    //2) Expunge finished jobs
+                    List<AsyncJobVO> completedJobs = 
_jobDao.getExpiredCompletedJobs(cutTime, 100);
+                    for(AsyncJobVO job : completedJobs) {
+                       s_logger.trace("Expunging completed job " + job);
+                        expungeAsyncJob(job);
                     }
 
                     // forcefully cancel blocking queue items if they've been 
staying there for too long

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2ecf9e32/server/src/com/cloud/async/dao/AsyncJobDao.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/dao/AsyncJobDao.java 
b/server/src/com/cloud/async/dao/AsyncJobDao.java
index 9d20759..9ab9b22 100644
--- a/server/src/com/cloud/async/dao/AsyncJobDao.java
+++ b/server/src/com/cloud/async/dao/AsyncJobDao.java
@@ -26,6 +26,7 @@ import com.cloud.utils.db.GenericDao;
 public interface AsyncJobDao extends GenericDao<AsyncJobVO, Long> {
        AsyncJobVO findInstancePendingAsyncJob(String instanceType, long 
instanceId);
        List<AsyncJobVO> findInstancePendingAsyncJobs(AsyncJob.Type 
instanceType, Long accountId);
-       List<AsyncJobVO> getExpiredJobs(Date cutTime, int limit);
+       List<AsyncJobVO> getExpiredUnfinishedJobs(Date cutTime, int limit);
        void resetJobProcess(long msid, int jobResultCode, String 
jobResultMessage);
-}
+       List<AsyncJobVO> getExpiredCompletedJobs(Date cutTime, int limit);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2ecf9e32/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java 
b/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java
index 4793a6e..b2c0d9c 100644
--- a/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java
+++ b/server/src/com/cloud/async/dao/AsyncJobDaoImpl.java
@@ -42,17 +42,19 @@ public class AsyncJobDaoImpl extends 
GenericDaoBase<AsyncJobVO, Long> implements
     private static final Logger s_logger = 
Logger.getLogger(AsyncJobDaoImpl.class.getName());
        
        private final SearchBuilder<AsyncJobVO> pendingAsyncJobSearch;  
-       private final SearchBuilder<AsyncJobVO> pendingAsyncJobsSearch; 
-       private final SearchBuilder<AsyncJobVO> expiringAsyncJobSearch;         
-       
-       public AsyncJobDaoImpl() {
-               pendingAsyncJobSearch = createSearchBuilder();
-               pendingAsyncJobSearch.and("instanceType", 
pendingAsyncJobSearch.entity().getInstanceType(), 
-                       SearchCriteria.Op.EQ);
-               pendingAsyncJobSearch.and("instanceId", 
pendingAsyncJobSearch.entity().getInstanceId(), 
-                       SearchCriteria.Op.EQ);
-               pendingAsyncJobSearch.and("status", 
pendingAsyncJobSearch.entity().getStatus(), 
-                               SearchCriteria.Op.EQ);
+       private final SearchBuilder<AsyncJobVO> pendingAsyncJobsSearch; 
+       private final SearchBuilder<AsyncJobVO> 
expiringUnfinishedAsyncJobSearch;
+       private final SearchBuilder<AsyncJobVO> expiringCompletedAsyncJobSearch;
+
+       
+       public AsyncJobDaoImpl() {
+               pendingAsyncJobSearch = createSearchBuilder();
+               pendingAsyncJobSearch.and("instanceType", 
pendingAsyncJobSearch.entity().getInstanceType(), 
+                       SearchCriteria.Op.EQ);
+               pendingAsyncJobSearch.and("instanceId", 
pendingAsyncJobSearch.entity().getInstanceId(), 
+                       SearchCriteria.Op.EQ);
+               pendingAsyncJobSearch.and("status", 
pendingAsyncJobSearch.entity().getStatus(), 
+                               SearchCriteria.Op.EQ);
                pendingAsyncJobSearch.done();
                
                pendingAsyncJobsSearch = createSearchBuilder();
@@ -64,27 +66,36 @@ public class AsyncJobDaoImpl extends 
GenericDaoBase<AsyncJobVO, Long> implements
                                SearchCriteria.Op.EQ);
                pendingAsyncJobsSearch.done();
                
-               expiringAsyncJobSearch = createSearchBuilder();
-               expiringAsyncJobSearch.and("created", 
expiringAsyncJobSearch.entity().getCreated(), 
+               expiringUnfinishedAsyncJobSearch = createSearchBuilder();
+               expiringUnfinishedAsyncJobSearch.and("created", 
expiringUnfinishedAsyncJobSearch.entity().getCreated(),
                        SearchCriteria.Op.LTEQ);
-               expiringAsyncJobSearch.done();
-       }
-       
-       public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long 
instanceId) {
-        SearchCriteria<AsyncJobVO> sc = pendingAsyncJobSearch.create();
-        sc.setParameters("instanceType", instanceType);
-        sc.setParameters("instanceId", instanceId);
-        sc.setParameters("status", AsyncJobResult.STATUS_IN_PROGRESS);
-        
-        List<AsyncJobVO> l = listIncludingRemovedBy(sc);
-        if(l != null && l.size() > 0) {
-               if(l.size() > 1) {
-                       s_logger.warn("Instance " + instanceType + "-" + 
instanceId + " has multiple pending async-job");
-               }
-               
-               return l.get(0);
-        }
-        return null;
+               expiringUnfinishedAsyncJobSearch.and("completeMsId", 
expiringUnfinishedAsyncJobSearch.entity().getCompleteMsid(), 
SearchCriteria.Op.NULL);
+               expiringUnfinishedAsyncJobSearch.and("jobStatus", 
expiringUnfinishedAsyncJobSearch.entity().getStatus(), SearchCriteria.Op.EQ);
+               expiringUnfinishedAsyncJobSearch.done();
+               
+               expiringCompletedAsyncJobSearch = createSearchBuilder();
+               expiringCompletedAsyncJobSearch.and("created", 
expiringCompletedAsyncJobSearch.entity().getCreated(),
+                       SearchCriteria.Op.LTEQ);
+               expiringCompletedAsyncJobSearch.and("completeMsId", 
expiringCompletedAsyncJobSearch.entity().getCompleteMsid(), 
SearchCriteria.Op.NNULL);
+               expiringCompletedAsyncJobSearch.and("jobStatus", 
expiringCompletedAsyncJobSearch.entity().getStatus(), SearchCriteria.Op.NEQ);
+               expiringCompletedAsyncJobSearch.done();
+       }
+       
+       public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long 
instanceId) {
+        SearchCriteria<AsyncJobVO> sc = pendingAsyncJobSearch.create();
+        sc.setParameters("instanceType", instanceType);
+        sc.setParameters("instanceId", instanceId);
+        sc.setParameters("status", AsyncJobResult.STATUS_IN_PROGRESS);
+        
+        List<AsyncJobVO> l = listIncludingRemovedBy(sc);
+        if(l != null && l.size() > 0) {
+               if(l.size() > 1) {
+                       s_logger.warn("Instance " + instanceType + "-" + 
instanceId + " has multiple pending async-job");
+               }
+               
+               return l.get(0);
+        }
+        return null;
        }
        
        public List<AsyncJobVO> findInstancePendingAsyncJobs(AsyncJob.Type 
instanceType, Long accountId) {
@@ -99,9 +110,20 @@ public class AsyncJobDaoImpl extends 
GenericDaoBase<AsyncJobVO, Long> implements
         return listBy(sc);
        }
        
-       public List<AsyncJobVO> getExpiredJobs(Date cutTime, int limit) {
-               SearchCriteria<AsyncJobVO> sc = expiringAsyncJobSearch.create();
+       @Override
+       public List<AsyncJobVO> getExpiredUnfinishedJobs(Date cutTime, int 
limit) {
+               SearchCriteria<AsyncJobVO> sc = 
expiringUnfinishedAsyncJobSearch.create();
+               sc.setParameters("created", cutTime);
+               sc.setParameters("jobStatus", 0);
+               Filter filter = new Filter(AsyncJobVO.class, "created", true, 
0L, (long)limit);
+               return listIncludingRemovedBy(sc, filter);
+       }
+       
+       @Override
+       public List<AsyncJobVO> getExpiredCompletedJobs(Date cutTime, int 
limit) {
+               SearchCriteria<AsyncJobVO> sc = 
expiringCompletedAsyncJobSearch.create();
                sc.setParameters("created", cutTime);
+               sc.setParameters("jobStatus", 0);
                Filter filter = new Filter(AsyncJobVO.class, "created", true, 
0L, (long)limit);
                return listIncludingRemovedBy(sc, filter);
        }

Reply via email to