Repository: airavata
Updated Branches:
  refs/heads/master 1d83a48b7 -> e5131ad87


fixing the thread creation in airavata-api


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e5131ad8
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e5131ad8
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e5131ad8

Branch: refs/heads/master
Commit: e5131ad8780045412660b5f4a9fa59848998e1fc
Parents: 1d83a48
Author: lahiru <[email protected]>
Authored: Fri Oct 24 02:55:48 2014 -0400
Committer: lahiru <[email protected]>
Committed: Fri Oct 24 02:55:48 2014 -0400

----------------------------------------------------------------------
 .../server/handler/AiravataServerHandler.java   | 129 +++++++++++--------
 .../main/resources/airavata-server.properties   |   1 +
 .../monitor/impl/pull/qstat/HPCPullMonitor.java |   4 +-
 3 files changed, 78 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/e5131ad8/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
----------------------------------------------------------------------
diff --git 
a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
 
b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
index 76100ff..5753e3e 100644
--- 
a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
+++ 
b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
@@ -27,6 +27,7 @@ import 
org.apache.aiaravata.application.catalog.data.resources.*;
 import 
org.apache.aiaravata.application.catalog.data.util.AppCatalogThriftConversion;
 import org.apache.airavata.api.Airavata;
 import org.apache.airavata.api.airavataAPIConstants;
+import org.apache.airavata.api.server.util.AiravataServerThreadPoolExecutor;
 import org.apache.airavata.api.server.util.DataModelUtils;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
