Updated Branches:
  refs/heads/4.3 cd8501e26 -> 8db0d83d1

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/8db0d83d/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java
----------------------------------------------------------------------
diff --git 
a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java
 
b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java
index 61670bf..7b6eed7 100644
--- 
a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java
+++ 
b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java
@@ -23,9 +23,14 @@ import 
org.apache.cloudstack.framework.jobs.impl.SyncQueueItemVO;
 import com.cloud.utils.db.GenericDao;
 
 public interface SyncQueueItemDao extends GenericDao<SyncQueueItemVO, Long> {
-       public SyncQueueItemVO getNextQueueItem(long queueId);
-       public List<SyncQueueItemVO> getNextQueueItems(int maxItems);
-       public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean 
exclusive);
-       public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, 
boolean exclusive);
-       public Long getQueueItemIdByContentIdAndType(long contentId, String 
contentType);
+    public SyncQueueItemVO getNextQueueItem(long queueId);
+    public int getActiveQueueItemCount(long queueId);
+
+    public List<SyncQueueItemVO> getNextQueueItems(int maxItems);
+
+    public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean 
exclusive);
+
+    public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, 
boolean exclusive);
+
+    public Long getQueueItemIdByContentIdAndType(long contentId, String 
contentType);
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/8db0d83d/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
----------------------------------------------------------------------
diff --git 
a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
 
b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
index 2f04a7c..41f1419 100644
--- 
a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
+++ 
b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
@@ -36,6 +36,7 @@ import com.cloud.utils.db.GenericDaoBase;
 import com.cloud.utils.db.GenericSearchBuilder;
 import com.cloud.utils.db.SearchBuilder;
 import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.SearchCriteria.Func;
 import com.cloud.utils.db.SearchCriteria.Op;
 import com.cloud.utils.db.TransactionLegacy;
 
@@ -43,7 +44,8 @@ import com.cloud.utils.db.TransactionLegacy;
 public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, 
