Updated Branches:
  refs/heads/vmsync 2ee8fd215 -> 34cae6349

More refactoring work


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/34cae634
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/34cae634
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/34cae634

Branch: refs/heads/vmsync
Commit: 34cae6349c5070b42d7316c78a848757d1efa977
Parents: 2ee8fd2
Author: Kelven Yang <[email protected]>
Authored: Tue Apr 9 16:58:35 2013 -0700
Committer: Kelven Yang <[email protected]>
Committed: Tue Apr 9 16:58:35 2013 -0700

----------------------------------------------------------------------
 client/tomcatconf/applicationContext.xml.in        |   16 +-
 .../src/com/cloud/api/ApiAsyncJobDispatcher.java   |  121 +++++++++
 server/src/com/cloud/api/ApiDispatcher.java        |    8 -
 server/src/com/cloud/api/ApiServer.java            |    1 -
 server/src/com/cloud/api/AsyncCommandQueued.java   |   36 +++
 server/src/com/cloud/async/AsyncCommandQueued.java |   35 ---
 server/src/com/cloud/async/AsyncJobDispatcher.java |   23 ++
 .../src/com/cloud/async/AsyncJobManagerImpl.java   |  189 +++++---------
 8 files changed, 258 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/34cae634/client/tomcatconf/applicationContext.xml.in
----------------------------------------------------------------------
diff --git a/client/tomcatconf/applicationContext.xml.in 
b/client/tomcatconf/applicationContext.xml.in
index 9eda426..5d2fb2c 100644
--- a/client/tomcatconf/applicationContext.xml.in
+++ b/client/tomcatconf/applicationContext.xml.in
@@ -168,8 +168,6 @@
   <bean id="accountVlanMapDaoImpl" 
class="com.cloud.dc.dao.AccountVlanMapDaoImpl" />
   <bean id="agentUpgradeDaoImpl" 
class="com.cloud.maint.dao.AgentUpgradeDaoImpl" />
   <bean id="alertDaoImpl" class="com.cloud.alert.dao.AlertDaoImpl" />
-  <bean id="asyncJobDaoImpl" class="com.cloud.async.dao.AsyncJobDaoImpl" />
-  <bean id="asyncJobJoinDaoImpl" 
class="com.cloud.api.query.dao.AsyncJobJoinDaoImpl" />
   <bean id="autoScalePolicyConditionMapDaoImpl" 
class="com.cloud.network.as.dao.AutoScalePolicyConditionMapDaoImpl" />
   <bean id="autoScalePolicyDaoImpl" 
class="com.cloud.network.as.dao.AutoScalePolicyDaoImpl" />
   <bean id="autoScaleVmGroupDaoImpl" 
class="com.cloud.network.as.dao.AutoScaleVmGroupDaoImpl" />
@@ -311,8 +309,6 @@
   <bean id="storagePoolJoinDaoImpl" 
class="com.cloud.api.query.dao.StoragePoolJoinDaoImpl" />
   <bean id="storagePoolWorkDaoImpl" 
class="com.cloud.storage.dao.StoragePoolWorkDaoImpl" />
   <bean id="swiftDaoImpl" class="com.cloud.storage.dao.SwiftDaoImpl" />
-  <bean id="syncQueueDaoImpl" class="com.cloud.async.dao.SyncQueueDaoImpl" />
-  <bean id="syncQueueItemDaoImpl" 
class="com.cloud.async.dao.SyncQueueItemDaoImpl" />
   <bean id="templatePrimaryDataStoreDaoImpl" 
class="org.apache.cloudstack.storage.volume.db.TemplatePrimaryDataStoreDaoImpl" 
/>
   <bean id="uploadDaoImpl" class="com.cloud.storage.dao.UploadDaoImpl" />
   <bean id="usageDaoImpl" class="com.cloud.usage.dao.UsageDaoImpl" />
@@ -662,7 +658,6 @@
   <bean id="apiRateLimitServiceImpl" 
class="org.apache.cloudstack.ratelimit.ApiRateLimitServiceImpl"/>
 
   <bean id="alertManagerImpl" class="com.cloud.alert.AlertManagerImpl" />
-  <bean id="asyncJobManagerImpl" class="com.cloud.async.AsyncJobManagerImpl" />
   <bean id="autoScaleManagerImpl" 
class="com.cloud.network.as.AutoScaleManagerImpl" />
   <bean id="capacityManagerImpl" 
