Repository: cloudstack Updated Branches: refs/heads/4.4 14a4dd116 -> 1db329f0f
Deal with concurrent state update for VM and Host objects. Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/ba562101 Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/ba562101 Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/ba562101 Branch: refs/heads/4.4 Commit: ba5621012747aae5d8c8f110936dca98b7537b45 Parents: 14a4dd1 Author: Kelven Yang <kelv...@gmail.com> Authored: Fri Mar 14 10:54:27 2014 -0700 Committer: Kelven Yang <kelv...@gmail.com> Committed: Fri Mar 14 15:08:04 2014 -0700 ---------------------------------------------------------------------- api/src/com/cloud/vm/VirtualMachine.java | 1 + .../com/cloud/vm/VirtualMachineManagerImpl.java | 25 ++----- .../src/com/cloud/host/dao/HostDaoImpl.java | 77 ++++++++++---------- .../src/com/cloud/vm/dao/VMInstanceDaoImpl.java | 54 ++++++-------- .../jobs/impl/AsyncJobManagerImpl.java | 18 +++-- 5 files changed, 80 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ba562101/api/src/com/cloud/vm/VirtualMachine.java ---------------------------------------------------------------------- diff --git a/api/src/com/cloud/vm/VirtualMachine.java b/api/src/com/cloud/vm/VirtualMachine.java index b085d4a..72cbb25 100755 --- a/api/src/com/cloud/vm/VirtualMachine.java +++ b/api/src/com/cloud/vm/VirtualMachine.java @@ -102,6 +102,7 @@ public interface VirtualMachine extends RunningOn, ControlledEntity, Identity, I s_fsm.addTransition(State.Running, VirtualMachine.Event.StopRequested, State.Stopping); s_fsm.addTransition(State.Running, VirtualMachine.Event.AgentReportShutdowned, State.Stopped); s_fsm.addTransition(State.Running, VirtualMachine.Event.AgentReportMigrated, State.Running); + s_fsm.addTransition(State.Running, VirtualMachine.Event.OperationSucceeded, State.Running); s_fsm.addTransition(State.Migrating, VirtualMachine.Event.MigrationRequested, State.Migrating); s_fsm.addTransition(State.Migrating, VirtualMachine.Event.OperationSucceeded, State.Running); s_fsm.addTransition(State.Migrating, VirtualMachine.Event.OperationFailed, State.Running); http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ba562101/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java ---------------------------------------------------------------------- diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java index e9d2fd2..91d58b8 100755 --- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java +++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java @@ -36,12 +36,10 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import javax.ejb.Local; import javax.inject.Inject; import javax.naming.ConfigurationException; import org.apache.cloudstack.affinity.dao.AffinityGroupVMMapDao; -import org.apache.cloudstack.context.CallContext; import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService; import org.apache.cloudstack.engine.orchestration.service.VolumeOrchestrationService; import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager; @@ -51,19 +49,9 @@ import org.apache.cloudstack.framework.config.ConfigDepot; import org.apache.cloudstack.framework.config.ConfigKey; import org.apache.cloudstack.framework.config.Configurable; import org.apache.cloudstack.framework.config.dao.ConfigurationDao; -import org.apache.cloudstack.framework.jobs.AsyncJob; -import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext; -import org.apache.cloudstack.framework.jobs.AsyncJobManager; -import org.apache.cloudstack.framework.jobs.Outcome; -import org.apache.cloudstack.framework.jobs.dao.VmWorkJobDao; -import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO; -import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl; -import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO; import org.apache.cloudstack.framework.messagebus.MessageBus; import org.apache.cloudstack.framework.messagebus.MessageDispatcher; import org.apache.cloudstack.framework.messagebus.MessageHandler; -import org.apache.cloudstack.jobs.JobInfo; -import org.apache.cloudstack.managed.context.ManagedContextRunnable; import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao; import org.apache.cloudstack.storage.datastore.db.StoragePoolVO; import org.apache.cloudstack.storage.to.VolumeObjectTO; @@ -291,7 +279,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } public void setHostAllocators(List<HostAllocator> hostAllocators) { - this.hostAllocators = hostAllocators; + hostAllocators = hostAllocators; } protected List<StoragePoolAllocator> _storagePoolAllocators; @@ -3243,9 +3231,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @SuppressWarnings("unchecked") public AgentVmInfo(String name, VMInstanceVO vm, State state, String host) { - this.name = name; - this.state = state; - this.vm = vm; + name = name; + state = state; + vm = vm; hostUuid = host; } @@ -4100,7 +4088,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs( VirtualMachine.Type.Instance, vmId); - if (pendingWorkJobs.size() == 0 || !_haMgr.hasPendingHaWork(vmId)) { + if (pendingWorkJobs.size() == 0 && !_haMgr.hasPendingHaWork(vmId)) { // there is no pending operation job VMInstanceVO vm = _vmDao.findById(vmId); if (vm != null) { @@ -4407,7 +4395,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac @Override public boolean checkCondition() { VMInstanceVO instance = _vmDao.findById(vmId); - if (instance.getPowerState() == desiredPowerState && (srcHostIdForMigration != null && srcHostIdForMigration.equals(instance.getPowerHostId()))) + if ((instance.getPowerState() == desiredPowerState && srcHostIdForMigration == null) || + (instance.getPowerState() == desiredPowerState && (srcHostIdForMigration != null && instance.getPowerHostId() != srcHostIdForMigration))) return true; return false; } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ba562101/engine/schema/src/com/cloud/host/dao/HostDaoImpl.java ---------------------------------------------------------------------- diff --git a/engine/schema/src/com/cloud/host/dao/HostDaoImpl.java b/engine/schema/src/com/cloud/host/dao/HostDaoImpl.java index c2a9826..426c90d 100755 --- a/engine/schema/src/com/cloud/host/dao/HostDaoImpl.java +++ b/engine/schema/src/com/cloud/host/dao/HostDaoImpl.java @@ -953,46 +953,49 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao int result = update(ub, sc, null); assert result <= 1 : "How can this update " + result + " rows? "; - if (status_logger.isDebugEnabled() && result == 0) { + if (result == 0) { HostVO ho = findById(host.getId()); assert ho != null : "How how how? : " + host.getId(); - StringBuilder str = new StringBuilder("Unable to update host for event:").append(event.toString()); - str.append(". Name=").append(host.getName()); - str.append("; New=[status=") - .append(newStatus.toString()) - .append(":msid=") - .append(newStatus.lostConnection() ? "null" : host.getManagementServerId()) - .append(":lastpinged=") - .append(host.getLastPinged()) - .append("]"); - str.append("; Old=[status=") - .append(oldStatus.toString()) - .append(":msid=") - .append(host.getManagementServerId()) - .append(":lastpinged=") - .append(oldPingTime) - .append("]"); - str.append("; DB=[status=") - .append(vo.getStatus().toString()) - .append(":msid=") - .append(vo.getManagementServerId()) - .append(":lastpinged=") - .append(vo.getLastPinged()) - .append(":old update count=") - .append(oldUpdateCount) - .append("]"); - status_logger.debug(str.toString()); - } else { - StringBuilder msg = new StringBuilder("Agent status update: ["); - msg.append("id = " + host.getId()); - msg.append("; name = " + host.getName()); - msg.append("; old status = " + oldStatus); - msg.append("; event = " + event); - msg.append("; new status = " + newStatus); - msg.append("; old update count = " + oldUpdateCount); - msg.append("; new update count = " + newUpdateCount + "]"); - status_logger.debug(msg.toString()); + if (status_logger.isDebugEnabled()) { + + StringBuilder str = new StringBuilder("Unable to update host for event:").append(event.toString()); + str.append(". Name=").append(host.getName()); + str.append("; New=[status=") + .append(newStatus.toString()) + .append(":msid=") + .append(newStatus.lostConnection() ? "null" : host.getManagementServerId()) + .append(":lastpinged=") + .append(host.getLastPinged()) + .append("]"); + str.append("; Old=[status=").append(oldStatus.toString()).append(":msid=").append(host.getManagementServerId()).append(":lastpinged=").append(oldPingTime) + .append("]"); + str.append("; DB=[status=") + .append(vo.getStatus().toString()) + .append(":msid=") + .append(vo.getManagementServerId()) + .append(":lastpinged=") + .append(vo.getLastPinged()) + .append(":old update count=") + .append(oldUpdateCount) + .append("]"); + status_logger.debug(str.toString()); + } else { + StringBuilder msg = new StringBuilder("Agent status update: ["); + msg.append("id = " + host.getId()); + msg.append("; name = " + host.getName()); + msg.append("; old status = " + oldStatus); + msg.append("; event = " + event); + msg.append("; new status = " + newStatus); + msg.append("; old update count = " + oldUpdateCount); + msg.append("; new update count = " + newUpdateCount + "]"); + status_logger.debug(msg.toString()); + } + + if (ho.getState() == newStatus) { + status_logger.debug("Host " + ho.getName() + " state has already been updated to " + newStatus); + return true; + } } return result > 0; http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ba562101/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java ---------------------------------------------------------------------- diff --git a/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java b/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java index 2f25f57..7c59492 100644 --- a/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java +++ b/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java @@ -452,41 +452,29 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem ub.set(vmi, _updateTimeAttr, new Date()); int result = update(vmi, sc); - if (result == 0 && s_logger.isDebugEnabled()) { - + if (result == 0) { VMInstanceVO vo = findByIdIncludingRemoved(vm.getId()); - if (vo != null) { - StringBuilder str = new StringBuilder("Unable to update ").append(vo.toString()); - str.append(": DB Data={Host=") - .append(vo.getHostId()) - .append("; State=") - .append(vo.getState().toString()) - .append("; updated=") - .append(vo.getUpdated()) - .append("; time=") - .append(vo.getUpdateTime()); - str.append("} New Data: {Host=") - .append(vm.getHostId()) - .append("; State=") - .append(vm.getState().toString()) - .append("; updated=") - .append(vmi.getUpdated()) - .append("; time=") - .append(vo.getUpdateTime()); - str.append("} Stale Data: {Host=") - .append(oldHostId) - .append("; State=") - .append(oldState) - .append("; updated=") - .append(oldUpdated) - .append("; time=") - .append(oldUpdateDate) - .append("}"); - s_logger.debug(str.toString()); - - } else { - s_logger.debug("Unable to update the vm id=" + vm.getId() + "; the vm either doesn't exist or already removed"); + if (s_logger.isDebugEnabled()) { + if (vo != null) { + StringBuilder str = new StringBuilder("Unable to update ").append(vo.toString()); + str.append(": DB Data={Host=").append(vo.getHostId()).append("; State=").append(vo.getState().toString()).append("; updated=").append(vo.getUpdated()) + .append("; time=").append(vo.getUpdateTime()); + str.append("} New Data: {Host=").append(vm.getHostId()).append("; State=").append(vm.getState().toString()).append("; updated=").append(vmi.getUpdated()) + .append("; time=").append(vo.getUpdateTime()); + str.append("} Stale Data: {Host=").append(oldHostId).append("; State=").append(oldState).append("; updated=").append(oldUpdated).append("; time=") + .append(oldUpdateDate).append("}"); + s_logger.debug(str.toString()); + + } else { + s_logger.debug("Unable to update the vm id=" + vm.getId() + "; the vm either doesn't exist or already removed"); + } + } + + if (vo != null && vo.getState() == newState) { + // allow for concurrent update if target state has already been matched + s_logger.debug("VM " + vo.getInstanceName() + " state has been already been updated to " + newState); + return true; } } return result > 0; http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ba562101/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java index 49c3032..7f3701d 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java @@ -256,13 +256,17 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } }); - for (Long id : wakeupList) { - // TODO, we assume that all jobs in this category is API job only - AsyncJobVO jobToWakeup = _jobDao.findById(id); - if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & AsyncJob.Constants.SIGNAL_MASK_WAKEUP) != 0) - scheduleExecution(jobToWakeup, false); - } - + // + // disable wakeup scheduling now, since all API jobs are currently using block-waiting for sub-jobs + // + /* + for (Long id : wakeupList) { + // TODO, we assume that all jobs in this category is API job only + AsyncJobVO jobToWakeup = _jobDao.findById(id); + if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & AsyncJob.Constants.SIGNAL_MASK_WAKEUP) != 0) + scheduleExecution(jobToWakeup, false); + } + */ _messageBus.publish(null, AsyncJob.Topics.JOB_STATE, PublishScope.GLOBAL, jobId); }