Updated Branches:
  refs/heads/vmsync 0233044b2 -> 422d9b8da

Fixed the build error


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/a407b33a
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/a407b33a
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/a407b33a

Branch: refs/heads/vmsync
Commit: a407b33a77e3e20037214a88dc6b0564671a6f44
Parents: 0233044
Author: Alex Huang <[email protected]>
Authored: Thu Jun 27 15:23:42 2013 -0700
Committer: Alex Huang <[email protected]>
Committed: Thu Jun 27 15:23:42 2013 -0700

----------------------------------------------------------------------
 .../framework/jobs/dao/AsyncJobJoinMapDao.java  |   5 +-
 .../jobs/dao/AsyncJobJoinMapDaoImpl.java        | 144 +++++++++++--------
 .../jobs/impl/AsyncJobManagerImpl.java          |  76 ++++------
 3 files changed, 119 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a407b33a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
----------------------------------------------------------------------
diff --git 
a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
 
b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
index 4458fa2..577ed10 100644
--- 
a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
+++ 
b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java
@@ -16,6 +16,7 @@
 // under the License.
 package org.apache.cloudstack.framework.jobs.dao;
 
+import java.util.Date;
 import java.util.List;
 
 import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
@@ -36,8 +37,10 @@ public interface AsyncJobJoinMapDao extends 
GenericDao<AsyncJobJoinMapVO, Long>
        
     void completeJoin(long joinJobId, JobInfo.Status joinStatus, String 
joinResult, long completeMsid);
        
-       List<Long> wakeupScan();
+//     List<Long> wakeupScan();
 
     List<Long> findJobsToWake(long joinedJobId);
+
+    List<Long> findJobsToWakeBetween(Date cutDate);
 //     List<Long> wakeupByJoinedJobCompletion(long joinedJobId);
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a407b33a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
----------------------------------------------------------------------
diff --git 
a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
 
b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
index fa3b14b..20d8ba6 100644
--- 
a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
+++ 
b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
@@ -26,7 +26,6 @@ import java.util.TimeZone;
 
 import org.apache.log4j.Logger;
 
-import org.apache.cloudstack.framework.jobs.AsyncJob;
 import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
 import org.apache.cloudstack.jobs.JobInfo;
 
@@ -152,66 +151,67 @@ public class AsyncJobJoinMapDaoImpl extends 
GenericDaoBase<AsyncJobJoinMapVO, Lo
         update(ub, sc, null);
        }
 