class="com.cloud.capacity.CapacityManagerImpl" />
   <bean id="clusterFenceManagerImpl" 
class="com.cloud.cluster.ClusterFenceManagerImpl" />
@@ -695,7 +690,6 @@
   <bean id="snapshotSchedulerImpl" 
class="com.cloud.storage.snapshot.SnapshotSchedulerImpl" />
   <bean id="storageNetworkManagerImpl" 
class="com.cloud.network.StorageNetworkManagerImpl" />
   <bean id="swiftManagerImpl" class="com.cloud.storage.swift.SwiftManagerImpl" 
/>
-  <bean id="syncQueueManagerImpl" class="com.cloud.async.SyncQueueManagerImpl" 
/>
   <bean id="taggedResourceManagerImpl" 
class="com.cloud.tags.TaggedResourceManagerImpl" />
   <bean id="templateManagerImpl" 
class="com.cloud.template.TemplateManagerImpl" />
   <bean id="upgradeManagerImpl" class="com.cloud.maint.UpgradeManagerImpl" />
@@ -792,6 +786,16 @@
 <!--                                                                           
                            -->
 
<!--=======================================================================================================-->
 
+
+  <!-- Async management -->
+  <bean id="ApiAsyncJobDispatcher" class="com.cloud.api.ApiAsyncJobDispatcher" 
/>
+  <bean id="asyncJobDaoImpl" class="com.cloud.async.dao.AsyncJobDaoImpl" />
+  <bean id="asyncJobJoinDaoImpl" 
class="com.cloud.api.query.dao.AsyncJobJoinDaoImpl" />
+  <bean id="asyncJobManagerImpl" class="com.cloud.async.AsyncJobManagerImpl" />
+  <bean id="syncQueueDaoImpl" class="com.cloud.async.dao.SyncQueueDaoImpl" />
+  <bean id="syncQueueItemDaoImpl" 
class="com.cloud.async.dao.SyncQueueItemDaoImpl" />
+  <bean id="syncQueueManagerImpl" class="com.cloud.async.SyncQueueManagerImpl" 
/>
+
   <!--
     Baremetal components
   -->

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/34cae634/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/ApiAsyncJobDispatcher.java 
b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
new file mode 100644
index 0000000..98ed5bf
--- /dev/null
+++ b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.api;
+
+import java.lang.reflect.Type;
+import java.util.Map;
+
+import javax.inject.Inject;
+
+import org.apache.cloudstack.api.ApiErrorCode;
+import org.apache.cloudstack.api.BaseAsyncCmd;
+import org.apache.cloudstack.api.ServerApiException;
+import org.apache.cloudstack.api.response.ExceptionResponse;
+import org.apache.log4j.Logger;
+
+import com.cloud.async.AsyncJobDispatcher;
+import com.cloud.async.AsyncJobManager;
+import com.cloud.async.AsyncJobResult;
+import com.cloud.async.AsyncJobVO;
+import com.cloud.async.SyncQueueManager;
+import com.cloud.user.Account;
+import com.cloud.user.UserContext;
+import com.cloud.user.dao.AccountDao;
+import com.cloud.utils.component.AdapterBase;
+import com.cloud.utils.component.ComponentContext;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+public class ApiAsyncJobDispatcher extends AdapterBase implements 
AsyncJobDispatcher {
+    private static final Logger s_logger = 
Logger.getLogger(ApiAsyncJobDispatcher.class);
+
+    @Inject private ApiDispatcher _dispatcher;
+    
+    @Inject private AsyncJobManager _asyncJobMgr;
+    @Inject private SyncQueueManager _queueMgr;
+    @Inject private AccountDao _accountDao;
+    
+    public ApiAsyncJobDispatcher() {
+    }
+    
+       @Override
+       public void RunJob(AsyncJobVO job) {
+        BaseAsyncCmd cmdObj = null;
+        try {
+            Class<?> cmdClass = Class.forName(job.getCmd());
+            cmdObj = (BaseAsyncCmd)cmdClass.newInstance();
+            cmdObj = ComponentContext.inject(cmdObj);
+            cmdObj.configure();
+            cmdObj.setJob(job);
+
+            Type mapType = new TypeToken<Map<String, String>>() {}.getType();
+            Gson gson = ApiGsonHelper.getBuilder().create();
+            Map<String, String> params = gson.fromJson(job.getCmdInfo(), 
mapType);
+
+            // whenever we deserialize, the UserContext needs to be updated
+            String userIdStr = params.get("ctxUserId");
+            String acctIdStr = params.get("ctxAccountId");
+            Long userId = null;
+            Account accountObject = null;
+
+            if (userIdStr != null) {
+                userId = Long.parseLong(userIdStr);
+            }
+
+            if (acctIdStr != null) {
+                accountObject = 
_accountDao.findById(Long.parseLong(acctIdStr));
+            }
+
+            UserContext.registerContext(userId, accountObject, null, false);
+            try {
+                // dispatch could ultimately queue the job
+                _dispatcher.dispatch(cmdObj, params);
+
+                // serialize this to the async job table
+                _asyncJobMgr.completeAsyncJob(job.getId(), 
AsyncJobResult.STATUS_SUCCEEDED, 0, cmdObj.getResponseObject());
+            } finally {
+                UserContext.unregisterContext();
+            }
+        } catch(Throwable e) {
+            if (e instanceof AsyncCommandQueued) {
+                if (s_logger.isDebugEnabled()) {
+                    s_logger.debug("job " + job.getCmd() + " for job-" + 
job.getId() + " was queued, processing the queue.");
+                }
+            } else {
+                String errorMsg = null;
+                int errorCode = ApiErrorCode.INTERNAL_ERROR.getHttpCode();
+                if (!(e instanceof ServerApiException)) {
+                    s_logger.error("Unexpected exception while executing " + 
job.getCmd(), e);
+                    errorMsg = e.getMessage();
+                } else {
+                    ServerApiException sApiEx = (ServerApiException)e;
+                    errorMsg = sApiEx.getDescription();
+                    errorCode = sApiEx.getErrorCode().getHttpCode();
+                }
+
+                ExceptionResponse response = new ExceptionResponse();
+                response.setErrorCode(errorCode);
+                response.setErrorText(errorMsg);
+                response.setResponseName((cmdObj == null) ? 
"unknowncommandresponse" : cmdObj.getCommandName());
+
+                // FIXME:  setting resultCode to ApiErrorCode.INTERNAL_ERROR is not right, usually executors have their exception handling
+                //         and we need to preserve that as much as possible here
+                _asyncJobMgr.completeAsyncJob(job.getId(), 
AsyncJobResult.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), 
response);
+            }
+        }
+       }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/34cae634/server/src/com/cloud/api/ApiDispatcher.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/ApiDispatcher.java 
b/server/src/com/cloud/api/ApiDispatcher.java
index 925d90a..9298360 100755
--- a/server/src/com/cloud/api/ApiDispatcher.java
+++ b/server/src/com/cloud/api/ApiDispatcher.java
@@ -27,7 +27,6 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.StringTokenizer;
 import java.util.regex.Matcher;
 
