Updated Branches: refs/heads/vmsync 2ee8fd215 -> 34cae6349
More refactoring work Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/34cae634 Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/34cae634 Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/34cae634 Branch: refs/heads/vmsync Commit: 34cae6349c5070b42d7316c78a848757d1efa977 Parents: 2ee8fd2 Author: Kelven Yang <[email protected]> Authored: Tue Apr 9 16:58:35 2013 -0700 Committer: Kelven Yang <[email protected]> Committed: Tue Apr 9 16:58:35 2013 -0700 ---------------------------------------------------------------------- client/tomcatconf/applicationContext.xml.in | 16 +- .../src/com/cloud/api/ApiAsyncJobDispatcher.java | 121 +++++++++ server/src/com/cloud/api/ApiDispatcher.java | 8 - server/src/com/cloud/api/ApiServer.java | 1 - server/src/com/cloud/api/AsyncCommandQueued.java | 36 +++ server/src/com/cloud/async/AsyncCommandQueued.java | 35 --- server/src/com/cloud/async/AsyncJobDispatcher.java | 23 ++ .../src/com/cloud/async/AsyncJobManagerImpl.java | 189 +++++--------- 8 files changed, 258 insertions(+), 171 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/34cae634/client/tomcatconf/applicationContext.xml.in ---------------------------------------------------------------------- diff --git a/client/tomcatconf/applicationContext.xml.in b/client/tomcatconf/applicationContext.xml.in index 9eda426..5d2fb2c 100644 --- a/client/tomcatconf/applicationContext.xml.in +++ b/client/tomcatconf/applicationContext.xml.in @@ -168,8 +168,6 @@ <bean id="accountVlanMapDaoImpl" class="com.cloud.dc.dao.AccountVlanMapDaoImpl" /> <bean id="agentUpgradeDaoImpl" class="com.cloud.maint.dao.AgentUpgradeDaoImpl" /> <bean id="alertDaoImpl" class="com.cloud.alert.dao.AlertDaoImpl" /> - <bean id="asyncJobDaoImpl" class="com.cloud.async.dao.AsyncJobDaoImpl" /> - <bean id="asyncJobJoinDaoImpl" class="com.cloud.api.query.dao.AsyncJobJoinDaoImpl" /> <bean id="autoScalePolicyConditionMapDaoImpl" class="com.cloud.network.as.dao.AutoScalePolicyConditionMapDaoImpl" /> <bean id="autoScalePolicyDaoImpl" class="com.cloud.network.as.dao.AutoScalePolicyDaoImpl" /> <bean id="autoScaleVmGroupDaoImpl" class="com.cloud.network.as.dao.AutoScaleVmGroupDaoImpl" /> @@ -311,8 +309,6 @@ <bean id="storagePoolJoinDaoImpl" class="com.cloud.api.query.dao.StoragePoolJoinDaoImpl" /> <bean id="storagePoolWorkDaoImpl" class="com.cloud.storage.dao.StoragePoolWorkDaoImpl" /> <bean id="swiftDaoImpl" class="com.cloud.storage.dao.SwiftDaoImpl" /> - <bean id="syncQueueDaoImpl" class="com.cloud.async.dao.SyncQueueDaoImpl" /> - <bean id="syncQueueItemDaoImpl" class="com.cloud.async.dao.SyncQueueItemDaoImpl" /> <bean id="templatePrimaryDataStoreDaoImpl" class="org.apache.cloudstack.storage.volume.db.TemplatePrimaryDataStoreDaoImpl" /> <bean id="uploadDaoImpl" class="com.cloud.storage.dao.UploadDaoImpl" /> <bean id="usageDaoImpl" class="com.cloud.usage.dao.UsageDaoImpl" /> @@ -662,7 +658,6 @@ <bean id="apiRateLimitServiceImpl" class="org.apache.cloudstack.ratelimit.ApiRateLimitServiceImpl"/> <bean id="alertManagerImpl" class="com.cloud.alert.AlertManagerImpl" /> - <bean id="asyncJobManagerImpl" class="com.cloud.async.AsyncJobManagerImpl" /> <bean id="autoScaleManagerImpl" class="com.cloud.network.as.AutoScaleManagerImpl" /> <bean id="capacityManagerImpl" class="com.cloud.capacity.CapacityManagerImpl" /> <bean id="clusterFenceManagerImpl" class="com.cloud.cluster.ClusterFenceManagerImpl" /> @@ -695,7 +690,6 @@ <bean id="snapshotSchedulerImpl" class="com.cloud.storage.snapshot.SnapshotSchedulerImpl" /> <bean id="storageNetworkManagerImpl" class="com.cloud.network.StorageNetworkManagerImpl" /> <bean id="swiftManagerImpl" class="com.cloud.storage.swift.SwiftManagerImpl" /> - <bean id="syncQueueManagerImpl" class="com.cloud.async.SyncQueueManagerImpl" /> <bean id="taggedResourceManagerImpl" class="com.cloud.tags.TaggedResourceManagerImpl" /> <bean id="templateManagerImpl" class="com.cloud.template.TemplateManagerImpl" /> <bean id="upgradeManagerImpl" class="com.cloud.maint.UpgradeManagerImpl" /> @@ -792,6 +786,16 @@ <!-- --> <!--=======================================================================================================--> + + <!-- Async management --> + <bean id="ApiAsyncJobDispatcher" class="com.cloud.api.ApiAsyncJobDispatcher" /> + <bean id="asyncJobDaoImpl" class="com.cloud.async.dao.AsyncJobDaoImpl" /> + <bean id="asyncJobJoinDaoImpl" class="com.cloud.api.query.dao.AsyncJobJoinDaoImpl" /> + <bean id="asyncJobManagerImpl" class="com.cloud.async.AsyncJobManagerImpl" /> + <bean id="syncQueueDaoImpl" class="com.cloud.async.dao.SyncQueueDaoImpl" /> + <bean id="syncQueueItemDaoImpl" class="com.cloud.async.dao.SyncQueueItemDaoImpl" /> + <bean id="syncQueueManagerImpl" class="com.cloud.async.SyncQueueManagerImpl" /> + <!-- Baremetal components --> http://git-wip-us.apache.org/repos/asf/cloudstack/blob/34cae634/server/src/com/cloud/api/ApiAsyncJobDispatcher.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/api/ApiAsyncJobDispatcher.java b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java new file mode 100644 index 0000000..98ed5bf --- /dev/null +++ b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.api; + +import java.lang.reflect.Type; +import java.util.Map; + +import javax.inject.Inject; + +import org.apache.cloudstack.api.ApiErrorCode; +import org.apache.cloudstack.api.BaseAsyncCmd; +import org.apache.cloudstack.api.ServerApiException; +import org.apache.cloudstack.api.response.ExceptionResponse; +import org.apache.log4j.Logger; + +import com.cloud.async.AsyncJobDispatcher; +import com.cloud.async.AsyncJobManager; +import com.cloud.async.AsyncJobResult; +import com.cloud.async.AsyncJobVO; +import com.cloud.async.SyncQueueManager; +import com.cloud.user.Account; +import com.cloud.user.UserContext; +import com.cloud.user.dao.AccountDao; +import com.cloud.utils.component.AdapterBase; +import com.cloud.utils.component.ComponentContext; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; + +public class ApiAsyncJobDispatcher extends AdapterBase implements AsyncJobDispatcher { + private static final Logger s_logger = Logger.getLogger(ApiAsyncJobDispatcher.class); + + @Inject private ApiDispatcher _dispatcher; + + @Inject private AsyncJobManager _asyncJobMgr; + @Inject private SyncQueueManager _queueMgr; + @Inject private AccountDao _accountDao; + + public ApiAsyncJobDispatcher() { + } + + @Override + public void RunJob(AsyncJobVO job) { + BaseAsyncCmd cmdObj = null; + try { + Class<?> cmdClass = Class.forName(job.getCmd()); + cmdObj = (BaseAsyncCmd)cmdClass.newInstance(); + cmdObj = ComponentContext.inject(cmdObj); + cmdObj.configure(); + cmdObj.setJob(job); + + Type mapType = new TypeToken<Map<String, String>>() {}.getType(); + Gson gson = ApiGsonHelper.getBuilder().create(); + Map<String, String> params = gson.fromJson(job.getCmdInfo(), mapType); + + // whenever we deserialize, the UserContext needs to be updated + String userIdStr = params.get("ctxUserId"); + String acctIdStr = params.get("ctxAccountId"); + Long userId = null; + Account accountObject = null; + + if (userIdStr != null) { + userId = Long.parseLong(userIdStr); + } + + if (acctIdStr != null) { + accountObject = _accountDao.findById(Long.parseLong(acctIdStr)); + } + + UserContext.registerContext(userId, accountObject, null, false); + try { + // dispatch could ultimately queue the job + _dispatcher.dispatch(cmdObj, params); + + // serialize this to the async job table + _asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobResult.STATUS_SUCCEEDED, 0, cmdObj.getResponseObject()); + } finally { + UserContext.unregisterContext(); + } + } catch(Throwable e) { + if (e instanceof AsyncCommandQueued) { + if (s_logger.isDebugEnabled()) { + s_logger.debug("job " + job.getCmd() + " for job-" + job.getId() + " was queued, processing the queue."); + } + } else { + String errorMsg = null; + int errorCode = ApiErrorCode.INTERNAL_ERROR.getHttpCode(); + if (!(e instanceof ServerApiException)) { + s_logger.error("Unexpected exception while executing " + job.getCmd(), e); + errorMsg = e.getMessage(); + } else { + ServerApiException sApiEx = (ServerApiException)e; + errorMsg = sApiEx.getDescription(); + errorCode = sApiEx.getErrorCode().getHttpCode(); + } + + ExceptionResponse response = new ExceptionResponse(); + response.setErrorCode(errorCode); + response.setErrorText(errorMsg); + response.setResponseName((cmdObj == null) ? "unknowncommandresponse" : cmdObj.getCommandName()); + + // FIXME: setting resultCode to ApiErrorCode.INTERNAL_ERROR is not right, usually executors have their exception handling + // and we need to preserve that as much as possible here + _asyncJobMgr.completeAsyncJob(job.getId(), AsyncJobResult.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), response); + } + } + } +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/34cae634/server/src/com/cloud/api/ApiDispatcher.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/api/ApiDispatcher.java b/server/src/com/cloud/api/ApiDispatcher.java index 925d90a..9298360 100755 --- a/server/src/com/cloud/api/ApiDispatcher.java +++ b/server/src/com/cloud/api/ApiDispatcher.java @@ -27,7 +27,6 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.StringTokenizer; import java.util.regex.Matcher; @@ -56,21 +55,14 @@ import org.apache.cloudstack.api.command.user.event.ListEventsCmd; import org.apache.log4j.Logger; import org.springframework.stereotype.Component; -import com.cloud.async.AsyncCommandQueued; import com.cloud.async.AsyncJobManager; import com.cloud.dao.EntityManager; -import com.cloud.exception.AccountLimitException; -import com.cloud.exception.InsufficientCapacityException; import com.cloud.exception.InvalidParameterValueException; -import com.cloud.exception.PermissionDeniedException; -import com.cloud.exception.ResourceAllocationException; -import com.cloud.exception.ResourceUnavailableException; import com.cloud.user.Account; import com.cloud.user.AccountManager; import com.cloud.user.UserContext; import com.cloud.utils.DateUtil; import com.cloud.utils.ReflectUtil; -import com.cloud.utils.component.ComponentContext; import com.cloud.utils.exception.CSExceptionErrorCode; import com.cloud.utils.exception.CloudRuntimeException; http://git-wip-us.apache.org/repos/asf/cloudstack/blob/34cae634/server/src/com/cloud/api/ApiServer.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/api/ApiServer.java b/server/src/com/cloud/api/ApiServer.java index d842819..d4d88b3 100755 --- a/server/src/com/cloud/api/ApiServer.java +++ b/server/src/com/cloud/api/ApiServer.java @@ -17,7 +17,6 @@ package com.cloud.api; import com.cloud.api.response.ApiResponseSerializer; -import com.cloud.async.AsyncCommandQueued; import com.cloud.async.AsyncJob; import com.cloud.async.AsyncJobManager; import com.cloud.async.AsyncJobVO; http://git-wip-us.apache.org/repos/asf/cloudstack/blob/34cae634/server/src/com/cloud/api/AsyncCommandQueued.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/api/AsyncCommandQueued.java b/server/src/com/cloud/api/AsyncCommandQueued.java new file mode 100644 index 0000000..ecd38c8 --- /dev/null +++ b/server/src/com/cloud/api/AsyncCommandQueued.java @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.api; + +import com.cloud.async.SyncQueueVO; +import com.cloud.utils.SerialVersionUID; +import com.cloud.utils.exception.CloudRuntimeException; + +public class AsyncCommandQueued extends CloudRuntimeException { + private static final long serialVersionUID = SerialVersionUID.AsyncCommandQueued; + + private SyncQueueVO _queue = null; + + public AsyncCommandQueued(SyncQueueVO queue, String msg) { + super(msg); + _queue = queue; + } + + public SyncQueueVO getQueue() { + return _queue; + } +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/34cae634/server/src/com/cloud/async/AsyncCommandQueued.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/AsyncCommandQueued.java b/server/src/com/cloud/async/AsyncCommandQueued.java deleted file mode 100644 index f01c214..0000000 --- a/server/src/com/cloud/async/AsyncCommandQueued.java +++ /dev/null @@ -1,35 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -package com.cloud.async; - -import com.cloud.utils.SerialVersionUID; -import com.cloud.utils.exception.CloudRuntimeException; - -public class AsyncCommandQueued extends CloudRuntimeException { - private static final long serialVersionUID = SerialVersionUID.AsyncCommandQueued; - - private SyncQueueVO _queue = null; - - public AsyncCommandQueued(SyncQueueVO queue, String msg) { - super(msg); - _queue = queue; - } - - public SyncQueueVO getQueue() { - return _queue; - } -} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/34cae634/server/src/com/cloud/async/AsyncJobDispatcher.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/AsyncJobDispatcher.java b/server/src/com/cloud/async/AsyncJobDispatcher.java new file mode 100644 index 0000000..80e6254 --- /dev/null +++ b/server/src/com/cloud/async/AsyncJobDispatcher.java @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.async; + +import com.cloud.utils.component.Adapter; + +public interface AsyncJobDispatcher extends Adapter { + void RunJob(AsyncJobVO job); +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/34cae634/server/src/com/cloud/async/AsyncJobManagerImpl.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java index 3c6d672..dfd6a01 100644 --- a/server/src/com/cloud/async/AsyncJobManagerImpl.java +++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java @@ -36,17 +36,15 @@ import javax.inject.Inject; import javax.naming.ConfigurationException; import org.apache.cloudstack.api.ApiErrorCode; -import org.apache.cloudstack.api.BaseAsyncCmd; -import org.apache.cloudstack.api.ServerApiException; import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd; import org.apache.cloudstack.api.response.ExceptionResponse; import org.apache.log4j.Logger; import org.apache.log4j.NDC; import org.springframework.stereotype.Component; -import com.cloud.api.ApiDispatcher; -import com.cloud.api.ApiGsonHelper; +import com.cloud.api.ApiAsyncJobDispatcher; import com.cloud.api.ApiSerializerHelper; +import com.cloud.api.AsyncCommandQueued; import com.cloud.async.dao.AsyncJobDao; import com.cloud.cluster.ClusterManager; import com.cloud.cluster.ClusterManagerListener; @@ -63,7 +61,6 @@ import com.cloud.user.dao.AccountDao; import com.cloud.utils.DateUtil; import com.cloud.utils.NumbersUtil; import com.cloud.utils.PropertiesUtil; -import com.cloud.utils.component.ComponentContext; import com.cloud.utils.component.ManagerBase; import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.db.DB; @@ -73,11 +70,7 @@ import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.exception.ExceptionUtil; import com.cloud.utils.mgmt.JmxUtil; import com.cloud.utils.net.MacAddress; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; -@Component -@Local(value={AsyncJobManager.class}) public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, ClusterManagerListener { public static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class.getName()); private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds @@ -92,12 +85,11 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, @Inject private AccountDao _accountDao; @Inject private AsyncJobDao _jobDao; @Inject private ConfigurationDao _configDao; + @Inject private List<AsyncJobDispatcher> _jobDispatchers; private long _jobExpireSeconds = 86400; // 1 day private long _jobCancelThresholdSeconds = 3600; // 1 hour (for cancelling the jobs blocking other jobs) - @Inject private ApiDispatcher _dispatcher; - private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat")); private ExecutorService _executor; @@ -377,14 +369,33 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, _executor.submit(runnable); } } - + + private AsyncJobDispatcher getDispatcher(String dispatcherName) { + if(dispatcherName == null || dispatcherName.isEmpty()) + dispatcherName = ApiAsyncJobDispatcher.class.getSimpleName(); + + if(_jobDispatchers != null) { + for(AsyncJobDispatcher dispatcher : _jobDispatchers) { + if(dispatcherName.equals(dispatcher.getName())) + return dispatcher; + } + } + return null; + } + private Runnable getExecutorRunnable(final AsyncJobManager mgr, final AsyncJobVO job) { return new Runnable() { @Override public void run() { - try { - long jobId = 0; - + Transaction txn = null; + try { + // + // setup execution environment + // + NDC.push("job-" + job.getId()); + + txn = Transaction.open(Transaction.CLOUD_DB); + try { JmxUtil.registerMBean("AsyncJobManager", "Active Job " + job.getId(), new AsyncJobMBeanImpl(job)); } catch(Exception e) { @@ -392,122 +403,58 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } AsyncJobExecutionContext.setCurrentExecutionContext(new AsyncJobExecutionContext(job)); - - BaseAsyncCmd cmdObj = null; - Transaction txn = Transaction.open(Transaction.CLOUD_DB); + + // execute the job + if(s_logger.isDebugEnabled()) { + s_logger.debug("Executing " + job.getCmd() + " for job-" + job.getId()); + } + + AsyncJobDispatcher jobDispatcher = getDispatcher(job.getDispatcher()); + if(jobDispatcher != null) { + jobDispatcher.RunJob(job); + } else { + s_logger.error("Unable to find job dispatcher, job will be cancelled"); + completeAsyncJob(job.getId(), AsyncJobResult.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null); + } + + if (s_logger.isDebugEnabled()) { + s_logger.debug("Done executing " + job.getCmd() + " for job-" + job.getId()); + } + + } catch (Throwable e) { + s_logger.error("Unexpected exception", e); + completeAsyncJob(job.getId(), AsyncJobResult.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null); + } finally { + // guard final clause as well try { - jobId = job.getId(); - NDC.push("job-" + jobId); - - if(s_logger.isDebugEnabled()) { - s_logger.debug("Executing " + job.getCmd() + " for job-" + jobId); - } - - Class<?> cmdClass = Class.forName(job.getCmd()); - cmdObj = (BaseAsyncCmd)cmdClass.newInstance(); - cmdObj = ComponentContext.inject(cmdObj); - cmdObj.configure(); - cmdObj.setJob(job); - - Type mapType = new TypeToken<Map<String, String>>() {}.getType(); - Gson gson = ApiGsonHelper.getBuilder().create(); - Map<String, String> params = gson.fromJson(job.getCmdInfo(), mapType); - - // whenever we deserialize, the UserContext needs to be updated - String userIdStr = params.get("ctxUserId"); - String acctIdStr = params.get("ctxAccountId"); - Long userId = null; - Account accountObject = null; - - if (userIdStr != null) { - userId = Long.parseLong(userIdStr); - } - - if (acctIdStr != null) { - accountObject = _accountDao.findById(Long.parseLong(acctIdStr)); - } - - UserContext.registerContext(userId, accountObject, null, false); - try { - // dispatch could ultimately queue the job - _dispatcher.dispatch(cmdObj, params); - - // serialize this to the async job table - completeAsyncJob(jobId, AsyncJobResult.STATUS_SUCCEEDED, 0, cmdObj.getResponseObject()); - } finally { - UserContext.unregisterContext(); - } - - // commands might need to be queued as part of synchronization here, so they just have to be re-dispatched from the queue mechanism... - if (job.getSyncSource() != null) { + if (job.getSyncSource() != null) { _queueMgr.purgeItem(job.getSyncSource().getId()); checkQueue(job.getSyncSource().getQueueId()); } - if (s_logger.isDebugEnabled()) { - s_logger.debug("Done executing " + job.getCmd() + " for job-" + jobId); - } - + // + // clean execution environment + // + AsyncJobExecutionContext.setCurrentExecutionContext(null); + + try { + JmxUtil.unregisterMBean("AsyncJobManager", "Active Job " + job.getId()); + } catch(Exception e) { + s_logger.warn("Unable to unregister job " + job.getId() + " to JMX monitoring due to exception " + ExceptionUtil.toString(e)); + } + + if(txn != null) + txn.close(); + + NDC.pop(); } catch(Throwable e) { - if (e instanceof AsyncCommandQueued) { - if (s_logger.isDebugEnabled()) { - s_logger.debug("job " + job.getCmd() + " for job-" + jobId + " was queued, processing the queue."); - } - checkQueue(((AsyncCommandQueued)e).getQueue().getId()); - } else { - String errorMsg = null; - int errorCode = ApiErrorCode.INTERNAL_ERROR.getHttpCode(); - if (!(e instanceof ServerApiException)) { - s_logger.error("Unexpected exception while executing " + job.getCmd(), e); - errorMsg = e.getMessage(); - } else { - ServerApiException sApiEx = (ServerApiException)e; - errorMsg = sApiEx.getDescription(); - errorCode = sApiEx.getErrorCode().getHttpCode(); - } - - ExceptionResponse response = new ExceptionResponse(); - response.setErrorCode(errorCode); - response.setErrorText(errorMsg); - response.setResponseName((cmdObj == null) ? "unknowncommandresponse" : cmdObj.getCommandName()); - - // FIXME: setting resultCode to ApiErrorCode.INTERNAL_ERROR is not right, usually executors have their exception handling - // and we need to preserve that as much as possible here - completeAsyncJob(jobId, AsyncJobResult.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), response); - - // need to clean up any queue that happened as part of the dispatching and move on to the next item in the queue - try { - if (job.getSyncSource() != null) { - _queueMgr.purgeItem(job.getSyncSource().getId()); - checkQueue(job.getSyncSource().getQueueId()); - } - } catch(Throwable ex) { - s_logger.fatal("Exception on exception, log it for record", ex); - } - } - } finally { - - try { - JmxUtil.unregisterMBean("AsyncJobManager", "Active Job " + job.getId()); - } catch(Exception e) { - s_logger.warn("Unable to unregister active job " + job.getId() + " from JMX monitoring"); - } - - txn.close(); - NDC.pop(); + s_logger.error("Double exception", e); } - } catch (Throwable th) { - try { - s_logger.error("Caught: " + th); - } catch (Throwable th2) { - } - } finally { - AsyncJobExecutionContext.setCurrentExecutionContext(null); - } + } } }; } - + private void executeQueueItem(SyncQueueItemVO item, boolean fromPreviousSession) { AsyncJobVO job = _jobDao.findById(item.getContentId()); if (job != null) {