Long> implements SyncQueueItemDao {
     private static final Logger s_logger = 
Logger.getLogger(SyncQueueItemDaoImpl.class);
     final GenericSearchBuilder<SyncQueueItemVO, Long> queueIdSearch;
-    
+    final GenericSearchBuilder<SyncQueueItemVO, Integer> queueActiveItemSearch;
+
     public SyncQueueItemDaoImpl() {
         super();
         queueIdSearch = createSearchBuilder(Long.class);
@@ -51,37 +53,52 @@ public class SyncQueueItemDaoImpl extends 
GenericDaoBase<SyncQueueItemVO, Long>
         queueIdSearch.and("contentType", 
queueIdSearch.entity().getContentType(), Op.EQ);
         queueIdSearch.selectFields(queueIdSearch.entity().getId());
         queueIdSearch.done();
+
+        queueActiveItemSearch = createSearchBuilder(Integer.class);
+        queueActiveItemSearch.and("queueId", 
queueActiveItemSearch.entity().getQueueId(), Op.EQ);
+        queueActiveItemSearch.and("processNumber", 
queueActiveItemSearch.entity().getLastProcessNumber(), Op.NNULL);
+        queueActiveItemSearch.select(null, Func.COUNT, 
queueActiveItemSearch.entity().getId());
+        queueActiveItemSearch.done();
     }
 
-       @Override
-       public SyncQueueItemVO getNextQueueItem(long queueId) {
-               
-               SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder();
+    @Override
+    public SyncQueueItemVO getNextQueueItem(long queueId) {
+
+        SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder();
         sb.and("queueId", sb.entity().getQueueId(), SearchCriteria.Op.EQ);
-        sb.and("lastProcessNumber", sb.entity().getLastProcessNumber(),        
SearchCriteria.Op.NULL);
+        sb.and("lastProcessNumber", sb.entity().getLastProcessNumber(), 
SearchCriteria.Op.NULL);
         sb.done();
-        
-       SearchCriteria<SyncQueueItemVO> sc = sb.create();
-       sc.setParameters("queueId", queueId);
-       
-       Filter filter = new Filter(SyncQueueItemVO.class, "created", true, 0L, 
1L);
+
+        SearchCriteria<SyncQueueItemVO> sc = sb.create();
+        sc.setParameters("queueId", queueId);
+
+        Filter filter = new Filter(SyncQueueItemVO.class, "created", true, 0L, 
1L);
         List<SyncQueueItemVO> l = listBy(sc, filter);
         if(l != null && l.size() > 0)
-               return l.get(0);
-       
-               return null;
-       }
-
-       @Override
-       public List<SyncQueueItemVO> getNextQueueItems(int maxItems) {
-               List<SyncQueueItemVO> l = new ArrayList<SyncQueueItemVO>();
-               
-               String sql = "SELECT i.id, i.queue_id, i.content_type, 
i.content_id, i.created " +
-                                        " FROM sync_queue AS q JOIN 
sync_queue_item AS i ON q.id = i.queue_id " +
+            return l.get(0);
+
+        return null;
+    }
+
+    @Override
+    public int getActiveQueueItemCount(long queueId) {
+        SearchCriteria<Integer> sc = queueActiveItemSearch.create();
+        sc.setParameters("queueId", queueId);
+
+        List<Integer> count = customSearch(sc, null);
+        return count.get(0);
+    }
+
+    @Override
+    public List<SyncQueueItemVO> getNextQueueItems(int maxItems) {
+        List<SyncQueueItemVO> l = new ArrayList<SyncQueueItemVO>();
+
+        String sql = "SELECT i.id, i.queue_id, i.content_type, i.content_id, 
i.created " +
+                " FROM sync_queue AS q JOIN sync_queue_item AS i ON q.id = 
i.queue_id " +
                      " WHERE i.queue_proc_number IS NULL " +
-                                        " GROUP BY q.id " +
-                                        " ORDER BY i.id " +
-                                        " LIMIT 0, ?";
+                " GROUP BY q.id " +
+                " ORDER BY i.id " +
+                " LIMIT 0, ?";
 
         TransactionLegacy txn = TransactionLegacy.currentTxn();
         PreparedStatement pstmt = null;
@@ -90,54 +107,54 @@ public class SyncQueueItemDaoImpl extends 
GenericDaoBase<SyncQueueItemVO, Long>
             pstmt.setInt(1, maxItems);
             ResultSet rs = pstmt.executeQuery();
             while(rs.next()) {
-               SyncQueueItemVO item = new SyncQueueItemVO();
-               item.setId(rs.getLong(1));
-               item.setQueueId(rs.getLong(2));
-               item.setContentType(rs.getString(3));
-               item.setContentId(rs.getLong(4));
-               
item.setCreated(DateUtil.parseDateString(TimeZone.getTimeZone("GMT"), 
rs.getString(5)));
-               l.add(item);
+                SyncQueueItemVO item = new SyncQueueItemVO();
+                item.setId(rs.getLong(1));
+                item.setQueueId(rs.getLong(2));
+                item.setContentType(rs.getString(3));
+                item.setContentId(rs.getLong(4));
+                
item.setCreated(DateUtil.parseDateString(TimeZone.getTimeZone("GMT"), 
rs.getString(5)));
+                l.add(item);
             }
         } catch (SQLException e) {
-               s_logger.error("Unexpected sql excetpion, ", e);
+            s_logger.error("Unexpected sql excetpion, ", e);
         } catch (Throwable e) {
-               s_logger.error("Unexpected excetpion, ", e);
+            s_logger.error("Unexpected excetpion, ", e);
         }
-               return l;
-       }
-       
-       @Override
-       public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean 
exclusive) {
-               SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder();
+        return l;
+    }
+
+    @Override
+    public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean 
exclusive) {
+        SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder();
         sb.and("lastProcessMsid", sb.entity().getLastProcessMsid(),
-               SearchCriteria.Op.EQ);
+                SearchCriteria.Op.EQ);
         sb.done();
