Updated Branches: refs/heads/master 9aaea28d0 -> 7164fc6e7
CLOUDSTACK-5696: Fix sync issue with out-of-band changes Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/7164fc6e Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/7164fc6e Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/7164fc6e Branch: refs/heads/master Commit: 7164fc6e738137b452e89f8889a9cd3f3bdb3c29 Parents: 9aaea28 Author: Kelven Yang <[email protected]> Authored: Tue Jan 14 17:32:52 2014 -0800 Committer: Kelven Yang <[email protected]> Committed: Wed Jan 15 13:11:02 2014 -0800 ---------------------------------------------------------------------- .../com/cloud/vm/VirtualMachineManagerImpl.java | 16 ++- .../cloud/vm/VirtualMachinePowerStateSync.java | 2 + .../vm/VirtualMachinePowerStateSyncImpl.java | 21 ++-- .../src/com/cloud/vm/dao/VMInstanceDao.java | 2 + .../src/com/cloud/vm/dao/VMInstanceDaoImpl.java | 117 +++++++++++-------- .../framework/messagebus/MessageDispatcher.java | 62 ++++++++-- 6 files changed, 150 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7164fc6e/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java ---------------------------------------------------------------------- diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java index 628528a..9894d31 100755 --- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java +++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java @@ -61,6 +61,7 @@ import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO; import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl; import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO; import org.apache.cloudstack.framework.messagebus.MessageBus; +import org.apache.cloudstack.framework.messagebus.MessageDispatcher; import org.apache.cloudstack.framework.messagebus.MessageHandler; import org.apache.cloudstack.jobs.JobInfo; import org.apache.cloudstack.managed.context.ManagedContextRunnable; @@ -578,6 +579,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac _agentMgr.registerForHostEvents(this, true, true, true); + if (VmJobEnabled.value()) { + _messageBus.subscribe(VirtualMachineManager.Topics.VM_POWER_STATE, MessageDispatcher.getDispatcher(this)); + } + return true; } @@ -3816,7 +3821,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac // @MessageHandler(topic = Topics.VM_POWER_STATE) - private void HandlePownerStateReport(Object target, String subject, String senderAddress, Object args) { + private void HandlePowerStateReport(String subject, String senderAddress, Object args) { assert (args != null); Long vmId = (Long)args; @@ -3836,7 +3841,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac break; // PowerUnknown shouldn't be reported, it is a derived - // VM power state from host state (host un-reachable + // VM power state from host state (host un-reachable) case PowerUnknown: default: assert (false); @@ -3846,8 +3851,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac s_logger.warn("VM " + vmId + " no longer exists when processing VM state report"); } } else { - // TODO, do job wake-up signalling, since currently async job wake-up is not in use - // we will skip it for nows + // reset VM power state tracking so that we won't lost signal when VM has + // been translated to + _vmDao.resetVmPowerStateTracking(vmId); } } @@ -3924,6 +3930,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac switch (vm.getState()) { case Starting: case Stopping: + case Running: case Stopped: case Migrating: try { @@ -3937,7 +3944,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac // TODO: we need to forcely release all resource allocation break; - case Running: case Destroyed: case Expunging: break; http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7164fc6e/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java ---------------------------------------------------------------------- diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java index f84c7b7..152d0d8 100644 --- a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java +++ b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java @@ -28,4 +28,6 @@ public interface VirtualMachinePowerStateSync { // to adapt legacy ping report void processHostVmStatePingReport(long hostId, Map<String, HostVmStateReportEntry> report); + + Map<Long, VirtualMachine.PowerState> convertVmStateReport(Map<String, HostVmStateReportEntry> states); } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7164fc6e/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java ---------------------------------------------------------------------- diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java index cd4c3c0..453890c 100644 --- a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java +++ b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSyncImpl.java @@ -32,12 +32,9 @@ import com.cloud.vm.dao.VMInstanceDao; public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStateSync { private static final Logger s_logger = Logger.getLogger(VirtualMachinePowerStateSyncImpl.class); - @Inject - MessageBus _messageBus; - @Inject - VMInstanceDao _instanceDao; - @Inject - VirtualMachineManager _vmMgr; + @Inject MessageBus _messageBus; + @Inject VMInstanceDao _instanceDao; + @Inject VirtualMachineManager _vmMgr; public VirtualMachinePowerStateSyncImpl() { } @@ -53,7 +50,7 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat if (s_logger.isDebugEnabled()) s_logger.debug("Process host VM state report from ping process. host: " + hostId); - Map<Long, VirtualMachine.PowerState> translatedInfo = convertToInfos(report); + Map<Long, VirtualMachine.PowerState> translatedInfo = convertVmStateReport(report); processReport(hostId, translatedInfo); } @@ -62,7 +59,7 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat if (s_logger.isDebugEnabled()) s_logger.debug("Process host VM state report from ping process. host: " + hostId); - Map<Long, VirtualMachine.PowerState> translatedInfo = convertToInfos(report); + Map<Long, VirtualMachine.PowerState> translatedInfo = convertVmStateReport(report); processReport(hostId, translatedInfo); } @@ -74,16 +71,19 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat s_logger.debug("VM state report. host: " + hostId + ", vm id: " + entry.getKey() + ", power state: " + entry.getValue()); if (_instanceDao.updatePowerState(entry.getKey(), hostId, entry.getValue())) { - if (s_logger.isDebugEnabled()) s_logger.debug("VM state report is updated. host: " + hostId + ", vm id: " + entry.getKey() + ", power state: " + entry.getValue()); _messageBus.publish(null, VirtualMachineManager.Topics.VM_POWER_STATE, PublishScope.GLOBAL, entry.getKey()); + } else { + if (s_logger.isDebugEnabled()) + s_logger.debug("VM power state does not change, skip DB writing. vm id: " + entry.getKey()); } } } - private Map<Long, VirtualMachine.PowerState> convertToInfos(Map<String, HostVmStateReportEntry> states) { + @Override + public Map<Long, VirtualMachine.PowerState> convertVmStateReport(Map<String, HostVmStateReportEntry> states) { final HashMap<Long, VirtualMachine.PowerState> map = new HashMap<Long, VirtualMachine.PowerState>(); if (states == null) { return map; @@ -93,7 +93,6 @@ public class VirtualMachinePowerStateSyncImpl implements VirtualMachinePowerStat VMInstanceVO vm = findVM(entry.getKey()); if (vm != null) { map.put(vm.getId(), entry.getValue().getState()); - break; } else { s_logger.info("Unable to find matched VM in CloudStack DB. name: " + entry.getKey()); } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7164fc6e/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java ---------------------------------------------------------------------- diff --git a/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java b/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java index e6ea4a5..453d222 100644 --- a/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java +++ b/engine/schema/src/com/cloud/vm/dao/VMInstanceDao.java @@ -69,6 +69,8 @@ public interface VMInstanceDao extends GenericDao<VMInstanceVO, Long>, StateDao< List<VMInstanceVO> findVMInTransition(Date time, State... states); + List<VMInstanceVO> listByHostAndState(long hostId, State... states); + List<VMInstanceVO> listByTypes(VirtualMachine.Type... types); VMInstanceVO findByIdTypes(long id, VirtualMachine.Type... types); http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7164fc6e/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java ---------------------------------------------------------------------- diff --git a/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java b/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java index 605ece3..2f25f57 100644 --- a/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java +++ b/engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java @@ -48,7 +48,11 @@ import com.cloud.utils.db.SearchBuilder; import com.cloud.utils.db.SearchCriteria; import com.cloud.utils.db.SearchCriteria.Func; import com.cloud.utils.db.SearchCriteria.Op; +import com.cloud.utils.db.Transaction; +import com.cloud.utils.db.TransactionCallback; +import com.cloud.utils.db.TransactionCallbackNoReturn; import com.cloud.utils.db.TransactionLegacy; +import com.cloud.utils.db.TransactionStatus; import com.cloud.utils.db.UpdateBuilder; import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.vm.NicVO; @@ -76,6 +80,7 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem protected SearchBuilder<VMInstanceVO> TypesSearch; protected SearchBuilder<VMInstanceVO> IdTypesSearch; protected SearchBuilder<VMInstanceVO> HostIdTypesSearch; + protected SearchBuilder<VMInstanceVO> HostIdStatesSearch; protected SearchBuilder<VMInstanceVO> HostIdUpTypesSearch; protected SearchBuilder<VMInstanceVO> HostUpSearch; protected SearchBuilder<VMInstanceVO> InstanceNameSearch; @@ -182,6 +187,11 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem HostIdTypesSearch.and("types", HostIdTypesSearch.entity().getType(), Op.IN); HostIdTypesSearch.done(); + HostIdStatesSearch = createSearchBuilder(); + HostIdStatesSearch.and("hostId", HostIdStatesSearch.entity().getHostId(), Op.EQ); + HostIdStatesSearch.and("states", HostIdStatesSearch.entity().getState(), Op.IN); + HostIdStatesSearch.done(); + HostIdUpTypesSearch = createSearchBuilder(); HostIdUpTypesSearch.and("hostid", HostIdUpTypesSearch.entity().getHostId(), Op.EQ); HostIdUpTypesSearch.and("types", HostIdUpTypesSearch.entity().getType(), Op.IN); @@ -335,6 +345,15 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem } @Override + public List<VMInstanceVO> listByHostAndState(long hostId, State... states) { + SearchCriteria<VMInstanceVO> sc = HostIdStatesSearch.create(); + sc.setParameters("hostId", hostId); + sc.setParameters("states", (Object[])states); + + return listBy(sc); + } + + @Override public List<VMInstanceVO> listUpByHostIdTypes(long hostid, Type... types) { SearchCriteria<VMInstanceVO> sc = HostIdUpTypesSearch.create(); sc.setParameters("hostid", hostid); @@ -702,60 +721,66 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem } @Override - public boolean updatePowerState(long instanceId, long powerHostId, VirtualMachine.PowerState powerState) { - boolean needToUpdate = false; - TransactionLegacy txn = TransactionLegacy.currentTxn(); - txn.start(); - - VMInstanceVO instance = findById(instanceId); - if (instance != null) { - Long savedPowerHostId = instance.getPowerHostId(); - if (instance.getPowerState() != powerState || savedPowerHostId == null || savedPowerHostId.longValue() != powerHostId) { - instance.setPowerState(powerState); - instance.setPowerHostId(powerHostId); - instance.setPowerStateUpdateCount(1); - instance.setPowerStateUpdateTime(DateUtil.currentGMTTime()); - needToUpdate = true; - update(instanceId, instance); - } else { - // to reduce DB updates, consecutive same state update for more than 3 times - if (instance.getPowerStateUpdateCount() < MAX_CONSECUTIVE_SAME_STATE_UPDATE_COUNT) { - instance.setPowerStateUpdateCount(instance.getPowerStateUpdateCount() + 1); - instance.setPowerStateUpdateTime(DateUtil.currentGMTTime()); - needToUpdate = true; - update(instanceId, instance); + public boolean updatePowerState(final long instanceId, final long powerHostId, final VirtualMachine.PowerState powerState) { + return Transaction.execute(new TransactionCallback<Boolean>() { + @Override + public Boolean doInTransaction(TransactionStatus status) { + boolean needToUpdate = false; + VMInstanceVO instance = findById(instanceId); + if (instance != null) { + Long savedPowerHostId = instance.getPowerHostId(); + if (instance.getPowerState() != powerState || savedPowerHostId == null + || savedPowerHostId.longValue() != powerHostId) { + instance.setPowerState(powerState); + instance.setPowerHostId(powerHostId); + instance.setPowerStateUpdateCount(1); + instance.setPowerStateUpdateTime(DateUtil.currentGMTTime()); + needToUpdate = true; + update(instanceId, instance); + } else { + // to reduce DB updates, consecutive same state update for more than 3 times + if (instance.getPowerStateUpdateCount() < MAX_CONSECUTIVE_SAME_STATE_UPDATE_COUNT) { + instance.setPowerStateUpdateCount(instance.getPowerStateUpdateCount() + 1); + instance.setPowerStateUpdateTime(DateUtil.currentGMTTime()); + needToUpdate = true; + update(instanceId, instance); + } + } } + return needToUpdate; } - } - - txn.commit(); - return needToUpdate; + }); } @Override - public void resetVmPowerStateTracking(long instanceId) { - TransactionLegacy txn = TransactionLegacy.currentTxn(); - txn.start(); - VMInstanceVO instance = findById(instanceId); - if (instance != null) { - instance.setPowerStateUpdateCount(0); - instance.setPowerStateUpdateTime(DateUtil.currentGMTTime()); - update(instanceId, instance); - } - - txn.commit(); + public void resetVmPowerStateTracking(final long instanceId) { + Transaction.execute(new TransactionCallbackNoReturn() { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { + VMInstanceVO instance = findById(instanceId); + if (instance != null) { + instance.setPowerStateUpdateCount(0); + instance.setPowerStateUpdateTime(DateUtil.currentGMTTime()); + update(instanceId, instance); + } + } + }); } - @Override - @DB - public void resetHostPowerStateTracking(long hostId) { - SearchCriteria<VMInstanceVO> sc = createSearchCriteria(); - sc.addAnd("powerHostId", SearchCriteria.Op.EQ, hostId); + @Override @DB + public void resetHostPowerStateTracking(final long hostId) { + Transaction.execute(new TransactionCallbackNoReturn() { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { + SearchCriteria<VMInstanceVO> sc = createSearchCriteria(); + sc.addAnd("powerHostId", SearchCriteria.Op.EQ, hostId); - VMInstanceVO instance = this.createForUpdate(); - instance.setPowerStateUpdateCount(0); - instance.setPowerStateUpdateTime(DateUtil.currentGMTTime()); + VMInstanceVO instance = createForUpdate(); + instance.setPowerStateUpdateCount(0); + instance.setPowerStateUpdateTime(DateUtil.currentGMTTime()); - this.update(instance, sc); + update(instance, sc); + } + }); } } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7164fc6e/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java index a2d9a7b..e93bbc2 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDispatcher.java @@ -20,17 +20,24 @@ package org.apache.cloudstack.framework.messagebus; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.log4j.Logger; + public class MessageDispatcher implements MessageSubscriber { - private static Map<Class<?>, Method> s_handlerCache = new HashMap<Class<?>, Method>(); + private static final Logger s_logger = Logger.getLogger(MessageDispatcher.class); + + private static Map<Class<?>, List<Method>> s_handlerCache = new HashMap<Class<?>, List<Method>>(); private static Map<Object, MessageDispatcher> s_targetMap = new HashMap<Object, MessageDispatcher>(); private Object _targetObject; public MessageDispatcher(Object targetObject) { _targetObject = targetObject; + buildHandlerMethodCache(targetObject.getClass()); } @Override @@ -67,10 +74,13 @@ public class MessageDispatcher implements MessageSubscriber { try { handler.invoke(target, subject, senderAddress, args); } catch (IllegalArgumentException e) { + s_logger.error("Unexpected exception when calling " + target.getClass().getName() + "." + handler.getName(), e); throw new RuntimeException("IllegalArgumentException when invoking event handler for subject: " + subject); } catch (IllegalAccessException e) { + s_logger.error("Unexpected exception when calling " + target.getClass().getName() + "." + handler.getName(), e); throw new RuntimeException("IllegalAccessException when invoking event handler for subject: " + subject); } catch (InvocationTargetException e) { + s_logger.error("Unexpected exception when calling " + target.getClass().getName() + "." + handler.getName(), e); throw new RuntimeException("InvocationTargetException when invoking event handler for subject: " + subject); } @@ -79,18 +89,18 @@ public class MessageDispatcher implements MessageSubscriber { public static Method resolveHandler(Class<?> handlerClz, String subject) { synchronized (s_handlerCache) { - Method handler = s_handlerCache.get(handlerClz); - if (handler != null) - return handler; + List<Method> handlerList = s_handlerCache.get(handlerClz); + if (handlerList != null) { + for (Method method : handlerList) { + MessageHandler annotation = method.getAnnotation(MessageHandler.class); + assert (annotation != null); - for (Method method : handlerClz.getMethods()) { - MessageHandler annotation = method.getAnnotation(MessageHandler.class); - if (annotation != null) { if (match(annotation.topic(), subject)) { - s_handlerCache.put(handlerClz, method); return method; } } + } else { + s_logger.error("Handler class " + handlerClz.getName() + " is not registered"); } } @@ -100,4 +110,40 @@ public class MessageDispatcher implements MessageSubscriber { private static boolean match(String expression, String param) { return param.matches(expression); } + + private void buildHandlerMethodCache(Class<?> handlerClz) { + if (s_logger.isInfoEnabled()) + s_logger.info("Build message handler cache for " + handlerClz.getName()); + + synchronized (s_handlerCache) { + List<Method> handlerList = s_handlerCache.get(handlerClz); + if (handlerList == null) { + handlerList = new ArrayList<Method>(); + s_handlerCache.put(handlerClz, handlerList); + + Class<?> clz = handlerClz; + while (clz != null && clz != Object.class) { + for (Method method : clz.getDeclaredMethods()) { + MessageHandler annotation = method.getAnnotation(MessageHandler.class); + if (annotation != null) { + // allow private member access via reflection + method.setAccessible(true); + handlerList.add(method); + + if (s_logger.isInfoEnabled()) + s_logger.info("Add message handler " + handlerClz.getName() + "." + method.getName() + " to cache"); + } + } + + clz = clz.getSuperclass(); + } + } else { + if (s_logger.isInfoEnabled()) + s_logger.info("Message handler for class " + handlerClz.getName() + " is already in cache"); + } + } + + if (s_logger.isInfoEnabled()) + s_logger.info("Done building message handler cache for " + handlerClz.getName()); + } }
