Updated Branches: refs/heads/vmsync 0233044b2 -> 422d9b8da
Fixed the build error Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/a407b33a Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/a407b33a Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/a407b33a Branch: refs/heads/vmsync Commit: a407b33a77e3e20037214a88dc6b0564671a6f44 Parents: 0233044 Author: Alex Huang <[email protected]> Authored: Thu Jun 27 15:23:42 2013 -0700 Committer: Alex Huang <[email protected]> Committed: Thu Jun 27 15:23:42 2013 -0700 ---------------------------------------------------------------------- .../framework/jobs/dao/AsyncJobJoinMapDao.java | 5 +- .../jobs/dao/AsyncJobJoinMapDaoImpl.java | 144 +++++++++++-------- .../jobs/impl/AsyncJobManagerImpl.java | 76 ++++------ 3 files changed, 119 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a407b33a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java index 4458fa2..577ed10 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDao.java @@ -16,6 +16,7 @@ // under the License. package org.apache.cloudstack.framework.jobs.dao; +import java.util.Date; import java.util.List; import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO; @@ -36,8 +37,10 @@ public interface AsyncJobJoinMapDao extends GenericDao<AsyncJobJoinMapVO, Long> void completeJoin(long joinJobId, JobInfo.Status joinStatus, String joinResult, long completeMsid); - List<Long> wakeupScan(); +// List<Long> wakeupScan(); List<Long> findJobsToWake(long joinedJobId); + + List<Long> findJobsToWakeBetween(Date cutDate); // List<Long> wakeupByJoinedJobCompletion(long joinedJobId); } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a407b33a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java index fa3b14b..20d8ba6 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java @@ -26,7 +26,6 @@ import java.util.TimeZone; import org.apache.log4j.Logger; -import org.apache.cloudstack.framework.jobs.AsyncJob; import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO; import org.apache.cloudstack.jobs.JobInfo; @@ -152,66 +151,67 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo update(ub, sc, null); } - @Override - public List<Long> wakeupScan() { - List<Long> standaloneList = new ArrayList<Long>(); - - Date cutDate = DateUtil.currentGMTTime(); - - Transaction txn = Transaction.currentTxn(); - PreparedStatement pstmt = null; - try { - txn.start(); - - // - // performance sensitive processing, do it in plain SQL - // - String sql = "UPDATE async_job SET job_pending_signals=? WHERE id IN " + - "(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)"; - pstmt = txn.prepareStatement(sql); - pstmt.setInt(1, AsyncJob.Constants.SIGNAL_MASK_WAKEUP); - pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); - pstmt.setString(3, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); - pstmt.executeUpdate(); - pstmt.close(); - - sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id IN " + - "(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)"; - pstmt = txn.prepareStatement(sql); - pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); - pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); - pstmt.executeUpdate(); - pstmt.close(); - - sql = "SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ? AND job_id NOT IN (SELECT content_id FROM sync_queue_item)"; - pstmt = txn.prepareStatement(sql); - pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); - pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); - ResultSet rs = pstmt.executeQuery(); - while(rs.next()) { - standaloneList.add(rs.getLong(1)); - } - rs.close(); - pstmt.close(); - - // update for next wake-up - sql = "UPDATE async_job_join_map SET next_wakeup=DATE_ADD(next_wakeup, INTERVAL wakeup_interval SECOND) WHERE next_wakeup < ? AND expiration > ?"; - pstmt = txn.prepareStatement(sql); - pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); - pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); - pstmt.executeUpdate(); - pstmt.close(); - - txn.commit(); - } catch (SQLException e) { - s_logger.error("Unexpected exception", e); - } - - return standaloneList; - } +// @Override +// public List<Long> wakeupScan() { +// List<Long> standaloneList = new ArrayList<Long>(); +// +// Date cutDate = DateUtil.currentGMTTime(); +// +// Transaction txn = Transaction.currentTxn(); +// PreparedStatement pstmt = null; +// try { +// txn.start(); +// +// // +// // performance sensitive processing, do it in plain SQL +// // +// String sql = "UPDATE async_job SET job_pending_signals=? WHERE id IN " + +// "(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)"; +// pstmt = txn.prepareStatement(sql); +// pstmt.setInt(1, AsyncJob.Constants.SIGNAL_MASK_WAKEUP); +// pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); +// pstmt.setString(3, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); +// pstmt.executeUpdate(); +// pstmt.close(); +// +// sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id IN " + +// "(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)"; +// pstmt = txn.prepareStatement(sql); +// pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); +// pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); +// pstmt.executeUpdate(); +// pstmt.close(); +// +// sql = "SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ? AND job_id NOT IN (SELECT content_id FROM sync_queue_item)"; +// pstmt = txn.prepareStatement(sql); +// pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); +// pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); +// ResultSet rs = pstmt.executeQuery(); +// while(rs.next()) { +// standaloneList.add(rs.getLong(1)); +// } +// rs.close(); +// pstmt.close(); +// +// // update for next wake-up +// sql = "UPDATE async_job_join_map SET next_wakeup=DATE_ADD(next_wakeup, INTERVAL wakeup_interval SECOND) WHERE next_wakeup < ? AND expiration > ?"; +// pstmt = txn.prepareStatement(sql); +// pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); +// pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); +// pstmt.executeUpdate(); +// pstmt.close(); +// +// txn.commit(); +// } catch (SQLException e) { +// s_logger.error("Unexpected exception", e); +// } +// +// return standaloneList; +// } @Override public List<Long> findJobsToWake(long joinedJobId) { + // TODO: We should fix this. We shouldn't be crossing daos in a dao code. List<Long> standaloneList = new ArrayList<Long>(); Transaction txn = Transaction.currentTxn(); String sql = "SELECT job_id FROM async_job_join_map WHERE join_job_id = ? AND job_id NOT IN (SELECT content_id FROM sync_queue_item)"; @@ -227,6 +227,34 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo } return standaloneList; } + + @Override + public List<Long> findJobsToWakeBetween(Date cutDate) { + List<Long> standaloneList = new ArrayList<Long>(); + Transaction txn = Transaction.currentTxn(); + try { + String sql = "SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ? AND job_id NOT IN (SELECT content_id FROM sync_queue_item)"; + PreparedStatement pstmt = txn.prepareStatement(sql); + pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + ResultSet rs = pstmt.executeQuery(); + while (rs.next()) { + standaloneList.add(rs.getLong(1)); + } + + // update for next wake-up + sql = "UPDATE async_job_join_map SET next_wakeup=DATE_ADD(next_wakeup, INTERVAL wakeup_interval SECOND) WHERE next_wakeup < ? AND expiration > ?"; + pstmt = txn.prepareStatement(sql); + pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + pstmt.executeUpdate(); + + return standaloneList; + } catch (SQLException e) { + throw new CloudRuntimeException("Unable to handle SQL exception", e); + } + + } // @Override // public List<Long> wakeupByJoinedJobCompletion(long joinedJobId) { http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a407b33a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java index 9b6aa97..5c3908d 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java @@ -20,8 +20,6 @@ package org.apache.cloudstack.framework.jobs.impl; import java.io.File; import java.io.FileInputStream; import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; @@ -819,7 +817,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, SearchCriteria<Long> joinJobSC = JoinJobSearch.create("joinJobId", joinedJobId); List<Long> result = _joinMapDao.customSearch(joinJobSC, null); - if (result.size() != 0) { + if (result.size() > 0) { Collections.sort(result); Long[] ids = result.toArray(new Long[result.size()]); @@ -849,55 +847,36 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, Transaction txn = Transaction.currentTxn(); PreparedStatement pstmt = null; - try { - txn.start(); + SearchCriteria<Long> sc = JoinJobTimeSearch.create(); + sc.setParameters("beginTime", DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + sc.setParameters("endTime", DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); - // - // performance sensitive processing, do it in plain SQL - // - String sql = "UPDATE async_job SET job_pending_signals=? WHERE id IN " + - "(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)"; - pstmt = txn.prepareStatement(sql); - pstmt.setInt(1, AsyncJob.Constants.SIGNAL_MASK_WAKEUP); - pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); - pstmt.setString(3, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); - pstmt.executeUpdate(); - pstmt.close(); - - sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id IN " + - "(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)"; - pstmt = txn.prepareStatement(sql); - pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); - pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); - pstmt.executeUpdate(); - pstmt.close(); - - sql = "SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ? AND job_id NOT IN (SELECT content_id FROM sync_queue_item)"; - pstmt = txn.prepareStatement(sql); - pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); - pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); - ResultSet rs = pstmt.executeQuery(); - while (rs.next()) { - standaloneList.add(rs.getLong(1)); - } - rs.close(); - pstmt.close(); + List<Long> result = _joinMapDao.customSearch(sc, null); + + txn.start(); + if (result.size() > 0) { + Collections.sort(result); + Long[] ids = result.toArray(new Long[result.size()]); - // update for next wake-up - sql = "UPDATE async_job_join_map SET next_wakeup=DATE_ADD(next_wakeup, INTERVAL wakeup_interval SECOND) WHERE next_wakeup < ? AND expiration > ?"; - pstmt = txn.prepareStatement(sql); - pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); - pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); - pstmt.executeUpdate(); - pstmt.close(); + AsyncJobVO job = _jobDao.createForUpdate(); + job.setPendingSignals(AsyncJob.Constants.SIGNAL_MASK_WAKEUP); - txn.commit(); - } catch (SQLException e) { - s_logger.error("Unexpected exception", e); + SearchCriteria<AsyncJobVO> sc2 = JobIdsSearch.create("ids", ids); + SearchCriteria<SyncQueueItemVO> queueItemsSC = QueueJobIdsSearch.create("contentIds", ids); + + _jobDao.update(job, sc2); + + SyncQueueItemVO item = _queueItemDao.createForUpdate(); + item.setLastProcessNumber(null); + item.setLastProcessMsid(null); + _queueItemDao.update(item, queueItemsSC); } - return standaloneList; + List<Long> wakupIds = _joinMapDao.findJobsToWakeBetween(cutDate); + txn.commit(); + + return wakupIds; } @Override @@ -925,7 +904,10 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, JoinJobSearch.selectField(JoinJobSearch.entity().getJobId()); JoinJobSearch.done(); - JoinJobTimeSearch + JoinJobTimeSearch = _joinMapDao.createSearchBuilder(Long.class); + JoinJobTimeSearch.and(JoinJobTimeSearch.entity().getNextWakeupTime(), Op.LT, "beginTime"); + JoinJobTimeSearch.and(JoinJobTimeSearch.entity().getExpiration(), Op.GT, "endTime"); + JoinJobTimeSearch.selectField(JoinJobTimeSearch.entity().getJobId()).done(); JobIdsSearch = _jobDao.createSearchBuilder(); JobIdsSearch.and(JobIdsSearch.entity().getId(), Op.IN, "ids").done();