-        
-       SearchCriteria<SyncQueueItemVO> sc = sb.create();
-       sc.setParameters("lastProcessMsid", msid);
-       
-       Filter filter = new Filter(SyncQueueItemVO.class, "created", true, 
null, null);
-       
-       if(exclusive)
-               return lockRows(sc, filter, true);
+
+        SearchCriteria<SyncQueueItemVO> sc = sb.create();
+        sc.setParameters("lastProcessMsid", msid);
+
+        Filter filter = new Filter(SyncQueueItemVO.class, "created", true, 
null, null);
+
+        if (exclusive)
+            return lockRows(sc, filter, true);
         return listBy(sc, filter);
-       }
+    }
 
     @Override
     public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, 
boolean exclusive) {
         Date cutTime = DateUtil.currentGMTTime();
-        
+
         SearchBuilder<SyncQueueItemVO> sbItem = createSearchBuilder();
         sbItem.and("lastProcessMsid", sbItem.entity().getLastProcessMsid(), 
SearchCriteria.Op.NNULL);
         sbItem.and("lastProcessNumber", 
sbItem.entity().getLastProcessNumber(), SearchCriteria.Op.NNULL);
         sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessTime(), 
SearchCriteria.Op.NNULL);
         sbItem.and("lastProcessTime2", sbItem.entity().getLastProcessTime(), 
SearchCriteria.Op.LT);
-        
+
         sbItem.done();
-        
+
         SearchCriteria<SyncQueueItemVO> sc = sbItem.create();
         sc.setParameters("lastProcessTime2", new Date(cutTime.getTime() - 
thresholdMs));
-        
+
         if(exclusive)
             return lockRows(sc, null, true);
         return listBy(sc, null);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/8db0d83d/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
 
b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
index a77f864..63c365b 100644
--- 
a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
+++ 
b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
@@ -24,7 +24,6 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
@@ -88,12 +87,12 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
 
     private static final Logger s_logger = 
Logger.getLogger(AsyncJobManagerImpl.class);
 
-    private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3;  
// 3 seconds
-    private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC = 60;        
// 60 seconds
+    private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3;  
   // 3 seconds
+    private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC = 60;     // 
60 seconds
 
     private static final int MAX_ONETIME_SCHEDULE_SIZE = 50;
     private static final int HEARTBEAT_INTERVAL = 2000;
-    private static final int GC_INTERVAL = 10000;                              
// 10 seconds
+    private static final int GC_INTERVAL = 10000;                // 10 seconds
 
     @Inject
     private SyncQueueItemDao _queueItemDao;
@@ -362,38 +361,38 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
         // I removed the temporary solution already.  I think my changes 
