Updated Branches:
  refs/heads/vmsync 34cae6349 -> aa108fcae

Get rid of the annoying exception-based communication between the API and job
managers; decouple the job manager from the API manager for VMsync usage


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/aa108fca
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/aa108fca
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/aa108fca

Branch: refs/heads/vmsync
Commit: aa108fcaeca796a2649c9d88b14fe49def956dfa
Parents: 34cae63
Author: Kelven Yang <[email protected]>
Authored: Tue Apr 9 18:05:14 2013 -0700
Committer: Kelven Yang <[email protected]>
Committed: Tue Apr 9 18:05:14 2013 -0700

----------------------------------------------------------------------
 client/tomcatconf/applicationContext.xml.in        |    9 ++-
 .../src/com/cloud/api/ApiAsyncJobDispatcher.java   |   38 ++++------
 server/src/com/cloud/api/ApiDispatcher.java        |   54 ++++++++-------
 server/src/com/cloud/api/ApiServer.java            |    4 -
 server/src/com/cloud/api/AsyncCommandQueued.java   |   36 ----------
 .../src/com/cloud/async/AsyncJobManagerImpl.java   |   32 +++------
 setup/db/db/schema-410to420.sql                    |    5 ++
 7 files changed, 69 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aa108fca/client/tomcatconf/applicationContext.xml.in
----------------------------------------------------------------------
diff --git a/client/tomcatconf/applicationContext.xml.in 
b/client/tomcatconf/applicationContext.xml.in
index 5d2fb2c..aca4dbe 100644
--- a/client/tomcatconf/applicationContext.xml.in
+++ b/client/tomcatconf/applicationContext.xml.in
@@ -788,14 +788,19 @@
 
 
   <!-- Async management -->
-  <bean id="ApiAsyncJobDispatcher" class="com.cloud.api.ApiAsyncJobDispatcher" 
/>
   <bean id="asyncJobDaoImpl" class="com.cloud.async.dao.AsyncJobDaoImpl" />
   <bean id="asyncJobJoinDaoImpl" 
class="com.cloud.api.query.dao.AsyncJobJoinDaoImpl" />
-  <bean id="asyncJobManagerImpl" class="com.cloud.async.AsyncJobManagerImpl" />
+  <bean id="asyncJobManagerImpl" class="com.cloud.async.AsyncJobManagerImpl">
+    <property name="defaultDispatcher" value="ApiAsyncJobDispatcher" />
+  </bean>
   <bean id="syncQueueDaoImpl" class="com.cloud.async.dao.SyncQueueDaoImpl" />
   <bean id="syncQueueItemDaoImpl" 
class="com.cloud.async.dao.SyncQueueItemDaoImpl" />
   <bean id="syncQueueManagerImpl" class="com.cloud.async.SyncQueueManagerImpl" 
/>
 
