Avoid row locks in sync-queue scheduling, as a workaround for MySQL locking 
issues.


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/5310e66f
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/5310e66f
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/5310e66f

Branch: refs/heads/resize-root
Commit: 5310e66f30fbfcc025b69b50a9a717557ac9eb3c
Parents: 7fa4715
Author: Kelven Yang <[email protected]>
Authored: Thu Mar 6 16:28:29 2014 -0800
Committer: Kelven Yang <[email protected]>
Committed: Thu Mar 13 16:59:56 2014 -0700

----------------------------------------------------------------------
 api/src/com/cloud/vm/VirtualMachine.java        |  2 ++
 .../jobs/impl/AsyncJobManagerImpl.java          | 16 ++++++++++++++
 .../jobs/impl/SyncQueueManagerImpl.java         | 22 ++++++++++----------
 3 files changed, 29 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/5310e66f/api/src/com/cloud/vm/VirtualMachine.java
----------------------------------------------------------------------
diff --git a/api/src/com/cloud/vm/VirtualMachine.java 
b/api/src/com/cloud/vm/VirtualMachine.java
index dd11a82..b085d4a 100755
--- a/api/src/com/cloud/vm/VirtualMachine.java
+++ b/api/src/com/cloud/vm/VirtualMachine.java
@@ -119,10 +119,12 @@ public interface VirtualMachine extends RunningOn, 
ControlledEntity, Identity, I
             s_fsm.addTransition(State.Error, 
VirtualMachine.Event.DestroyRequested, State.Expunging);
             s_fsm.addTransition(State.Error, 
VirtualMachine.Event.ExpungeOperation, State.Expunging);
 
+            s_fsm.addTransition(State.Starting, 
VirtualMachine.Event.FollowAgentPowerOnReport, State.Running);
             s_fsm.addTransition(State.Stopping, 
VirtualMachine.Event.FollowAgentPowerOnReport, State.Running);
             s_fsm.addTransition(State.Stopped, 
VirtualMachine.Event.FollowAgentPowerOnReport, State.Running);
             s_fsm.addTransition(State.Running, 
VirtualMachine.Event.FollowAgentPowerOnReport, State.Running);
             s_fsm.addTransition(State.Migrating, 
VirtualMachine.Event.FollowAgentPowerOnReport, State.Running);
+
             s_fsm.addTransition(State.Starting, 
VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped);
             s_fsm.addTransition(State.Stopping, 
VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped);
             s_fsm.addTransition(State.Running, 
VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/5310e66f/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
 
b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
index 9b9460c..49c3032 100644
--- 
a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
+++ 
b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
@@ -659,8 +659,24 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
 
     private Runnable getHeartbeatTask() {
         return new ManagedContextRunnable() {
+
             @Override
             protected void runInContext() {
+                GlobalLock scanLock = 
GlobalLock.getInternLock("AsyncJobManagerHeartbeat");
+                try {
+                    if 
(scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
+                        try {
+                            reallyRun();
+                        } finally {
+                            scanLock.unlock();
+                        }
+                    }
+                } finally {
+                    scanLock.releaseRef();
+                }
+            }
+
+            protected void reallyRun() {
                 try {
                     List<SyncQueueItemVO> l = 
_queueMgr.dequeueFromAny(getMsid(), MAX_ONETIME_SCHEDULE_SIZE);
                     if (l != null && l.size() > 0) {

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/5310e66f/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
 
b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
index d8e2674..5160e05 100644
--- 
a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
+++ 
b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
@@ -83,8 +83,8 @@ public class SyncQueueManagerImpl extends ManagerBase 
implements SyncQueueManage
             return Transaction.execute(new 
TransactionCallback<SyncQueueItemVO>() {
                 @Override
                 public SyncQueueItemVO doInTransaction(TransactionStatus 
status) {
-                    SyncQueueVO queueVO = _syncQueueDao.lockRow(queueId, true);
-                    if (queueVO == null) {
+                    SyncQueueVO queueVO = _syncQueueDao.findById(queueId);
+                    if(queueVO == null) {
                         s_logger.error("Sync queue(id: " + queueId + ") does 
not exist");
                         return null;
                     }
@@ -139,11 +139,11 @@ public class SyncQueueManagerImpl extends ManagerBase 
implements SyncQueueManage
                 @Override
                 public void doInTransactionWithoutResult(TransactionStatus 
status) {
                     List<SyncQueueItemVO> l = 
_syncQueueItemDao.getNextQueueItems(maxItems);
-                    if (l != null && l.size() > 0) {
-                        for (SyncQueueItemVO item : l) {
-                            SyncQueueVO queueVO = 
_syncQueueDao.lockRow(item.getQueueId(), true);
-                            SyncQueueItemVO itemVO = 
_syncQueueItemDao.lockRow(item.getId(), true);
-                            if (queueReadyToProcess(queueVO) && 
itemVO.getLastProcessNumber() == null) {
+                    if(l != null && l.size() > 0) {
+                        for(SyncQueueItemVO item : l) {
+                            SyncQueueVO queueVO = 
_syncQueueDao.findById(item.getQueueId());
+                            SyncQueueItemVO itemVO = 
_syncQueueItemDao.findById(item.getId());
+                            if(queueReadyToProcess(queueVO) && 
itemVO.getLastProcessNumber() == null) {
                                 Long processNumber = 
queueVO.getLastProcessNumber();
                                 if (processNumber == null)
                                     processNumber = new Long(1);
@@ -184,8 +184,8 @@ public class SyncQueueManagerImpl extends ManagerBase 
implements SyncQueueManage
                 @Override
                 public void doInTransactionWithoutResult(TransactionStatus 
status) {
                     SyncQueueItemVO itemVO = 
_syncQueueItemDao.findById(queueItemId);
-                    if (itemVO != null) {
-                        SyncQueueVO queueVO = 
_syncQueueDao.lockRow(itemVO.getQueueId(), true);
+                    if(itemVO != null) {
+                        SyncQueueVO queueVO = 
_syncQueueDao.findById(itemVO.getQueueId());
 
                         _syncQueueItemDao.expunge(itemVO.getId());
 
@@ -213,8 +213,8 @@ public class SyncQueueManagerImpl extends ManagerBase 
implements SyncQueueManage
                 @Override
                 public void doInTransactionWithoutResult(TransactionStatus 
status) {
                     SyncQueueItemVO itemVO = 
_syncQueueItemDao.findById(queueItemId);
-                    if (itemVO != null) {
-                        SyncQueueVO queueVO = 
_syncQueueDao.lockRow(itemVO.getQueueId(), true);
+                    if(itemVO != null) {
+                        SyncQueueVO queueVO = 
_syncQueueDao.findById(itemVO.getQueueId());
 
                         itemVO.setLastProcessMsid(null);
                         itemVO.setLastProcessNumber(null);

Reply via email to