should fix the deadlock.
 
         /*
-               ------------------------
-               LATEST DETECTED DEADLOCK
-               ------------------------
-               130625 20:03:10
-               *** (1) TRANSACTION:
-               TRANSACTION 0 98087127, ACTIVE 0 sec, process no 1489, OS 
thread id 139837829175040 fetching rows, thread declared inside InnoDB 494
-               mysql tables in use 2, locked 1
-               LOCK WAIT 3 lock struct(s), heap size 368, 2 row lock(s), undo 
log entries 1
-               MySQL thread id 28408, query id 368571321 localhost 127.0.0.1 
cloud preparing
-               UPDATE async_job SET job_pending_signals=1 WHERE id IN (SELECT 
job_id FROM async_job_join_map WHERE join_job_id = 9)
-               *** (1) WAITING FOR THIS LOCK TO BE GRANTED:
-               RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` 
of table `cloud`.`async_job` trx id 0 98087127 lock_mode X locks rec but not 
gap waiting
-               Record lock, heap no 9 PHYSICAL RECORD: n_fields 26; compact 
format; info bits 0
-               0: len 8; hex 0000000000000008; asc         ;; 1: len 6; hex 
000005d8b0d8; asc       ;; 2: len 7; hex 00000009270110; asc     '  ;; 3: len 
8; hex 0000000000000002; asc         ;; 4: len 8; hex 0000000000000002; asc     
    ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 
6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc 
org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 
7b226964223a2232222c22706879736963616c6e6574776f726b6964223a; asc 
{"id":"2","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc     
;; 10: len 4; hex 80000001; asc     ;; 11: len 4; hex 80000000; asc     ;; 12: 
len 4; hex 80000000; asc     ;; 13: len 30; hex 
6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc 
org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; 
asc    o{   ;; 15: len 8; hex 80001a6f7bb0d0a8; asc    o{   ;; 16: len 8; hex 
8000124f06cfd5b6; asc    O    ;; 17: len 8; hex 8000124f06cfd5b6; asc    O    
;; 18: SQL NULL
 ; 19: SQL NULL; 20: len 30; hex 
66376466396532362d323139622d346338652d393231332d393766653636; asc 
f7df9e26-219b-4c8e-9213-97fe66;...(truncated); 21: len 30; hex 
36623238306364362d663436652d343563322d383833642d333863616439; asc 
6b280cd6-f46e-45c2-883d-38cad9;...(truncated); 22: SQL NULL; 23: len 21; hex 
4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL 
NULL; 25: len 4; hex 80000000; asc     ;;
-
-               *** (2) TRANSACTION:
-               TRANSACTION 0 98087128, ACTIVE 0 sec, process no 1489, OS 
thread id 139837671909120 fetching rows, thread declared inside InnoDB 492
-               mysql tables in use 2, locked 1
-               3 lock struct(s), heap size 368, 2 row lock(s), undo log 
entries 1
-               MySQL thread id 28406, query id 368571323 localhost 127.0.0.1 
cloud preparing
-               UPDATE async_job SET job_pending_signals=1 WHERE id IN (SELECT 
job_id FROM async_job_join_map WHERE join_job_id = 8)
-               *** (2) HOLDS THE LOCK(S):
-               RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` 
of table `cloud`.`async_job` trx id 0 98087128 lock_mode X locks rec but not gap
-               Record lock, heap no 9 PHYSICAL RECORD: n_fields 26; compact 
format; info bits 0
-               0: len 8; hex 0000000000000008; asc         ;; 1: len 6; hex 
000005d8b0d8; asc       ;; 2: len 7; hex 00000009270110; asc     '  ;; 3: len 
8; hex 0000000000000002; asc         ;; 4: len 8; hex 0000000000000002; asc     
    ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 
6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc 
org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 
7b226964223a2232222c22706879736963616c6e6574776f726b6964223a; asc 
{"id":"2","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc     
;; 10: len 4; hex 80000001; asc     ;; 11: len 4; hex 80000000; asc     ;; 12: 
len 4; hex 80000000; asc     ;; 13: len 30; hex 
6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc 
org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; 
asc    o{   ;; 15: len 8; hex 80001a6f7bb0d0a8; asc    o{   ;; 16: len 8; hex 
8000124f06cfd5b6; asc    O    ;; 17: len 8; hex 8000124f06cfd5b6; asc    O    
;; 18: SQL NULL
 ; 19: SQL NULL; 20: len 30; hex 
66376466396532362d323139622d346338652d393231332d393766653636; asc 
f7df9e26-219b-4c8e-9213-97fe66;...(truncated); 21: len 30; hex 
36623238306364362d663436652d343563322d383833642d333863616439; asc 
6b280cd6-f46e-45c2-883d-38cad9;...(truncated); 22: SQL NULL; 23: len 21; hex 
4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL 
NULL; 25: len 4; hex 80000000; asc     ;;
-
-               *** (2) WAITING FOR THIS LOCK TO BE GRANTED:
-               RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` 
of table `cloud`.`async_job` trx id 0 98087128 lock_mode X locks rec but not 
gap waiting
-               Record lock, heap no 10 PHYSICAL RECORD: n_fields 26; compact 
format; info bits 0
-               0: len 8; hex 0000000000000009; asc         ;; 1: len 6; hex 
000005d8b0d7; asc       ;; 2: len 7; hex 00000009280110; asc     (  ;; 3: len 
8; hex 0000000000000002; asc         ;; 4: len 8; hex 0000000000000002; asc     
    ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 
6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc 
org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 
7b226964223a2233222c22706879736963616c6e6574776f726b6964223a; asc 
{"id":"3","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc     
;; 10: len 4; hex 80000001; asc     ;; 11: len 4; hex 80000000; asc     ;; 12: 
len 4; hex 80000000; asc     ;; 13: len 30; hex 
6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc 
org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; 
asc    o{   ;; 15: len 8; hex 80001a6f7bb0d0a8; asc    o{   ;; 16: len 8; hex 
8000124f06cfd5b6; asc    O    ;; 17: len 8; hex 8000124f06cfd5b6; asc    O    
;; 18: SQL NULL
 ; 19: SQL NULL; 20: len 30; hex 
62313065306432342d336233352d343663622d386361622d623933623562; asc 
b10e0d24-3b35-46cb-8cab-b93b5b;...(truncated); 21: len 30; hex 
39353664383563632d383336622d346663612d623738622d646238343739; asc 
956d85cc-836b-4fca-b78b-db8479;...(truncated); 22: SQL NULL; 23: len 21; hex 
4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL 
NULL; 25: len 4; hex 80000000; asc     ;;
-
-               *** WE ROLL BACK TRANSACTION (2)
+                ------------------------
+                LATEST DETECTED DEADLOCK
+                ------------------------
+                130625 20:03:10
+                *** (1) TRANSACTION:
+                TRANSACTION 0 98087127, ACTIVE 0 sec, process no 1489, OS 
thread id 139837829175040 fetching rows, thread declared inside InnoDB 494
+                mysql tables in use 2, locked 1
+                LOCK WAIT 3 lock struct(s), heap size 368, 2 row lock(s), undo 
log entries 1
+                MySQL thread id 28408, query id 368571321 localhost 127.0.0.1 
cloud preparing
+                UPDATE async_job SET job_pending_signals=1 WHERE id IN (SELECT 
job_id FROM async_job_join_map WHERE join_job_id = 9)
+                *** (1) WAITING FOR THIS LOCK TO BE GRANTED:
+                RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` 
of table `cloud`.`async_job` trx id 0 98087127 lock_mode X locks rec but not 
gap waiting
+                Record lock, heap no 9 PHYSICAL RECORD: n_fields 26; compact 
format; info bits 0
+                0: len 8; hex 0000000000000008; asc         ;; 1: len 6; hex 
000005d8b0d8; asc       ;; 2: len 7; hex 00000009270110; asc     '  ;; 3: len 
8; hex 0000000000000002; asc         ;; 4: len 8; hex 0000000000000002; asc     
    ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 
6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc 
org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 
7b226964223a2232222c22706879736963616c6e6574776f726b6964223a; asc 
{"id":"2","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc     
;; 10: len 4; hex 80000001; asc     ;; 11: len 4; hex 80000000; asc     ;; 12: 
len 4; hex 80000000; asc     ;; 13: len 30; hex 
6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc 
org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; 
asc    o{   ;; 15: len 8; hex 80001a6f7bb0d0a8; asc    o{   ;; 16: len 8; hex 
8000124f06cfd5b6; asc    O    ;; 17: len 8; hex 8000124f06cfd5b6; asc    O    
;; 18: SQL NULL
 ; 19: SQL NULL; 20: len 30; hex 
66376466396532362d323139622d346338652d393231332d393766653636; asc 
f7df9e26-219b-4c8e-9213-97fe66;...(truncated); 21: len 30; hex 
36623238306364362d663436652d343563322d383833642d333863616439; asc 
6b280cd6-f46e-45c2-883d-38cad9;...(truncated); 22: SQL NULL; 23: len 21; hex 
4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL 
NULL; 25: len 4; hex 80000000; asc     ;;
+
+                *** (2) TRANSACTION:
+                TRANSACTION 0 98087128, ACTIVE 0 sec, process no 1489, OS 
thread id 139837671909120 fetching rows, thread declared inside InnoDB 492
+                mysql tables in use 2, locked 1
+                3 lock struct(s), heap size 368, 2 row lock(s), undo log 
entries 1
+                MySQL thread id 28406, query id 368571323 localhost 127.0.0.1 
cloud preparing
+                UPDATE async_job SET job_pending_signals=1 WHERE id IN (SELECT 
job_id FROM async_job_join_map WHERE join_job_id = 8)
+                *** (2) HOLDS THE LOCK(S):
+                RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` 
of table `cloud`.`async_job` trx id 0 98087128 lock_mode X locks rec but not gap
+                Record lock, heap no 9 PHYSICAL RECORD: n_fields 26; compact 
format; info bits 0
+                0: len 8; hex 0000000000000008; asc         ;; 1: len 6; hex 
000005d8b0d8; asc       ;; 2: len 7; hex 00000009270110; asc     '  ;; 3: len 
8; hex 0000000000000002; asc         ;; 4: len 8; hex 0000000000000002; asc     
    ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 
6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc 
org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 
7b226964223a2232222c22706879736963616c6e6574776f726b6964223a; asc 
{"id":"2","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc     
;; 10: len 4; hex 80000001; asc     ;; 11: len 4; hex 80000000; asc     ;; 12: 
len 4; hex 80000000; asc     ;; 13: len 30; hex 
6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc 
org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; 
asc    o{   ;; 15: len 8; hex 80001a6f7bb0d0a8; asc    o{   ;; 16: len 8; hex 
8000124f06cfd5b6; asc    O    ;; 17: len 8; hex 8000124f06cfd5b6; asc    O    
;; 18: SQL NULL
 ; 19: SQL NULL; 20: len 30; hex 
66376466396532362d323139622d346338652d393231332d393766653636; asc 
f7df9e26-219b-4c8e-9213-97fe66;...(truncated); 21: len 30; hex 
36623238306364362d663436652d343563322d383833642d333863616439; asc 
6b280cd6-f46e-45c2-883d-38cad9;...(truncated); 22: SQL NULL; 23: len 21; hex 
4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL 
NULL; 25: len 4; hex 80000000; asc     ;;
+
+                *** (2) WAITING FOR THIS LOCK TO BE GRANTED:
+                RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` 
of table `cloud`.`async_job` trx id 0 98087128 lock_mode X locks rec but not 
gap waiting
+                Record lock, heap no 10 PHYSICAL RECORD: n_fields 26; compact 
format; info bits 0
+                0: len 8; hex 0000000000000009; asc         ;; 1: len 6; hex 
000005d8b0d7; asc       ;; 2: len 7; hex 00000009280110; asc     (  ;; 3: len 
8; hex 0000000000000002; asc         ;; 4: len 8; hex 0000000000000002; asc     
    ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 
6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc 
org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 
7b226964223a2233222c22706879736963616c6e6574776f726b6964223a; asc 
{"id":"3","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc     
;; 10: len 4; hex 80000001; asc     ;; 11: len 4; hex 80000000; asc     ;; 12: 
len 4; hex 80000000; asc     ;; 13: len 30; hex 
6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc 
org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; 
asc    o{   ;; 15: len 8; hex 80001a6f7bb0d0a8; asc    o{   ;; 16: len 8; hex 
8000124f06cfd5b6; asc    O    ;; 17: len 8; hex 8000124f06cfd5b6; asc    O    
;; 18: SQL NULL
 ; 19: SQL NULL; 20: len 30; hex 
62313065306432342d336233352d343663622d386361622d623933623562; asc 
b10e0d24-3b35-46cb-8cab-b93b5b;...(truncated); 21: len 30; hex 
39353664383563632d383336622d346663612d623738622d646238343739; asc 
956d85cc-836b-4fca-b78b-db8479;...(truncated); 22: SQL NULL; 23: len 21; hex 
4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL 
NULL; 25: len 4; hex 80000000; asc     ;;
+
+                *** WE ROLL BACK TRANSACTION (2)
         */
 
         _joinMapDao.completeJoin(joinJobId, joinStatus, joinResult, getMsid());