+  <bean id="ApiAsyncJobDispatcher" class="com.cloud.api.ApiAsyncJobDispatcher">
+    <property name="name" value="ApiAsyncJobDispatcher" />
+  </bean>
+
   <!--
     Baremetal components
   -->

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aa108fca/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/ApiAsyncJobDispatcher.java 
b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
index 98ed5bf..98a7747 100644
--- a/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
+++ b/server/src/com/cloud/api/ApiAsyncJobDispatcher.java
@@ -91,31 +91,25 @@ public class ApiAsyncJobDispatcher extends AdapterBase 
implements AsyncJobDispat
                 UserContext.unregisterContext();
             }
         } catch(Throwable e) {
-            if (e instanceof AsyncCommandQueued) {
-                if (s_logger.isDebugEnabled()) {
-                    s_logger.debug("job " + job.getCmd() + " for job-" + 
job.getId() + " was queued, processing the queue.");
-                }
+            String errorMsg = null;
+            int errorCode = ApiErrorCode.INTERNAL_ERROR.getHttpCode();
+            if (!(e instanceof ServerApiException)) {
+                s_logger.error("Unexpected exception while executing " + 
job.getCmd(), e);
+                errorMsg = e.getMessage();
             } else {
-                String errorMsg = null;
-                int errorCode = ApiErrorCode.INTERNAL_ERROR.getHttpCode();
-                if (!(e instanceof ServerApiException)) {
-                    s_logger.error("Unexpected exception while executing " + 
job.getCmd(), e);
-                    errorMsg = e.getMessage();
-                } else {
-                    ServerApiException sApiEx = (ServerApiException)e;
-                    errorMsg = sApiEx.getDescription();
-                    errorCode = sApiEx.getErrorCode().getHttpCode();
-                }
+                ServerApiException sApiEx = (ServerApiException)e;
+                errorMsg = sApiEx.getDescription();
+                errorCode = sApiEx.getErrorCode().getHttpCode();
+            }
 
-                ExceptionResponse response = new ExceptionResponse();
-                response.setErrorCode(errorCode);
-                response.setErrorText(errorMsg);
-                response.setResponseName((cmdObj == null) ? 
"unknowncommandresponse" : cmdObj.getCommandName());
+            ExceptionResponse response = new ExceptionResponse();
+            response.setErrorCode(errorCode);
+            response.setErrorText(errorMsg);
+            response.setResponseName((cmdObj == null) ? 
"unknowncommandresponse" : cmdObj.getCommandName());
 
-                // FIXME:  setting resultCode to ApiErrorCode.INTERNAL_ERROR 
is not right, usually executors have their exception handling
-                //         and we need to preserve that as much as possible 
here
-                _asyncJobMgr.completeAsyncJob(job.getId(), 
AsyncJobResult.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), 
response);
-            }
+            // FIXME:  setting resultCode to ApiErrorCode.INTERNAL_ERROR is 
not right, usually executors have their exception handling
+            //         and we need to preserve that as much as possible here
+            _asyncJobMgr.completeAsyncJob(job.getId(), 
AsyncJobResult.STATUS_FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), 
response);
         }
        }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aa108fca/server/src/com/cloud/api/ApiDispatcher.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/ApiDispatcher.java 
b/server/src/com/cloud/api/ApiDispatcher.java
index 9298360..a859414 100755
--- a/server/src/com/cloud/api/ApiDispatcher.java
+++ b/server/src/com/cloud/api/ApiDispatcher.java
@@ -55,6 +55,7 @@ import 
org.apache.cloudstack.api.command.user.event.ListEventsCmd;
 import org.apache.log4j.Logger;
 import org.springframework.stereotype.Component;
 
+import com.cloud.async.AsyncJobExecutionContext;
 import com.cloud.async.AsyncJobManager;
 import com.cloud.dao.EntityManager;
 import com.cloud.exception.InvalidParameterValueException;
@@ -126,35 +127,38 @@ public class ApiDispatcher {
     }
 
     public void dispatch(BaseCmd cmd, Map<String, String> params) throws 