@@ -56,21 +55,14 @@ import 
org.apache.cloudstack.api.command.user.event.ListEventsCmd;
 import org.apache.log4j.Logger;
 import org.springframework.stereotype.Component;
 
-import com.cloud.async.AsyncCommandQueued;
 import com.cloud.async.AsyncJobManager;
 import com.cloud.dao.EntityManager;
-import com.cloud.exception.AccountLimitException;
-import com.cloud.exception.InsufficientCapacityException;
 import com.cloud.exception.InvalidParameterValueException;
-import com.cloud.exception.PermissionDeniedException;
-import com.cloud.exception.ResourceAllocationException;
-import com.cloud.exception.ResourceUnavailableException;
 import com.cloud.user.Account;
 import com.cloud.user.AccountManager;
 import com.cloud.user.UserContext;
 import com.cloud.utils.DateUtil;
 import com.cloud.utils.ReflectUtil;
-import com.cloud.utils.component.ComponentContext;
 import com.cloud.utils.exception.CSExceptionErrorCode;
 import com.cloud.utils.exception.CloudRuntimeException;
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/34cae634/server/src/com/cloud/api/ApiServer.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/ApiServer.java 
b/server/src/com/cloud/api/ApiServer.java
index d842819..d4d88b3 100755
--- a/server/src/com/cloud/api/ApiServer.java
+++ b/server/src/com/cloud/api/ApiServer.java
@@ -17,7 +17,6 @@
 package com.cloud.api;
 
 import com.cloud.api.response.ApiResponseSerializer;
-import com.cloud.async.AsyncCommandQueued;
 import com.cloud.async.AsyncJob;
 import com.cloud.async.AsyncJobManager;
 import com.cloud.async.AsyncJobVO;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/34cae634/server/src/com/cloud/api/AsyncCommandQueued.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/AsyncCommandQueued.java 
