Updated Branches: refs/heads/4.3 cd8501e26 -> 8db0d83d1
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/8db0d83d/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java index 61670bf..7b6eed7 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java @@ -23,9 +23,14 @@ import org.apache.cloudstack.framework.jobs.impl.SyncQueueItemVO; import com.cloud.utils.db.GenericDao; public interface SyncQueueItemDao extends GenericDao<SyncQueueItemVO, Long> { - public SyncQueueItemVO getNextQueueItem(long queueId); - public List<SyncQueueItemVO> getNextQueueItems(int maxItems); - public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive); - public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive); - public Long getQueueItemIdByContentIdAndType(long contentId, String contentType); + public SyncQueueItemVO getNextQueueItem(long queueId); + public int getActiveQueueItemCount(long queueId); + + public List<SyncQueueItemVO> getNextQueueItems(int maxItems); + + public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive); + + public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive); + + public Long getQueueItemIdByContentIdAndType(long contentId, String contentType); } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/8db0d83d/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java 
b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java index 2f04a7c..41f1419 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java @@ -36,6 +36,7 @@ import com.cloud.utils.db.GenericDaoBase; import com.cloud.utils.db.GenericSearchBuilder; import com.cloud.utils.db.SearchBuilder; import com.cloud.utils.db.SearchCriteria; +import com.cloud.utils.db.SearchCriteria.Func; import com.cloud.utils.db.SearchCriteria.Op; import com.cloud.utils.db.TransactionLegacy; @@ -43,7 +44,8 @@ import com.cloud.utils.db.TransactionLegacy; public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long> implements SyncQueueItemDao { private static final Logger s_logger = Logger.getLogger(SyncQueueItemDaoImpl.class); final GenericSearchBuilder<SyncQueueItemVO, Long> queueIdSearch; - + final GenericSearchBuilder<SyncQueueItemVO, Integer> queueActiveItemSearch; + public SyncQueueItemDaoImpl() { super(); queueIdSearch = createSearchBuilder(Long.class); @@ -51,37 +53,52 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long> queueIdSearch.and("contentType", queueIdSearch.entity().getContentType(), Op.EQ); queueIdSearch.selectFields(queueIdSearch.entity().getId()); queueIdSearch.done(); + + queueActiveItemSearch = createSearchBuilder(Integer.class); + queueActiveItemSearch.and("queueId", queueActiveItemSearch.entity().getQueueId(), Op.EQ); + queueActiveItemSearch.and("processNumber", queueActiveItemSearch.entity().getLastProcessNumber(), Op.NNULL); + queueActiveItemSearch.select(null, Func.COUNT, queueActiveItemSearch.entity().getId()); + queueActiveItemSearch.done(); } - @Override - public SyncQueueItemVO getNextQueueItem(long queueId) { - - SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder(); + @Override + public SyncQueueItemVO getNextQueueItem(long queueId) { + + 
SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder(); sb.and("queueId", sb.entity().getQueueId(), SearchCriteria.Op.EQ); - sb.and("lastProcessNumber", sb.entity().getLastProcessNumber(), SearchCriteria.Op.NULL); + sb.and("lastProcessNumber", sb.entity().getLastProcessNumber(), SearchCriteria.Op.NULL); sb.done(); - - SearchCriteria<SyncQueueItemVO> sc = sb.create(); - sc.setParameters("queueId", queueId); - - Filter filter = new Filter(SyncQueueItemVO.class, "created", true, 0L, 1L); + + SearchCriteria<SyncQueueItemVO> sc = sb.create(); + sc.setParameters("queueId", queueId); + + Filter filter = new Filter(SyncQueueItemVO.class, "created", true, 0L, 1L); List<SyncQueueItemVO> l = listBy(sc, filter); if(l != null && l.size() > 0) - return l.get(0); - - return null; - } - - @Override - public List<SyncQueueItemVO> getNextQueueItems(int maxItems) { - List<SyncQueueItemVO> l = new ArrayList<SyncQueueItemVO>(); - - String sql = "SELECT i.id, i.queue_id, i.content_type, i.content_id, i.created " + - " FROM sync_queue AS q JOIN sync_queue_item AS i ON q.id = i.queue_id " + + return l.get(0); + + return null; + } + + @Override + public int getActiveQueueItemCount(long queueId) { + SearchCriteria<Integer> sc = queueActiveItemSearch.create(); + sc.setParameters("queueId", queueId); + + List<Integer> count = customSearch(sc, null); + return count.get(0); + } + + @Override + public List<SyncQueueItemVO> getNextQueueItems(int maxItems) { + List<SyncQueueItemVO> l = new ArrayList<SyncQueueItemVO>(); + + String sql = "SELECT i.id, i.queue_id, i.content_type, i.content_id, i.created " + + " FROM sync_queue AS q JOIN sync_queue_item AS i ON q.id = i.queue_id " + " WHERE i.queue_proc_number IS NULL " + - " GROUP BY q.id " + - " ORDER BY i.id " + - " LIMIT 0, ?"; + " GROUP BY q.id " + + " ORDER BY i.id " + + " LIMIT 0, ?"; TransactionLegacy txn = TransactionLegacy.currentTxn(); PreparedStatement pstmt = null; @@ -90,54 +107,54 @@ public class SyncQueueItemDaoImpl extends 
GenericDaoBase<SyncQueueItemVO, Long> pstmt.setInt(1, maxItems); ResultSet rs = pstmt.executeQuery(); while(rs.next()) { - SyncQueueItemVO item = new SyncQueueItemVO(); - item.setId(rs.getLong(1)); - item.setQueueId(rs.getLong(2)); - item.setContentType(rs.getString(3)); - item.setContentId(rs.getLong(4)); - item.setCreated(DateUtil.parseDateString(TimeZone.getTimeZone("GMT"), rs.getString(5))); - l.add(item); + SyncQueueItemVO item = new SyncQueueItemVO(); + item.setId(rs.getLong(1)); + item.setQueueId(rs.getLong(2)); + item.setContentType(rs.getString(3)); + item.setContentId(rs.getLong(4)); + item.setCreated(DateUtil.parseDateString(TimeZone.getTimeZone("GMT"), rs.getString(5))); + l.add(item); } } catch (SQLException e) { - s_logger.error("Unexpected sql excetpion, ", e); + s_logger.error("Unexpected sql excetpion, ", e); } catch (Throwable e) { - s_logger.error("Unexpected excetpion, ", e); + s_logger.error("Unexpected excetpion, ", e); } - return l; - } - - @Override - public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive) { - SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder(); + return l; + } + + @Override + public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive) { + SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder(); sb.and("lastProcessMsid", sb.entity().getLastProcessMsid(), - SearchCriteria.Op.EQ); + SearchCriteria.Op.EQ); sb.done(); - - SearchCriteria<SyncQueueItemVO> sc = sb.create(); - sc.setParameters("lastProcessMsid", msid); - - Filter filter = new Filter(SyncQueueItemVO.class, "created", true, null, null); - - if(exclusive) - return lockRows(sc, filter, true); + + SearchCriteria<SyncQueueItemVO> sc = sb.create(); + sc.setParameters("lastProcessMsid", msid); + + Filter filter = new Filter(SyncQueueItemVO.class, "created", true, null, null); + + if (exclusive) + return lockRows(sc, filter, true); return listBy(sc, filter); - } + } @Override public List<SyncQueueItemVO> 
getBlockedQueueItems(long thresholdMs, boolean exclusive) { Date cutTime = DateUtil.currentGMTTime(); - + SearchBuilder<SyncQueueItemVO> sbItem = createSearchBuilder(); sbItem.and("lastProcessMsid", sbItem.entity().getLastProcessMsid(), SearchCriteria.Op.NNULL); sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessNumber(), SearchCriteria.Op.NNULL); sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.NNULL); sbItem.and("lastProcessTime2", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.LT); - + sbItem.done(); - + SearchCriteria<SyncQueueItemVO> sc = sbItem.create(); sc.setParameters("lastProcessTime2", new Date(cutTime.getTime() - thresholdMs)); - + if(exclusive) return lockRows(sc, null, true); return listBy(sc, null); http://git-wip-us.apache.org/repos/asf/cloudstack/blob/8db0d83d/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java index a77f864..63c365b 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java @@ -24,7 +24,6 @@ import java.util.Date; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; @@ -88,12 +87,12 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, private static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class); - private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds - private static final int 
ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC = 60; // 60 seconds + private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds + private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC = 60; // 60 seconds private static final int MAX_ONETIME_SCHEDULE_SIZE = 50; private static final int HEARTBEAT_INTERVAL = 2000; - private static final int GC_INTERVAL = 10000; // 10 seconds + private static final int GC_INTERVAL = 10000; // 10 seconds @Inject private SyncQueueItemDao _queueItemDao; @@ -362,38 +361,38 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, // I removed the temporary solution already. I think my changes should fix the deadlock. /* - ------------------------ - LATEST DETECTED DEADLOCK - ------------------------ - 130625 20:03:10 - *** (1) TRANSACTION: - TRANSACTION 0 98087127, ACTIVE 0 sec, process no 1489, OS thread id 139837829175040 fetching rows, thread declared inside InnoDB 494 - mysql tables in use 2, locked 1 - LOCK WAIT 3 lock struct(s), heap size 368, 2 row lock(s), undo log entries 1 - MySQL thread id 28408, query id 368571321 localhost 127.0.0.1 cloud preparing - UPDATE async_job SET job_pending_signals=1 WHERE id IN (SELECT job_id FROM async_job_join_map WHERE join_job_id = 9) - *** (1) WAITING FOR THIS LOCK TO BE GRANTED: - RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` of table `cloud`.`async_job` trx id 0 98087127 lock_mode X locks rec but not gap waiting - Record lock, heap no 9 PHYSICAL RECORD: n_fields 26; compact format; info bits 0 - 0: len 8; hex 0000000000000008; asc ;; 1: len 6; hex 000005d8b0d8; asc ;; 2: len 7; hex 00000009270110; asc ' ;; 3: len 8; hex 0000000000000002; asc ;; 4: len 8; hex 0000000000000002; asc ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 7b226964223a2232222c22706879736963616c6e6574776f726b6964223a; asc 
{"id":"2","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc ;; 10: len 4; hex 80000001; asc ;; 11: len 4; hex 80000000; asc ;; 12: len 4; hex 80000000; asc ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 15: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 16: len 8; hex 8000124f06cfd5b6; asc O ;; 17: len 8; hex 8000124f06cfd5b6; asc O ;; 18: SQL NULL ; 19: SQL NULL; 20: len 30; hex 66376466396532362d323139622d346338652d393231332d393766653636; asc f7df9e26-219b-4c8e-9213-97fe66;...(truncated); 21: len 30; hex 36623238306364362d663436652d343563322d383833642d333863616439; asc 6b280cd6-f46e-45c2-883d-38cad9;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc ;; - - *** (2) TRANSACTION: - TRANSACTION 0 98087128, ACTIVE 0 sec, process no 1489, OS thread id 139837671909120 fetching rows, thread declared inside InnoDB 492 - mysql tables in use 2, locked 1 - 3 lock struct(s), heap size 368, 2 row lock(s), undo log entries 1 - MySQL thread id 28406, query id 368571323 localhost 127.0.0.1 cloud preparing - UPDATE async_job SET job_pending_signals=1 WHERE id IN (SELECT job_id FROM async_job_join_map WHERE join_job_id = 8) - *** (2) HOLDS THE LOCK(S): - RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` of table `cloud`.`async_job` trx id 0 98087128 lock_mode X locks rec but not gap - Record lock, heap no 9 PHYSICAL RECORD: n_fields 26; compact format; info bits 0 - 0: len 8; hex 0000000000000008; asc ;; 1: len 6; hex 000005d8b0d8; asc ;; 2: len 7; hex 00000009270110; asc ' ;; 3: len 8; hex 0000000000000002; asc ;; 4: len 8; hex 0000000000000002; asc ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 
7b226964223a2232222c22706879736963616c6e6574776f726b6964223a; asc {"id":"2","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc ;; 10: len 4; hex 80000001; asc ;; 11: len 4; hex 80000000; asc ;; 12: len 4; hex 80000000; asc ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 15: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 16: len 8; hex 8000124f06cfd5b6; asc O ;; 17: len 8; hex 8000124f06cfd5b6; asc O ;; 18: SQL NULL ; 19: SQL NULL; 20: len 30; hex 66376466396532362d323139622d346338652d393231332d393766653636; asc f7df9e26-219b-4c8e-9213-97fe66;...(truncated); 21: len 30; hex 36623238306364362d663436652d343563322d383833642d333863616439; asc 6b280cd6-f46e-45c2-883d-38cad9;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc ;; - - *** (2) WAITING FOR THIS LOCK TO BE GRANTED: - RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` of table `cloud`.`async_job` trx id 0 98087128 lock_mode X locks rec but not gap waiting - Record lock, heap no 10 PHYSICAL RECORD: n_fields 26; compact format; info bits 0 - 0: len 8; hex 0000000000000009; asc ;; 1: len 6; hex 000005d8b0d7; asc ;; 2: len 7; hex 00000009280110; asc ( ;; 3: len 8; hex 0000000000000002; asc ;; 4: len 8; hex 0000000000000002; asc ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 7b226964223a2233222c22706879736963616c6e6574776f726b6964223a; asc {"id":"3","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc ;; 10: len 4; hex 80000001; asc ;; 11: len 4; hex 80000000; asc ;; 12: len 4; hex 80000000; asc ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 
8; hex 80001a6f7bb0d0a8; asc o{ ;; 15: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 16: len 8; hex 8000124f06cfd5b6; asc O ;; 17: len 8; hex 8000124f06cfd5b6; asc O ;; 18: SQL NULL ; 19: SQL NULL; 20: len 30; hex 62313065306432342d336233352d343663622d386361622d623933623562; asc b10e0d24-3b35-46cb-8cab-b93b5b;...(truncated); 21: len 30; hex 39353664383563632d383336622d346663612d623738622d646238343739; asc 956d85cc-836b-4fca-b78b-db8479;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc ;; - - *** WE ROLL BACK TRANSACTION (2) + ------------------------ + LATEST DETECTED DEADLOCK + ------------------------ + 130625 20:03:10 + *** (1) TRANSACTION: + TRANSACTION 0 98087127, ACTIVE 0 sec, process no 1489, OS thread id 139837829175040 fetching rows, thread declared inside InnoDB 494 + mysql tables in use 2, locked 1 + LOCK WAIT 3 lock struct(s), heap size 368, 2 row lock(s), undo log entries 1 + MySQL thread id 28408, query id 368571321 localhost 127.0.0.1 cloud preparing + UPDATE async_job SET job_pending_signals=1 WHERE id IN (SELECT job_id FROM async_job_join_map WHERE join_job_id = 9) + *** (1) WAITING FOR THIS LOCK TO BE GRANTED: + RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` of table `cloud`.`async_job` trx id 0 98087127 lock_mode X locks rec but not gap waiting + Record lock, heap no 9 PHYSICAL RECORD: n_fields 26; compact format; info bits 0 + 0: len 8; hex 0000000000000008; asc ;; 1: len 6; hex 000005d8b0d8; asc ;; 2: len 7; hex 00000009270110; asc ' ;; 3: len 8; hex 0000000000000002; asc ;; 4: len 8; hex 0000000000000002; asc ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 7b226964223a2232222c22706879736963616c6e6574776f726b6964223a; asc {"id":"2","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc ;; 10: 
len 4; hex 80000001; asc ;; 11: len 4; hex 80000000; asc ;; 12: len 4; hex 80000000; asc ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 15: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 16: len 8; hex 8000124f06cfd5b6; asc O ;; 17: len 8; hex 8000124f06cfd5b6; asc O ;; 18: SQL NULL; 19: SQL NULL; 20: len 30; hex 66376466396532362d323139622d346338652d393231332d393766653636; asc f7df9e26-219b-4c8e-9213-97fe66;...(truncated); 21: len 30; hex 36623238306364362d663436652d343563322d383833642d333863616439; asc 6b280cd6-f46e-45c2-883d-38cad9;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc ;; + + *** (2) TRANSACTION: + TRANSACTION 0 98087128, ACTIVE 0 sec, process no 1489, OS thread id 139837671909120 fetching rows, thread declared inside InnoDB 492 + mysql tables in use 2, locked 1 + 3 lock struct(s), heap size 368, 2 row lock(s), undo log entries 1 + MySQL thread id 28406, query id 368571323 localhost 127.0.0.1 cloud preparing + UPDATE async_job SET job_pending_signals=1 WHERE id IN (SELECT job_id FROM async_job_join_map WHERE join_job_id = 8) + *** (2) HOLDS THE LOCK(S): + RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` of table `cloud`.`async_job` trx id 0 98087128 lock_mode X locks rec but not gap + Record lock, heap no 9 PHYSICAL RECORD: n_fields 26; compact format; info bits 0 + 0: len 8; hex 0000000000000008; asc ;; 1: len 6; hex 000005d8b0d8; asc ;; 2: len 7; hex 00000009270110; asc ' ;; 3: len 8; hex 0000000000000002; asc ;; 4: len 8; hex 0000000000000002; asc ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 7b226964223a2232222c22706879736963616c6e6574776f726b6964223a; asc
{"id":"2","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc ;; 10: len 4; hex 80000001; asc ;; 11: len 4; hex 80000000; asc ;; 12: len 4; hex 80000000; asc ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 15: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 16: len 8; hex 8000124f06cfd5b6; asc O ;; 17: len 8; hex 8000124f06cfd5b6; asc O ;; 18: SQL NULL; 19: SQL NULL; 20: len 30; hex 66376466396532362d323139622d346338652d393231332d393766653636; asc f7df9e26-219b-4c8e-9213-97fe66;...(truncated); 21: len 30; hex 36623238306364362d663436652d343563322d383833642d333863616439; asc 6b280cd6-f46e-45c2-883d-38cad9;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc ;; + + *** (2) WAITING FOR THIS LOCK TO BE GRANTED: + RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` of table `cloud`.`async_job` trx id 0 98087128 lock_mode X locks rec but not gap waiting + Record lock, heap no 10 PHYSICAL RECORD: n_fields 26; compact format; info bits 0 + 0: len 8; hex 0000000000000009; asc ;; 1: len 6; hex 000005d8b0d7; asc ;; 2: len 7; hex 00000009280110; asc ( ;; 3: len 8; hex 0000000000000002; asc ;; 4: len 8; hex 0000000000000002; asc ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 7b226964223a2233222c22706879736963616c6e6574776f726b6964223a; asc {"id":"3","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc ;; 10: len 4; hex 80000001; asc ;; 11: len 4; hex 80000000; asc ;; 12: len 4; hex 80000000; asc ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 15: len 8; hex
80001a6f7bb0d0a8; asc o{ ;; 16: len 8; hex 8000124f06cfd5b6; asc O ;; 17: len 8; hex 8000124f06cfd5b6; asc O ;; 18: SQL NULL; 19: SQL NULL; 20: len 30; hex 62313065306432342d336233352d343663622d386361622d623933623562; asc b10e0d24-3b35-46cb-8cab-b93b5b;...(truncated); 21: len 30; hex 39353664383563632d383336622d346663612d623738622d646238343739; asc 956d85cc-836b-4fca-b78b-db8479;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc ;; + + *** WE ROLL BACK TRANSACTION (2) */ _joinMapDao.completeJoin(joinJobId, joinStatus, joinResult, getMsid()); @@ -406,23 +405,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } SyncQueueVO queue = null; - - // to deal with temporary DB exceptions like DB deadlock/Lock-wait time out cased rollbacks - // we retry five times until we throw an exception - Random random = new Random(); - - for (int i = 0; i < 5; i++) { - queue = _queueMgr.queue(syncObjType, syncObjId, SyncQueueItem.AsyncJobContentType, job.getId(), queueSizeLimit); - if (queue != null) { - break; - } - - try { - Thread.sleep(1000 + random.nextInt(5000)); - } catch (InterruptedException e) { - } - } - + queue = _queueMgr.queue(syncObjType, syncObjId, SyncQueueItem.AsyncJobContentType, job.getId(), queueSizeLimit); if (queue == null) throw new CloudRuntimeException("Unable to insert queue item into database, DB is full?"); } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/8db0d83d/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java index 7fb0245..9d3bf80 100644 ---
a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java @@ -23,6 +23,7 @@ import java.util.List; import javax.inject.Inject; import org.apache.log4j.Logger; + import org.apache.cloudstack.framework.jobs.dao.SyncQueueDao; import org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDao; @@ -146,18 +147,18 @@ public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManage processNumber = new Long(1); else processNumber = processNumber + 1; - + Date dt = DateUtil.currentGMTTime(); queueVO.setLastProcessNumber(processNumber); queueVO.setLastUpdated(dt); queueVO.setQueueSize(queueVO.getQueueSize() + 1); _syncQueueDao.update(queueVO.getId(), queueVO); - + itemVO.setLastProcessMsid(msid); itemVO.setLastProcessNumber(processNumber); itemVO.setLastProcessTime(dt); _syncQueueItemDao.update(item.getId(), itemVO); - + resultList.add(item); } } @@ -183,9 +184,9 @@ public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManage SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId); if(itemVO != null) { SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true); - + _syncQueueItemDao.expunge(itemVO.getId()); - + // if item is active, reset queue information if (itemVO.getLastProcessMsid() != null) { queueVO.setLastUpdated(DateUtil.currentGMTTime()); @@ -239,18 +240,15 @@ public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManage } private boolean queueReadyToProcess(SyncQueueVO queueVO) { - return true; - - // - // TODO - // - // Need to disable concurrency disable at queue level due to the need to support - // job wake-up dispatching task - // - // Concurrency control is better done at higher level and leave the job scheduling/serializing simpler - // - - // return queueVO.getQueueSize() < queueVO.getQueueSizeLimit(); + int nActiveItems = 
_syncQueueItemDao.getActiveQueueItemCount(queueVO.getId()); + if (nActiveItems < queueVO.getQueueSizeLimit()) + return true; + + if (s_logger.isDebugEnabled()) + s_logger.debug("Queue (queue id, sync type, sync id) - (" + queueVO.getId() + + "," + queueVO.getSyncObjType() + ", " + queueVO.getSyncObjId() + + ") is reaching concurrency limit " + queueVO.getQueueSizeLimit()); + return false; } @Override