@@ -406,23 +405,7 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
         }
 
         SyncQueueVO queue = null;
-
-        // to deal with temporary DB exceptions like DB deadlock/Lock-wait 
time out cased rollbacks
-        // we retry five times until we throw an exception
-        Random random = new Random();
-
-        for (int i = 0; i < 5; i++) {
-            queue = _queueMgr.queue(syncObjType, syncObjId, 
SyncQueueItem.AsyncJobContentType, job.getId(), queueSizeLimit);
-            if (queue != null) {
-                break;
-            }
-
-            try {
-                Thread.sleep(1000 + random.nextInt(5000));
-            } catch (InterruptedException e) {
-            }
-        }
-
+        queue = _queueMgr.queue(syncObjType, syncObjId, 
SyncQueueItem.AsyncJobContentType, job.getId(), queueSizeLimit);
         if (queue == null)
             throw new CloudRuntimeException("Unable to insert queue item into 
database, DB is full?");
     }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/8db0d83d/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
 
b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
index 7fb0245..9d3bf80 100644
--- 
a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
+++ 
b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
@@ -23,6 +23,7 @@ import java.util.List;
 import javax.inject.Inject;
 
 import org.apache.log4j.Logger;
+
 import org.apache.cloudstack.framework.jobs.dao.SyncQueueDao;
 import org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDao;
 