b/server/src/com/cloud/api/AsyncCommandQueued.java
new file mode 100644
index 0000000..ecd38c8
--- /dev/null
+++ b/server/src/com/cloud/api/AsyncCommandQueued.java
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.api;
+
+import com.cloud.async.SyncQueueVO;
+import com.cloud.utils.SerialVersionUID;
+import com.cloud.utils.exception.CloudRuntimeException;
+
+public class AsyncCommandQueued extends CloudRuntimeException {
+    private static final long serialVersionUID = 
SerialVersionUID.AsyncCommandQueued;
+
+    private SyncQueueVO _queue = null;
+
+    public AsyncCommandQueued(SyncQueueVO queue, String msg) {
+        super(msg);
+        _queue = queue;
+    }
+
+    public SyncQueueVO getQueue() {
+        return _queue;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/34cae634/server/src/com/cloud/async/AsyncCommandQueued.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncCommandQueued.java 
b/server/src/com/cloud/async/AsyncCommandQueued.java
deleted file mode 100644
index f01c214..0000000
--- a/server/src/com/cloud/async/AsyncCommandQueued.java
+++ /dev/null
@@ -1,35 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.async;
-
-import com.cloud.utils.SerialVersionUID;
-import com.cloud.utils.exception.CloudRuntimeException;
-
-public class AsyncCommandQueued extends CloudRuntimeException {
-    private static final long serialVersionUID = 
SerialVersionUID.AsyncCommandQueued;
-
-    private SyncQueueVO _queue = null;
-
-    public AsyncCommandQueued(SyncQueueVO queue, String msg) {
-        super(msg);
-        _queue = queue;
-    }
-
-    public SyncQueueVO getQueue() {
-        return _queue;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/34cae634/server/src/com/cloud/async/AsyncJobDispatcher.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobDispatcher.java 
b/server/src/com/cloud/async/AsyncJobDispatcher.java
new file mode 100644
index 0000000..80e6254
--- /dev/null
+++ b/server/src/com/cloud/async/AsyncJobDispatcher.java
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.async;
+
+import com.cloud.utils.component.Adapter;
+
+public interface AsyncJobDispatcher extends Adapter {
+       void RunJob(AsyncJobVO job);
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/34cae634/server/src/com/cloud/async/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java 
b/server/src/com/cloud/async/AsyncJobManagerImpl.java
index 3c6d672..dfd6a01 100644
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@ -36,17 +36,15 @@ import javax.inject.Inject;
 import javax.naming.ConfigurationException;
 
 import org.apache.cloudstack.api.ApiErrorCode;
-import org.apache.cloudstack.api.BaseAsyncCmd;
-import org.apache.cloudstack.api.ServerApiException;
 import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd;
 import org.apache.cloudstack.api.response.ExceptionResponse;
 import org.apache.log4j.Logger;
 import org.apache.log4j.NDC;
 import org.springframework.stereotype.Component;
 
-import com.cloud.api.ApiDispatcher;
-import com.cloud.api.ApiGsonHelper;
+import com.cloud.api.ApiAsyncJobDispatcher;
 import com.cloud.api.ApiSerializerHelper;
+import com.cloud.api.AsyncCommandQueued;
 import com.cloud.async.dao.AsyncJobDao;
 import com.cloud.cluster.ClusterManager;
 import com.cloud.cluster.ClusterManagerListener;
@@ -63,7 +61,6 @@ import com.cloud.user.dao.AccountDao;
 import com.cloud.utils.DateUtil;
 import com.cloud.utils.NumbersUtil;
 import com.cloud.utils.PropertiesUtil;
-import com.cloud.utils.component.ComponentContext;
 import com.cloud.utils.component.ManagerBase;
 import com.cloud.utils.concurrency.NamedThreadFactory;
 import com.cloud.utils.db.DB;
@@ -73,11 +70,7 @@ import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.utils.exception.ExceptionUtil;
 import com.cloud.utils.mgmt.JmxUtil;
 import com.cloud.utils.net.MacAddress;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
 
-@Component
-@Local(value={AsyncJobManager.class})
 public class AsyncJobManagerImpl extends ManagerBase implements 
AsyncJobManager, ClusterManagerListener {
     public static final Logger s_logger = 
Logger.getLogger(AsyncJobManagerImpl.class.getName());
     private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3;  
// 3 seconds
@@ -92,12 +85,11 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
     @Inject private AccountDao _accountDao;
     @Inject private AsyncJobDao _jobDao;
     @Inject private ConfigurationDao _configDao;
+    @Inject private List<AsyncJobDispatcher> _jobDispatchers;
 
     private long _jobExpireSeconds = 86400;                                    
        // 1 day
     private long _jobCancelThresholdSeconds = 3600;            // 1 hour (for cancelling the jobs blocking other jobs)
 
-    @Inject private ApiDispatcher _dispatcher;
-
     private final ScheduledExecutorService _heartbeatScheduler =
             Executors.newScheduledThreadPool(1, new 
NamedThreadFactory("AsyncJobMgr-Heartbeat"));
     private ExecutorService _executor;
@@ -377,14 +369,33 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
             _executor.submit(runnable);
         }
     }
-
+    
+    private AsyncJobDispatcher getDispatcher(String dispatcherName) {
+       if(dispatcherName == null || dispatcherName.isEmpty())
+               dispatcherName = ApiAsyncJobDispatcher.class.getSimpleName();
+       
+       if(_jobDispatchers != null) {
+               for(AsyncJobDispatcher dispatcher : _jobDispatchers) {
+                       if(dispatcherName.equals(dispatcher.getName()))
+                               return dispatcher;
+               }
+       }
+       return null;
+    }
+    
     private Runnable getExecutorRunnable(final AsyncJobManager mgr, final 
AsyncJobVO job) {
         return new Runnable() {
             @Override
             public void run() {
-                try {
-                    long jobId = 0;
-
+               Transaction txn = null;
+               try {
+                       //
+                       // setup execution environment
+                       //
+                    NDC.push("job-" + job.getId());
+                    
+                       txn = Transaction.open(Transaction.CLOUD_DB);
+                       
                     try {
                         JmxUtil.registerMBean("AsyncJobManager", "Active Job " 
+ job.getId(), new AsyncJobMBeanImpl(job));
                     } catch(Exception e) {
@@ -392,122 +403,58 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
                     }
                     
                     AsyncJobExecutionContext.setCurrentExecutionContext(new 
AsyncJobExecutionContext(job));
-
-                    BaseAsyncCmd cmdObj = null;
-                    Transaction txn = Transaction.open(Transaction.CLOUD_DB);
+                    
+                    // execute the job
+                    if(s_logger.isDebugEnabled()) {
+                        s_logger.debug("Executing " + job.getCmd() + " for 
job-" + job.getId());
+                    }
+                    
+                    AsyncJobDispatcher jobDispatcher = 
getDispatcher(job.getDispatcher());
+                    if(jobDispatcher != null) {
+                       jobDispatcher.RunJob(job);
+                    } else {
+                       s_logger.error("Unable to find job dispatcher, job will 
be cancelled");
+                        completeAsyncJob(job.getId(), 
AsyncJobResult.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null);
+                    }
+                    
+                    if (s_logger.isDebugEnabled()) {
+                        s_logger.debug("Done executing " + job.getCmd() + " 
for job-" + job.getId());
+                    }
+                   
+               } catch (Throwable e) {
+                       s_logger.error("Unexpected exception", e);
+                    completeAsyncJob(job.getId(), 
AsyncJobResult.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null);
+               } finally {
+                       // guard final clause as well
                     try {
-                        jobId = job.getId();
-                        NDC.push("job-" + jobId);
-
-                        if(s_logger.isDebugEnabled()) {
-                            s_logger.debug("Executing " + job.getCmd() + " for 
job-" + jobId);
-                        }
-
-                        Class<?> cmdClass = Class.forName(job.getCmd());
-                        cmdObj = (BaseAsyncCmd)cmdClass.newInstance();
-                        cmdObj = ComponentContext.inject(cmdObj);
-                        cmdObj.configure();
-                        cmdObj.setJob(job);
-
-                        Type mapType = new TypeToken<Map<String, String>>() 
{}.getType();
-                        Gson gson = ApiGsonHelper.getBuilder().create();
-                        Map<String, String> params = 
gson.fromJson(job.getCmdInfo(), mapType);
-
-                        // whenever we deserialize, the UserContext needs to be updated
-                        String userIdStr = params.get("ctxUserId");
-                        String acctIdStr = params.get("ctxAccountId");
-                        Long userId = null;
-                        Account accountObject = null;
-
-                        if (userIdStr != null) {
-                            userId = Long.parseLong(userIdStr);
-                        }
-
-                        if (acctIdStr != null) {
-                            accountObject = 
_accountDao.findById(Long.parseLong(acctIdStr));
-                        }
-
-                        UserContext.registerContext(userId, accountObject, 
null, false);
-                        try {
-                            // dispatch could ultimately queue the job
-                            _dispatcher.dispatch(cmdObj, params);
-
-                            // serialize this to the async job table
-                            completeAsyncJob(jobId, 
AsyncJobResult.STATUS_SUCCEEDED, 0, cmdObj.getResponseObject());
-                        } finally {
-                            UserContext.unregisterContext();
-                        }
-
-                        // commands might need to be queued as part of synchronization here, so they just have to be re-dispatched from the queue mechanism...
-                        if (job.getSyncSource() != null) {
+                       if (job.getSyncSource() != null) {
                             _queueMgr.purgeItem(job.getSyncSource().getId());
                             checkQueue(job.getSyncSource().getQueueId());
                         }
 
-                        if (s_logger.isDebugEnabled()) {
-                            s_logger.debug("Done executing " + job.getCmd() + 
" for job-" + jobId);
-                        }
-
+                       //
+                       // clean execution environment
+                       //
+                        
AsyncJobExecutionContext.setCurrentExecutionContext(null);
+                       
+                       try {
+                               JmxUtil.unregisterMBean("AsyncJobManager", 
"Active Job " + job.getId());
+                       } catch(Exception e) {
+                            s_logger.warn("Unable to unregister job " + 
job.getId() + " to JMX monitoring due to exception " + 
ExceptionUtil.toString(e));
+                       }
+                       
+                           if(txn != null)
+                               txn.close();
+                           
+                           NDC.pop();
                     } catch(Throwable e) {
-                        if (e instanceof AsyncCommandQueued) {
-                            if (s_logger.isDebugEnabled()) {
-                                s_logger.debug("job " + job.getCmd() + " for 
job-" + jobId + " was queued, processing the queue.");
-                            }
-                            
checkQueue(((AsyncCommandQueued)e).getQueue().getId());
-                        } else {
-                            String errorMsg = null;
-                            int errorCode = 
ApiErrorCode.INTERNAL_ERROR.getHttpCode();
-                            if (!(e instanceof ServerApiException)) {
-                                s_logger.error("Unexpected exception while 
executing " + job.getCmd(), e);
-                                errorMsg = e.getMessage();
-                            } else {
-                                ServerApiException sApiEx = 
(ServerApiException)e;
-                                errorMsg = sApiEx.getDescription();
-                                errorCode = 
sApiEx.getErrorCode().getHttpCode();
-                            }
-
-                            ExceptionResponse response = new 
ExceptionResponse();
-                            response.setErrorCode(errorCode);
-                            response.setErrorText(errorMsg);
-                            response.setResponseName((cmdObj == null) ? 
"unknowncommandresponse" : cmdObj.getCommandName());
-
-                            // FIXME:  setting resultCode to ApiErrorCode.INTERNAL_ERROR is not right, usually executors have their exception handling
-                            //         and we need to preserve that as much as possible here
-                            completeAsyncJob(jobId, 
AsyncJobResult.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), 
response);
-
-                            // need to clean up any queue that happened as part of the dispatching and move on to the next item in the queue
-                            try {
-                                if (job.getSyncSource() != null) {
-                                    
_queueMgr.purgeItem(job.getSyncSource().getId());
-                                    
checkQueue(job.getSyncSource().getQueueId());
-                                }
-                            } catch(Throwable ex) {
-                                s_logger.fatal("Exception on exception, log it 
for record", ex);
-                            }
-                        }
-                    } finally {
-
-                        try {
-                            JmxUtil.unregisterMBean("AsyncJobManager", "Active 
Job " + job.getId());
-                        } catch(Exception e) {
-                            s_logger.warn("Unable to unregister active job " + 
job.getId() + " from JMX monitoring");
-                        }
-
-                        txn.close();
-                        NDC.pop();
+                               s_logger.error("Double exception", e);
                     }
-                } catch (Throwable th) {
-                    try {
-                        s_logger.error("Caught: " + th);
-                    } catch (Throwable th2) {
-                    }
-                } finally {
-                       
AsyncJobExecutionContext.setCurrentExecutionContext(null);
-                }
+               }
             }
         };
     }
-
+    
     private void executeQueueItem(SyncQueueItemVO item, boolean 
fromPreviousSession) {
         AsyncJobVO job = _jobDao.findById(item.getContentId());
         if (job != null) {

Reply via email to