Updated Branches: refs/heads/vmsync 688b047c2 -> 51f533e97
changes to jobs Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/51f533e9 Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/51f533e9 Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/51f533e9 Branch: refs/heads/vmsync Commit: 51f533e97ac2de9e631f6b13b54ddcd4f5a9c05b Parents: 688b047 Author: Alex Huang <alex.hu...@gmail.com> Authored: Tue Jun 4 11:02:16 2013 -0700 Committer: Alex Huang <alex.hu...@gmail.com> Committed: Tue Jun 4 11:02:16 2013 -0700 ---------------------------------------------------------------------- .../org/apache/cloudstack/context/CallContext.java | 8 +++- client/tomcatconf/applicationContext.xml.in | 4 +-- .../src/com/cloud/api/ApiAsyncJobDispatcher.java | 2 +- server/src/com/cloud/api/ApiDispatcher.java | 5 +-- server/src/com/cloud/api/ApiServer.java | 6 +++- .../src/com/cloud/async/AsyncJobManagerImpl.java | 25 ++++---------- 6 files changed, 22 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/51f533e9/api/src/org/apache/cloudstack/context/CallContext.java ---------------------------------------------------------------------- diff --git a/api/src/org/apache/cloudstack/context/CallContext.java b/api/src/org/apache/cloudstack/context/CallContext.java index d0eaa25..bc8ad00 100644 --- a/api/src/org/apache/cloudstack/context/CallContext.java +++ b/api/src/org/apache/cloudstack/context/CallContext.java @@ -155,10 +155,14 @@ public class CallContext { s_logger.debug("Context removed " + context); String sessionId = context.getSessionId(); if (sessionId != null) { - while ((sessionId = NDC.pop()) != null) { - if (context.getSessionId().equals(sessionId)) { + String sessionIdOnStack = null; + while ((sessionIdOnStack = NDC.pop()) != null) { + if (sessionId.equals(sessionIdOnStack)) { break; } + if (s_logger.isTraceEnabled()) { + s_logger.trace("Popping from NDC: " + sessionId); + } } } return context; http://git-wip-us.apache.org/repos/asf/cloudstack/blob/51f533e9/client/tomcatconf/applicationContext.xml.in ---------------------------------------------------------------------- diff --git a/client/tomcatconf/applicationContext.xml.in b/client/tomcatconf/applicationContext.xml.in index bc443bc..1c98346 100644 --- a/client/tomcatconf/applicationContext.xml.in +++ b/client/tomcatconf/applicationContext.xml.in @@ -808,9 +808,7 @@ <bean id="asyncJobJoinDaoImpl" class="com.cloud.api.query.dao.AsyncJobJoinDaoImpl" /> <bean id="asyncJobJournalDaoImpl" class="org.apache.cloudstack.framework.jobs.dao.AsyncJobJournalDaoImpl" /> <bean id="asyncJobJoinMapDaoImpl" class="org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDaoImpl" /> - <bean id="asyncJobManagerImpl" class="com.cloud.async.AsyncJobManagerImpl"> - <property name="defaultDispatcher" value="ApiAsyncJobDispatcher" /> - </bean> + <bean id="asyncJobManagerImpl" class="com.cloud.async.AsyncJobManagerImpl"/> <bean id="asyncJobMonitor" class="org.apache.cloudstack.framework.jobs.AsyncJobMonitor"/> <bean id="syncQueueDaoImpl" class="org.apache.cloudstack.framework.jobs.dao.SyncQueueDaoImpl" /> <bean id="syncQueueItemDaoImpl" class="org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDaoImpl" /> http://git-wip-us.apache.org/repos/asf/cloudstack/blob/51f533e9/server/src/com/cloud/api/ApiAsyncJobDispatcher.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/api/ApiAsyncJobDispatcher.java b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java index 8fee43f..081bcc1 100644 --- a/server/src/com/cloud/api/ApiAsyncJobDispatcher.java +++ b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java @@ -90,7 +90,7 @@ public class ApiAsyncJobDispatcher extends AdapterBase implements AsyncJobDispat CallContext.register(userId, accountObject, "job-" + job.getShortUuid(), false); try { // dispatch could ultimately queue the job - _dispatcher.dispatch(cmdObj, params); + _dispatcher.dispatch(cmdObj, params, true); // serialize this to the async job table _asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobConstants.STATUS_SUCCEEDED, 0, cmdObj.getResponseObject()); http://git-wip-us.apache.org/repos/asf/cloudstack/blob/51f533e9/server/src/com/cloud/api/ApiDispatcher.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/api/ApiDispatcher.java b/server/src/com/cloud/api/ApiDispatcher.java index e9536e2..ff709d0 100755 --- a/server/src/com/cloud/api/ApiDispatcher.java +++ b/server/src/com/cloud/api/ApiDispatcher.java @@ -58,7 +58,6 @@ import org.apache.cloudstack.context.CallContext; import org.apache.cloudstack.framework.jobs.AsyncJob; import org.apache.cloudstack.framework.jobs.AsyncJobManager; -import com.cloud.async.AsyncJobExecutionContext; import com.cloud.dao.EntityManager; import com.cloud.exception.InvalidParameterValueException; import com.cloud.user.Account; @@ -122,7 +121,7 @@ public class ApiDispatcher { } } - public void dispatch(BaseCmd cmd, Map<String, String> params) throws Exception { + public void dispatch(BaseCmd cmd, Map<String, String> params, boolean execute) throws Exception { processParameters(cmd, params); CallContext ctx = CallContext.current(); @@ -142,7 +141,7 @@ public class ApiDispatcher { } if (queueSizeLimit != null) { - if(AsyncJobExecutionContext.getCurrentExecutionContext() == null) { + if (!execute) { // if we are not within async-execution context, enqueue the command _asyncMgr.syncAsyncJobExecution((AsyncJob)asyncCmd.getJob(), asyncCmd.getSyncObjType(), asyncCmd.getSyncObjId().longValue(), queueSizeLimit); return; http://git-wip-us.apache.org/repos/asf/cloudstack/blob/51f533e9/server/src/com/cloud/api/ApiServer.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/api/ApiServer.java b/server/src/com/cloud/api/ApiServer.java index 2dfdaf0..175c8b8 100755 --- a/server/src/com/cloud/api/ApiServer.java +++ b/server/src/com/cloud/api/ApiServer.java @@ -171,6 +171,9 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer @Inject List<APIChecker> _apiAccessCheckers; @Inject + ApiAsyncJobDispatcher _asyncDispatcher; + + @Inject private EntityManager _entityMgr; @Inject private final RegionManager _regionMgr = null; @@ -520,6 +523,7 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer AsyncJobVO job = new AsyncJobVO(callerUserId, caller.getId(), cmdObj.getClass().getName(), ApiGsonHelper.getBuilder().create().toJson(params), instanceId, asyncCmd.getInstanceType() != null ? asyncCmd.getInstanceType().toString() : null); + job.setDispatcher(_asyncDispatcher.getName()); long jobId = _asyncMgr.submitAsyncJob(job); @@ -537,7 +541,7 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer return getBaseAsyncResponse(jobId, asyncCmd); } } else { - _dispatcher.dispatch(cmdObj, params); + _dispatcher.dispatch(cmdObj, params, false); // if the command is of the listXXXCommand, we will need to also return the // the job id and status if possible http://git-wip-us.apache.org/repos/asf/cloudstack/blob/51f533e9/server/src/com/cloud/async/AsyncJobManagerImpl.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java index bedeb43..42cbae8 100644 --- a/server/src/com/cloud/async/AsyncJobManagerImpl.java +++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java @@ -108,15 +108,6 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, @Inject private MessageBus _messageBus; @Inject private AsyncJobMonitor _jobMonitor; - // property - private String defaultDispatcher; - public String getDefaultDispatcher() { - return defaultDispatcher; - } - public void setDefaultDispatcher(String defaultDispatcher) { - this.defaultDispatcher = defaultDispatcher; - } - private long _jobExpireSeconds = 86400; // 1 day private long _jobCancelThresholdSeconds = 3600; // 1 hour (for cancelling the jobs blocking other jobs) @@ -491,16 +482,14 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } private AsyncJobDispatcher getDispatcher(String dispatcherName) { - if(dispatcherName == null || dispatcherName.isEmpty()) - dispatcherName = defaultDispatcher; + assert (dispatcherName != null && !dispatcherName.isEmpty()) : "Who's not setting the dispatcher when submitting a job? Who am I suppose to call if you do that!"; - if(_jobDispatchers != null) { - for(AsyncJobDispatcher dispatcher : _jobDispatchers) { - if(dispatcherName.equals(dispatcher.getName())) - return dispatcher; - } - } - return null; + for (AsyncJobDispatcher dispatcher : _jobDispatchers) { + if (dispatcherName.equals(dispatcher.getName())) + return dispatcher; + } + + throw new CloudRuntimeException("Unable to find dispatcher name: " + dispatcherName); } private AsyncJobDispatcher getWakeupDispatcher(AsyncJob job) {