-       @Override
-    public List<Long> wakeupScan() {
-               List<Long> standaloneList = new ArrayList<Long>();
-               
-               Date cutDate = DateUtil.currentGMTTime();
-               
-               Transaction txn = Transaction.currentTxn();
-        PreparedStatement pstmt = null;
-        try {
-                       txn.start();
-                       
-                       //
-                       // performance sensitive processing, do it in plain SQL
-                       //
-                       String sql = "UPDATE async_job SET 
job_pending_signals=? WHERE id IN " +
-                                       "(SELECT job_id FROM async_job_join_map 
WHERE next_wakeup < ? AND expiration > ?)";
-                       pstmt = txn.prepareStatement(sql);
-                       pstmt.setInt(1, AsyncJob.Constants.SIGNAL_MASK_WAKEUP);
-               pstmt.setString(2, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
-               pstmt.setString(3, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
-               pstmt.executeUpdate();
-               pstmt.close();
-                       
-                       sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, 
queue_proc_number=NULL WHERE content_id IN " +
-                                       "(SELECT job_id FROM async_job_join_map 
WHERE next_wakeup < ? AND expiration > ?)";
-                       pstmt = txn.prepareStatement(sql);
-               pstmt.setString(1, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
-               pstmt.setString(2, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
-               pstmt.executeUpdate();
-               pstmt.close();
-               
-               sql = "SELECT job_id FROM async_job_join_map WHERE next_wakeup 
< ? AND expiration > ? AND job_id NOT IN (SELECT content_id FROM 
sync_queue_item)";
-                       pstmt = txn.prepareStatement(sql);
-               pstmt.setString(1, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
-               pstmt.setString(2, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
-               ResultSet rs = pstmt.executeQuery();
-               while(rs.next()) {
-                       standaloneList.add(rs.getLong(1));
-               }
-               rs.close();
-               pstmt.close();
-                       
-               // update for next wake-up
-               sql = "UPDATE async_job_join_map SET 
next_wakeup=DATE_ADD(next_wakeup, INTERVAL wakeup_interval SECOND) WHERE 
next_wakeup < ? AND expiration > ?";
-                       pstmt = txn.prepareStatement(sql);
-               pstmt.setString(1, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
-               pstmt.setString(2, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
-               pstmt.executeUpdate();
-               pstmt.close();
-               
-               txn.commit();
-               } catch (SQLException e) {
-                       s_logger.error("Unexpected exception", e);
-               }
-        
-        return standaloneList;
-       }
+//     @Override
+//    public List<Long> wakeupScan() {
+//             List<Long> standaloneList = new ArrayList<Long>();
+//
+//             Date cutDate = DateUtil.currentGMTTime();
+//
+//             Transaction txn = Transaction.currentTxn();
+//        PreparedStatement pstmt = null;
+//        try {
+//                     txn.start();
+//
+//                     //
+//                     // performance sensitive processing, do it in plain SQL
+//                     //
+//                     String sql = "UPDATE async_job SET 
job_pending_signals=? WHERE id IN " +
+//                                     "(SELECT job_id FROM async_job_join_map 
WHERE next_wakeup < ? AND expiration > ?)";
+//                     pstmt = txn.prepareStatement(sql);
+//                     pstmt.setInt(1, AsyncJob.Constants.SIGNAL_MASK_WAKEUP);
+//             pstmt.setString(2, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+//             pstmt.setString(3, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+//             pstmt.executeUpdate();
+//             pstmt.close();
+//
+//                     sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, 
queue_proc_number=NULL WHERE content_id IN " +
+//                                     "(SELECT job_id FROM async_job_join_map 
WHERE next_wakeup < ? AND expiration > ?)";
+//                     pstmt = txn.prepareStatement(sql);
+//             pstmt.setString(1, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+//             pstmt.setString(2, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+//             pstmt.executeUpdate();
+//             pstmt.close();
+//
+//             sql = "SELECT job_id FROM async_job_join_map WHERE next_wakeup 
< ? AND expiration > ? AND job_id NOT IN (SELECT content_id FROM 
sync_queue_item)";
+//                     pstmt = txn.prepareStatement(sql);
+//             pstmt.setString(1, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+//             pstmt.setString(2, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+//             ResultSet rs = pstmt.executeQuery();
+//             while(rs.next()) {
+//                     standaloneList.add(rs.getLong(1));
+//             }
+//             rs.close();
+//             pstmt.close();
+//
+//             // update for next wake-up
+//             sql = "UPDATE async_job_join_map SET 
next_wakeup=DATE_ADD(next_wakeup, INTERVAL wakeup_interval SECOND) WHERE 
next_wakeup < ? AND expiration > ?";
+//                     pstmt = txn.prepareStatement(sql);
+//             pstmt.setString(1, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+//             pstmt.setString(2, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+//             pstmt.executeUpdate();
+//             pstmt.close();
+//
+//             txn.commit();
+//             } catch (SQLException e) {
+//                     s_logger.error("Unexpected exception", e);
+//             }
+//
+//        return standaloneList;
+//     }
 
     @Override
     public List<Long> findJobsToWake(long joinedJobId) {
+        // TODO: We should fix this.  We shouldn't be crossing daos in a dao 
code.
         List<Long> standaloneList = new ArrayList<Long>();
         Transaction txn = Transaction.currentTxn();
         String sql = "SELECT job_id FROM async_job_join_map WHERE join_job_id 
= ? AND job_id NOT IN (SELECT content_id FROM sync_queue_item)";
@@ -227,6 +227,34 @@ public class AsyncJobJoinMapDaoImpl extends 
GenericDaoBase<AsyncJobJoinMapVO, Lo
         }
         return standaloneList;
     }
+
+    @Override
+    public List<Long> findJobsToWakeBetween(Date cutDate) {
+        List<Long> standaloneList = new ArrayList<Long>();
+        Transaction txn = Transaction.currentTxn();
+        try {
+            String sql = "SELECT job_id FROM async_job_join_map WHERE 
next_wakeup < ? AND expiration > ? AND job_id NOT IN (SELECT content_id FROM 
sync_queue_item)";
+            PreparedStatement pstmt = txn.prepareStatement(sql);
+            pstmt.setString(1, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+            pstmt.setString(2, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+            ResultSet rs = pstmt.executeQuery();
+            while (rs.next()) {
+                standaloneList.add(rs.getLong(1));
+            }
+
+            // update for next wake-up
+            sql = "UPDATE async_job_join_map SET 
next_wakeup=DATE_ADD(next_wakeup, INTERVAL wakeup_interval SECOND) WHERE 
next_wakeup < ? AND expiration > ?";
+            pstmt = txn.prepareStatement(sql);
+            pstmt.setString(1, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+            pstmt.setString(2, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+            pstmt.executeUpdate();
+
+            return standaloneList;
+        } catch (SQLException e) {
+            throw new CloudRuntimeException("Unable to handle SQL exception", 
e);
+        }
+
+    }
        
 //    @Override
 //    public List<Long> wakeupByJoinedJobCompletion(long joinedJobId) {

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a407b33a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
 
b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
index 9b6aa97..5c3908d 100644
--- 
a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
+++ 
b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
@@ -20,8 +20,6 @@ package org.apache.cloudstack.framework.jobs.impl;
 import java.io.File;
 import java.io.FileInputStream;
 import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
@@ -819,7 +817,7 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
         SearchCriteria<Long> joinJobSC = JoinJobSearch.create("joinJobId", 
joinedJobId);
 
         List<Long> result = _joinMapDao.customSearch(joinJobSC, null);
-        if (result.size() != 0) {
+        if (result.size() > 0) {
             Collections.sort(result);
             Long[] ids = result.toArray(new Long[result.size()]);
 
@@ -849,55 +847,36 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
 
         Transaction txn = Transaction.currentTxn();
         PreparedStatement pstmt = null;
-        try {
-            txn.start();
 
+        SearchCriteria<Long> sc = JoinJobTimeSearch.create();
+        sc.setParameters("beginTime", 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+        sc.setParameters("endTime", 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
 
-            //
-            // performance sensitive processing, do it in plain SQL
-            //
-            String sql = "UPDATE async_job SET job_pending_signals=? WHERE id 
IN " +
-                    "(SELECT job_id FROM async_job_join_map WHERE next_wakeup 
< ? AND expiration > ?)";
-            pstmt = txn.prepareStatement(sql);
-            pstmt.setInt(1, AsyncJob.Constants.SIGNAL_MASK_WAKEUP);
-            pstmt.setString(2, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
-            pstmt.setString(3, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
-            pstmt.executeUpdate();
-            pstmt.close();
-
-            sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, 
queue_proc_number=NULL WHERE content_id IN " +
-                    "(SELECT job_id FROM async_job_join_map WHERE next_wakeup 
< ? AND expiration > ?)";
-            pstmt = txn.prepareStatement(sql);
-            pstmt.setString(1, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
-            pstmt.setString(2, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
-            pstmt.executeUpdate();
-            pstmt.close();
-
-            sql = "SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? 
AND expiration > ? AND job_id NOT IN (SELECT content_id FROM sync_queue_item)";
-            pstmt = txn.prepareStatement(sql);
-            pstmt.setString(1, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
-            pstmt.setString(2, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
-            ResultSet rs = pstmt.executeQuery();
-            while (rs.next()) {
-                standaloneList.add(rs.getLong(1));
-            }
-            rs.close();
-            pstmt.close();
+        List<Long> result = _joinMapDao.customSearch(sc, null);
+
+        txn.start();
+        if (result.size() > 0) {
+            Collections.sort(result);
+            Long[] ids = result.toArray(new Long[result.size()]);
 
-            // update for next wake-up
-            sql = "UPDATE async_job_join_map SET 
next_wakeup=DATE_ADD(next_wakeup, INTERVAL wakeup_interval SECOND) WHERE 
next_wakeup < ? AND expiration > ?";
-            pstmt = txn.prepareStatement(sql);
-            pstmt.setString(1, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
-            pstmt.setString(2, 
DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
-            pstmt.executeUpdate();
-            pstmt.close();
+            AsyncJobVO job = _jobDao.createForUpdate();
+            job.setPendingSignals(AsyncJob.Constants.SIGNAL_MASK_WAKEUP);
 
-            txn.commit();
-        } catch (SQLException e) {
-            s_logger.error("Unexpected exception", e);
+            SearchCriteria<AsyncJobVO> sc2 = JobIdsSearch.create("ids", ids);
+            SearchCriteria<SyncQueueItemVO> queueItemsSC = 
QueueJobIdsSearch.create("contentIds", ids);
+
+            _jobDao.update(job, sc2);
+
+            SyncQueueItemVO item = _queueItemDao.createForUpdate();
+            item.setLastProcessNumber(null);
+            item.setLastProcessMsid(null);
+            _queueItemDao.update(item, queueItemsSC);
         }
 
-        return standaloneList;
+        List<Long> wakupIds = _joinMapDao.findJobsToWakeBetween(cutDate);
+        txn.commit();
+
+        return wakupIds;
     }
 
     @Override
@@ -925,7 +904,10 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
         JoinJobSearch.selectField(JoinJobSearch.entity().getJobId());
         JoinJobSearch.done();
 
-        JoinJobTimeSearch
+        JoinJobTimeSearch = _joinMapDao.createSearchBuilder(Long.class);
+        JoinJobTimeSearch.and(JoinJobTimeSearch.entity().getNextWakeupTime(), 
Op.LT, "beginTime");
+        JoinJobTimeSearch.and(JoinJobTimeSearch.entity().getExpiration(), 
Op.GT, "endTime");
+        
JoinJobTimeSearch.selectField(JoinJobTimeSearch.entity().getJobId()).done();
 
         JobIdsSearch = _jobDao.createSearchBuilder();
         JobIdsSearch.and(JobIdsSearch.entity().getId(), Op.IN, "ids").done();

Reply via email to