Exception {
-            processParameters(cmd, params);
-            UserContext ctx = UserContext.current();
-            ctx.setAccountId(cmd.getEntityOwnerId());
-            
-            if (cmd instanceof BaseAsyncCmd) {
-
-                BaseAsyncCmd asyncCmd = (BaseAsyncCmd) cmd;
-                String startEventId = params.get("ctxStartEventId");
-                ctx.setStartEventId(Long.valueOf(startEventId));
-
-                // Synchronise job on the object if needed
-                if (asyncCmd.getJob() != null && asyncCmd.getSyncObjId() != 
null && asyncCmd.getSyncObjType() != null) {
-                    Long queueSizeLimit = null;
-                    if (asyncCmd.getSyncObjType() != null && 
asyncCmd.getSyncObjType().equalsIgnoreCase(BaseAsyncCmd.snapshotHostSyncObject))
 {
-                        queueSizeLimit = _createSnapshotQueueSizeLimit;
-                    } else {
-                        queueSizeLimit = 1L;
-                    }
+        processParameters(cmd, params);
+        UserContext ctx = UserContext.current();
+        ctx.setAccountId(cmd.getEntityOwnerId());
+        
+        if (cmd instanceof BaseAsyncCmd) {
+
+            BaseAsyncCmd asyncCmd = (BaseAsyncCmd) cmd;
+            String startEventId = params.get("ctxStartEventId");
+            ctx.setStartEventId(Long.valueOf(startEventId));
+
+            // Synchronise job on the object if needed
+            if (asyncCmd.getJob() != null && asyncCmd.getSyncObjId() != null 
&& asyncCmd.getSyncObjType() != null) {
+                Long queueSizeLimit = null;
+                if (asyncCmd.getSyncObjType() != null && 
asyncCmd.getSyncObjType().equalsIgnoreCase(BaseAsyncCmd.snapshotHostSyncObject))
 {
+                    queueSizeLimit = _createSnapshotQueueSizeLimit;
+                } else {
+                    queueSizeLimit = 1L;
+                }
 
-                    if (queueSizeLimit != null) {
-                    _asyncMgr.syncAsyncJobExecution(asyncCmd.getJob(), 
asyncCmd.getSyncObjType(), asyncCmd.getSyncObjId().longValue(), queueSizeLimit);
-                    } else {
-                        s_logger.trace("The queue size is unlimited, skipping 
the synchronizing");
-                    }
+                if (queueSizeLimit != null) {
+                       
if(AsyncJobExecutionContext.getCurrentExecutionContext() == null) {
+                               // if we are not within async-execution 
context, enqueue the command
+                               
_asyncMgr.syncAsyncJobExecution(asyncCmd.getJob(), asyncCmd.getSyncObjType(), 
asyncCmd.getSyncObjId().longValue(), queueSizeLimit);
+                               return;
+                       }
+                } else {
+                    s_logger.trace("The queue size is unlimited, skipping the 
synchronizing");
                 }
             }
+        }
 
-            cmd.execute();
-
+        cmd.execute();
     }
 
     @SuppressWarnings({ "unchecked", "rawtypes" })

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aa108fca/server/src/com/cloud/api/ApiServer.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/ApiServer.java 
b/server/src/com/cloud/api/ApiServer.java
index d4d88b3..42c5a73 100755
--- a/server/src/com/cloud/api/ApiServer.java
+++ b/server/src/com/cloud/api/ApiServer.java
@@ -361,10 +361,6 @@ public class ApiServer implements HttpRequestHandler, 
ApiServerService {
             }
             throw new 
ServerApiException(ApiErrorCode.RESOURCE_UNAVAILABLE_ERROR, errorMsg, ex);
         }
-        catch (AsyncCommandQueued ex){
-            s_logger.error("unhandled exception executing api command: " + 
((command == null) ? "null" : command[0]), ex);
-            throw new ServerApiException(ApiErrorCode.INTERNAL_ERROR, 
"Internal server error, unable to execute request.");
-        }
         catch (ServerApiException ex){
             s_logger.info(ex.getDescription());
             throw ex;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aa108fca/server/src/com/cloud/api/AsyncCommandQueued.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/AsyncCommandQueued.java 
b/server/src/com/cloud/api/AsyncCommandQueued.java
deleted file mode 100644
index ecd38c8..0000000
--- a/server/src/com/cloud/api/AsyncCommandQueued.java
+++ /dev/null
@@ -1,36 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.api;
-
-import com.cloud.async.SyncQueueVO;
-import com.cloud.utils.SerialVersionUID;
-import com.cloud.utils.exception.CloudRuntimeException;
-
-public class AsyncCommandQueued extends CloudRuntimeException {
-    private static final long serialVersionUID = 
SerialVersionUID.AsyncCommandQueued;
-
-    private SyncQueueVO _queue = null;
-
-    public AsyncCommandQueued(SyncQueueVO queue, String msg) {
-        super(msg);
-        _queue = queue;
-    }
-
-    public SyncQueueVO getQueue() {
-        return _queue;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aa108fca/server/src/com/cloud/async/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java 
b/server/src/com/cloud/async/AsyncJobManagerImpl.java
index dfd6a01..586f40e 100644
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@ -19,7 +19,6 @@ package com.cloud.async;
 
 import java.io.File;
 import java.io.FileInputStream;
-import java.lang.reflect.Type;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -31,7 +30,6 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import javax.ejb.Local;
 import javax.inject.Inject;
 import javax.naming.ConfigurationException;
 
@@ -40,11 +38,8 @@ import 
org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd;
 import org.apache.cloudstack.api.response.ExceptionResponse;
 import org.apache.log4j.Logger;
 import org.apache.log4j.NDC;
-import org.springframework.stereotype.Component;
 
-import com.cloud.api.ApiAsyncJobDispatcher;
 import com.cloud.api.ApiSerializerHelper;
-import com.cloud.api.AsyncCommandQueued;
 import com.cloud.async.dao.AsyncJobDao;
 import com.cloud.cluster.ClusterManager;
 import com.cloud.cluster.ClusterManagerListener;
@@ -87,7 +82,16 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
     @Inject private ConfigurationDao _configDao;
     @Inject private List<AsyncJobDispatcher> _jobDispatchers;
 
-    private long _jobExpireSeconds = 86400;                                    
        // 1 day
+    // property
+    private String defaultDispatcher;
+    public String getDefaultDispatcher() {
+               return defaultDispatcher;
+       }
+       public void setDefaultDispatcher(String defaultDispatcher) {
+               this.defaultDispatcher = defaultDispatcher;
+       }
+
+       private long _jobExpireSeconds = 86400;                                 
        // 1 day
     private long _jobCancelThresholdSeconds = 3600;            // 1 hour (for 
cancelling the jobs blocking other jobs)
 
     private final ScheduledExecutorService _heartbeatScheduler =
@@ -239,15 +243,6 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
 
     @Override
     public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long 
syncObjId, long queueSizeLimit) {
-        // This method is re-entrant.  If an API developer wants to 
synchronized on an object, e.g. the router,
-        // when executing business logic, they will call this method (actually 
a method in BaseAsyncCmd that calls this).
-        // This method will get called every time their business logic 
executes.  The first time it exectues for a job
-        // there will be no sync source, but on subsequent execution there 
will be a sync souce.  If this is the first
-        // time the job executes we queue the job, otherwise we just return so 
that the business logic can execute.
-        if (job.getSyncSource() != null) {
-            return;
-        }
-
         if(s_logger.isDebugEnabled()) {
             s_logger.debug("Sync job-" + job.getId() + " execution on object " 
+ syncObjType + "." + syncObjId);
         }
@@ -270,11 +265,8 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
             }
         }
 
-        if (queue == null) {
+        if (queue == null)
             throw new CloudRuntimeException("Unable to insert queue item into 
database, DB is full?");
-        } else {
-            throw new AsyncCommandQueued(queue, "job-" + job.getId() + " 
queued");
-        }
     }
 
     @Override
@@ -372,7 +364,7 @@ public class AsyncJobManagerImpl extends ManagerBase 
implements AsyncJobManager,
     
     private AsyncJobDispatcher getDispatcher(String dispatcherName) {
        if(dispatcherName == null || dispatcherName.isEmpty())
-               dispatcherName = ApiAsyncJobDispatcher.class.getSimpleName();
+               dispatcherName = this.defaultDispatcher;
        
        if(_jobDispatchers != null) {
                for(AsyncJobDispatcher dispatcher : _jobDispatchers) {

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aa108fca/setup/db/db/schema-410to420.sql
----------------------------------------------------------------------
diff --git a/setup/db/db/schema-410to420.sql b/setup/db/db/schema-410to420.sql
index ab9df05..fecdc0f 100644
--- a/setup/db/db/schema-410to420.sql
+++ b/setup/db/db/schema-410to420.sql
@@ -405,3 +405,8 @@ INSERT INTO `cloud`.`vm_template` (id, unique_name, name, 
public, created, type,
      VALUES (10, 'routing-10', 'SystemVM Template (LXC)', 0, now(), 'SYSTEM', 
0, 64, 1, 
'http://download.cloud.com/templates/acton/acton-systemvm-02062012.qcow2.bz2', 
'2755de1f9ef2ce4d6f2bee2efbb4da92', 0, 'SystemVM Template (LXC)', 'QCOW2', 15, 
0, 1, 'LXC');
 
 -- END: support for LXC
+
+ALTER TABLE `cloud`.`async_job` DROP COLUMN `session_key`;
+ALTER TABLE `cloud`.`async_job` DROP COLUMN `job_cmd_originator`;
+ALTER TABLE `cloud`.`async_job` ADD COLUMN `job_dispatcher` VARCHAR(64);
+

Reply via email to