@@ -1165,7 +1166,7 @@ public class AiravataServerHandler implements 
Airavata.Iface {
                 logger.debugId(airavataExperimentId, "Launching single 
application experiment {}.", airavataExperimentId);
                 final OrchestratorService.Client orchestratorClient = 
getOrchestratorClient();
                 if (orchestratorClient.validateExperiment(expID)) {
-                   launchSingleAppExperiment(expID, token, orchestratorClient);
+                   
AiravataServerThreadPoolExecutor.getFixedThreadPool().execute(new 
SingleAppExperimentRunner(expID, token, orchestratorClient));
                 } else {
                     logger.errorId(airavataExperimentId, "Experiment 
validation failed. Please check the configurations.");
                     throw new InvalidRequestException("Experiment Validation 
Failed, please check the configuration");
@@ -1199,66 +1200,86 @@ public class AiravataServerHandler implements 
Airavata.Iface {
             logger.errorId(experimentId, "Error while launching experiment.", 
e);
         }
     }
-    
-    private boolean launchSingleAppExperiment(String experimentId, String 
airavataCredStoreToken, OrchestratorService.Client orchestratorClient) throws 
TException {
-        Experiment experiment = null;
-        try {
-            List<String> ids = 
registry.getIds(RegistryModelType.WORKFLOW_NODE_DETAIL, 
WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
-            for (String workflowNodeId : ids) {
+
+    private class SingleAppExperimentRunner implements Runnable {
+
+        String experimentId;
+        String airavataCredStoreToken;
+        Client client;
+        public SingleAppExperimentRunner(String experimentId,String 
airavataCredStoreToken,Client client){
+            this.experimentId = experimentId;
+            this.airavataCredStoreToken = airavataCredStoreToken;
+            this.client = client;
+        }
+        @Override
+        public void run() {
+            try {
+                launchSingleAppExperiment();
+            } catch (TException e) {
+                e.printStackTrace();
+            }
+        }
+
+        private boolean launchSingleAppExperiment() throws TException {
+            Experiment experiment = null;
+            try {
+                List<String> ids = 
registry.getIds(RegistryModelType.WORKFLOW_NODE_DETAIL, 
WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
+                for (String workflowNodeId : ids) {
 //                WorkflowNodeDetails workflowNodeDetail = 
(WorkflowNodeDetails) registry.get(RegistryModelType.WORKFLOW_NODE_DETAIL, 
workflowNodeId);
-                List<Object> taskDetailList = 
registry.get(RegistryModelType.TASK_DETAIL, TaskDetailConstants.NODE_ID, 
workflowNodeId);
-                for (Object o : taskDetailList) {
-                    TaskDetails taskData = (TaskDetails) o;
-                    //iterate through all the generated tasks and performs the 
job submisssion+monitoring
-                    experiment = (Experiment) 
registry.get(RegistryModelType.EXPERIMENT, experimentId);
-                    if (experiment == null) {
-                        logger.errorId(experimentId, "Error retrieving the 
Experiment by the given experimentID: {}", experimentId);
-                        return false;
-                    }
-                    ExperimentStatus status = new ExperimentStatus();
-                    status.setExperimentState(ExperimentState.LAUNCHED);
-                    
status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
-                    experiment.setExperimentStatus(status);
-                    registry.update(RegistryModelType.EXPERIMENT_STATUS, 
status, experimentId);
-                    if (ServerSettings.isRabbitMqPublishEnabled()){
-                        String gatewayId = 
ServerSettings.getDefaultUserGateway();
-                        ExperimentStatusChangeEvent event = new 
ExperimentStatusChangeEvent(ExperimentState.LAUNCHED,
-                                experimentId,
-                                gatewayId);
-                        String messageId = AiravataUtils.getId("EXPERIMENT");
-                        MessageContext messageContext = new 
MessageContext(event, MessageType.EXPERIMENT,messageId,gatewayId);
-                        
messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
-                        publisher.publish(messageContext);
+                    List<Object> taskDetailList = 
registry.get(RegistryModelType.TASK_DETAIL, TaskDetailConstants.NODE_ID, 
workflowNodeId);
+                    for (Object o : taskDetailList) {
+                        TaskDetails taskData = (TaskDetails) o;
+                        //iterate through all the generated tasks and performs 
the job submisssion+monitoring
+                        experiment = (Experiment) 
registry.get(RegistryModelType.EXPERIMENT, experimentId);
+                        if (experiment == null) {
+                            logger.errorId(experimentId, "Error retrieving the 
Experiment by the given experimentID: {}", experimentId);
+                            return false;
+                        }
+                        ExperimentStatus status = new ExperimentStatus();
+                        status.setExperimentState(ExperimentState.LAUNCHED);
+                        
status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+                        experiment.setExperimentStatus(status);
+                        registry.update(RegistryModelType.EXPERIMENT_STATUS, 
status, experimentId);
+                        if (ServerSettings.isRabbitMqPublishEnabled()) {
+                            String gatewayId = 
ServerSettings.getDefaultUserGateway();
+                            ExperimentStatusChangeEvent event = new 
ExperimentStatusChangeEvent(ExperimentState.LAUNCHED,
+                                    experimentId,
+                                    gatewayId);
+                            String messageId = 
AiravataUtils.getId("EXPERIMENT");
+                            MessageContext messageContext = new 
MessageContext(event, MessageType.EXPERIMENT, messageId, gatewayId);
+                            
messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+                            publisher.publish(messageContext);
+                        }
+                        registry.update(RegistryModelType.TASK_DETAIL, 
taskData, taskData.getTaskID());
+                        //launching the experiment
+                        client.launchTask(taskData.getTaskID(), 
airavataCredStoreToken);
                     }
-                    registry.update(RegistryModelType.TASK_DETAIL, taskData, 
taskData.getTaskID());
-                    //launching the experiment
-                    orchestratorClient.launchTask(taskData.getTaskID(), 
airavataCredStoreToken);
                 }
-            }
 
-        } catch (Exception e) {
-            // Here we really do not have to do much because only potential 
failure can happen
-            // is in gfac, if there are errors in gfac, it will handle the 
experiment/task/job statuses
-            // We might get failures in registry access before submitting the 
jobs to gfac, in that case we
-            // leave the status of these as created.
-            ExperimentStatus status = new ExperimentStatus();
-            status.setExperimentState(ExperimentState.FAILED);
-            
status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
-            experiment.setExperimentStatus(status);
-            try {
-                registry.update(RegistryModelType.EXPERIMENT_STATUS, status, 
experimentId);
-            } catch (RegistryException e1) {
-                logger.errorId(experimentId, "Error while updating experiment 
status to " + status.toString(), e);
+            } catch (Exception e) {
+                // Here we really do not have to do much because only 
potential failure can happen
+                // is in gfac, if there are errors in gfac, it will handle the 
experiment/task/job statuses
+                // We might get failures in registry access before submitting 
the jobs to gfac, in that case we
+                // leave the status of these as created.
+                ExperimentStatus status = new ExperimentStatus();
+                status.setExperimentState(ExperimentState.FAILED);
+                
status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+                experiment.setExperimentStatus(status);
+                try {
+                    registry.update(RegistryModelType.EXPERIMENT_STATUS, 
status, experimentId);
+                } catch (RegistryException e1) {
+                    logger.errorId(experimentId, "Error while updating 
experiment status to " + status.toString(), e);
+                    throw new TException(e);
+                }
+                logger.errorId(experimentId, "Error while updating task 
status, hence updated experiment status to " + status.toString(), e);
                 throw new TException(e);
-            }
-            logger.errorId(experimentId, "Error while updating task status, 
hence updated experiment status to " + status.toString(), e);
-            throw new TException(e);
-        }finally {
-            orchestratorClient.getOutputProtocol().getTransport().close();
-            orchestratorClient.getInputProtocol().getTransport().close();
+            } finally {
+                client.getOutputProtocol().getTransport().close();
+                client.getInputProtocol().getTransport().close();
 
+            }
+            return true;
         }
-        return true;
     }
     
        private OrchestratorService.Client getOrchestratorClient() {

http://git-wip-us.apache.org/repos/asf/airavata/blob/e5131ad8/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git 
a/modules/configuration/server/src/main/resources/airavata-server.properties 
b/modules/configuration/server/src/main/resources/airavata-server.properties
index 978369e..edb60e2 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -113,6 +113,7 @@ [email protected]
 #  to interact with Computational Resources.
 #
 gfac.thread.pool.size=50
+airavata.server.thread.pool.size=50
 gfac=org.apache.airavata.gfac.server.GfacServer
 myproxy.server=myproxy.teragrid.org
 myproxy.username=ogce

http://git-wip-us.apache.org/repos/asf/airavata/blob/e5131ad8/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
 
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index e165bfd..d3c3df8 100644
--- 
a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ 
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -295,8 +295,8 @@ public class HPCPullMonitor extends PullMonitor {
             for (String jobName: keys) {
                 MonitorID completedJob = completedJobs.get(jobName);
                 CommonUtils.removeMonitorFromQueue(queue, completedJob);
-                    
gfac.invokeOutFlowHandlers(completedJob.getJobExecutionContext());
-//                  GFacThreadPoolExecutor.getFixedThreadPool().submit(new 
OutHandlerWorker(gfac, completedJob, publisher));
+//                    
gfac.invokeOutFlowHandlers(completedJob.getJobExecutionContext());
+                  GFacThreadPoolExecutor.getFixedThreadPool().submit(new 
OutHandlerWorker(gfac, completedJob, publisher));
                 if (zk == null) {
                     zk = completedJob.getJobExecutionContext().getZk();
                 }

Reply via email to