@@ -146,18 +147,18 @@ public class SyncQueueManagerImpl extends ManagerBase 
implements SyncQueueManage
                                     processNumber = new Long(1);
                                 else
                                     processNumber = processNumber + 1;
-        
+
                                 Date dt = DateUtil.currentGMTTime();
                                 queueVO.setLastProcessNumber(processNumber);
                                 queueVO.setLastUpdated(dt);
                                 queueVO.setQueueSize(queueVO.getQueueSize() + 
1);
                                 _syncQueueDao.update(queueVO.getId(), queueVO);
-        
+
                                 itemVO.setLastProcessMsid(msid);
                                 itemVO.setLastProcessNumber(processNumber);
                                 itemVO.setLastProcessTime(dt);
                                 _syncQueueItemDao.update(item.getId(), itemVO);
-        
+
                                 resultList.add(item);
                             }
                         }
@@ -183,9 +184,9 @@ public class SyncQueueManagerImpl extends ManagerBase 
implements SyncQueueManage
                     SyncQueueItemVO itemVO = 
_syncQueueItemDao.findById(queueItemId);
                     if(itemVO != null) {
                         SyncQueueVO queueVO = 
_syncQueueDao.lockRow(itemVO.getQueueId(), true);
-        
+
                         _syncQueueItemDao.expunge(itemVO.getId());
-        
+
                         // if item is active, reset queue information
                         if (itemVO.getLastProcessMsid() != null) {
                             queueVO.setLastUpdated(DateUtil.currentGMTTime());
@@ -239,18 +240,15 @@ public class SyncQueueManagerImpl extends ManagerBase 
implements SyncQueueManage
     }
 
     private boolean queueReadyToProcess(SyncQueueVO queueVO) {
-       return true;
-       
-       //
-       // TODO
-       //
-       // Need to disable concurrency disable at queue level due to the need 
to support
-       // job wake-up dispatching task
-       //
-       // Concurrency control is better done at higher level and leave the job 
scheduling/serializing simpler
-       //
-       
-        // return queueVO.getQueueSize() < queueVO.getQueueSizeLimit();
+        int nActiveItems = 
_syncQueueItemDao.getActiveQueueItemCount(queueVO.getId());
+        if (nActiveItems < queueVO.getQueueSizeLimit())
+            return true;
+
+        if (s_logger.isDebugEnabled())
+            s_logger.debug("Queue (queue id, sync type, sync id) - (" + 
queueVO.getId()
+                    + "," + queueVO.getSyncObjType() + ", " + 
queueVO.getSyncObjId()
+                    + ") is reaching concurrency limit " + 
queueVO.getQueueSizeLimit());
+        return false;
     }
 
     @Override

Reply via email to