http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
new file mode 100644
index 0000000..9f369b1
--- /dev/null
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -0,0 +1,346 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.gsissh.provider.impl;
+
+import org.airavata.appcatalog.cpi.AppCatalogException;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent;
+import org.apache.airavata.gfac.core.provider.AbstractProvider;
+import org.apache.airavata.gfac.core.provider.GFacProviderException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
+import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils;
+import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
+import org.apache.airavata.gfac.monitor.email.EmailMonitorFactory;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.apache.airavata.gfac.ssh.api.SSHApiException;
+import org.apache.airavata.gfac.ssh.api.job.JobDescriptor;
+import 
org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import 
org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import 
org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Map;
+
+//import org.apache.airavata.schemas.gfac.GsisshHostType;
+
+public class GSISSHProvider extends AbstractProvider {
+    private static final Logger log = 
LoggerFactory.getLogger(GSISSHProvider.class);
+
+    /**
+     * No-op: the supplied properties are not read by this provider.
+     *
+     * @param properties provider properties; unused here
+     */
+    public void initProperties(Map<String, String> properties) throws 
GFacProviderException, GFacException {
+
+    }
+
+    public void initialize(JobExecutionContext jobExecutionContext) throws 
GFacProviderException, GFacException {
+        super.initialize(jobExecutionContext);
+        try {
+            String hostAddress = jobExecutionContext.getHostName();
+            if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
+                GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
+            }
+        } catch (ApplicationSettingsException e) {
+            log.error(e.getMessage());
+            throw new GFacHandlerException("Error while creating 
SSHSecurityContext", e, e.getLocalizedMessage());
+        } catch (GFacException e) {
+            throw new GFacHandlerException("Error while creating 
SSHSecurityContext", e, e.getLocalizedMessage());
+        }
+    }
+
+    public void execute(JobExecutionContext jobExecutionContext) throws 
GFacProviderException, GFacException {
+        log.info("Invoking GSISSH Provider Invoke ...");
+        StringBuffer data = new StringBuffer();
+        jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
+        ComputeResourceDescription computeResourceDescription = 
jobExecutionContext.getApplicationContext()
+                .getComputeResourceDescription();
+        ApplicationDeploymentDescription appDeployDesc = 
jobExecutionContext.getApplicationContext()
+                .getApplicationDeploymentDescription();
+        JobDetails jobDetails = new JobDetails();
+        Cluster cluster = null;
+
+        try {
+            if 
(jobExecutionContext.getSecurityContext(jobExecutionContext.getHostName()) != 
null) {
+                cluster = ((GSISecurityContext) 
jobExecutionContext.getSecurityContext(jobExecutionContext.getHostName())).getPbsCluster();
+            }
+            if (cluster == null) {
+                throw new GFacProviderException("Security context is not set 
properly");
+            } else {
+                log.info("Successfully retrieved the Security Context");
+            }
+            // This installed path is a mandetory field, because this could 
change based on the computing resource
+            JobDescriptor jobDescriptor = 
GFACGSISSHUtils.createJobDescriptor(jobExecutionContext, cluster);
+            jobDetails.setJobName(jobDescriptor.getJobName());
+
+            log.info(jobDescriptor.toXML());
+            data.append("jobDesc=").append(jobDescriptor.toXML());
+            jobDetails.setJobDescription(jobDescriptor.toXML());
+            String jobID = cluster.submitBatchJob(jobDescriptor);
+            jobExecutionContext.setJobDetails(jobDetails);
+            if (jobID == null) {
+                jobDetails.setJobID("none");
+                GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, 
JobState.FAILED);
+            } else {
+                jobDetails.setJobID(jobID.split("\\.")[0]);
+                GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, 
JobState.SUBMITTED);
+            }
+            data.append(",jobId=").append(jobDetails.getJobID());
+
+            // Now job has submitted to the resource, its up to the Provider 
to parse the information to daemon handler
+            // to perform monitoring, daemon handlers can be accessed from 
anywhere
+            monitor(jobExecutionContext);
+            // we know this host is type GsiSSHHostType
+        } catch (Exception e) {
+                   String error = "Error submitting the job to host " + 
computeResourceDescription.getHostName() + " message: " + e.getMessage();
+            log.error(error);
+            jobDetails.setJobID("none");
+            GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, 
JobState.FAILED);
+            StringWriter errors = new StringWriter();
+            e.printStackTrace(new PrintWriter(errors));
+            GFacUtils.saveErrorDetails(jobExecutionContext,  
errors.toString(), CorrectiveAction.CONTACT_SUPPORT, 
ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+            throw new GFacProviderException(error, e);
+        } finally {
+            log.info("Saving data for future recovery: ");
+            log.info(data.toString());
+            GFacUtils.saveHandlerData(jobExecutionContext, data, 
this.getClass().getName());
+        } 
+          
+    }
+
+    /**
+     * Currently a no-op: the entire body is commented out, so none of the arguments are used.
+     * The disabled code looked up the daemon handlers and invoked the pull or push monitor
+     * handler depending on the submission's monitor mode.
+     *
+     * @param jobExecutionContext context of the job (unused while the body is disabled)
+     * @param sshJobSubmission    SSH job submission settings (unused while the body is disabled)
+     * @param jobID               id of the job (unused)
+     */
+    public void removeFromMonitorHandlers(JobExecutionContext 
jobExecutionContext, SSHJobSubmission sshJobSubmission, String jobID) throws 
GFacHandlerException {
+/*        List<ThreadedHandler> daemonHandlers = 
BetterGfacImpl.getDaemonHandlers();
+        if (daemonHandlers == null) {
+            daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+        }
+        ThreadedHandler pullMonitorHandler = null;
+        ThreadedHandler pushMonitorHandler = null;
+        MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
+        for (ThreadedHandler threadedHandler : daemonHandlers) {
+            if 
("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName()))
 {
+                pullMonitorHandler = threadedHandler;
+                if (monitorMode == null || monitorMode == 
MonitorMode.POLL_JOB_MANAGER) {
+                    jobExecutionContext.setProperty("cancel","true");
+                    pullMonitorHandler.invoke(jobExecutionContext);
+                } else {
+                    log.error("Currently we only support Pull and Push 
monitoring and monitorMode should be PULL" +
+                            " to handle by the GridPullMonitorHandler");
+                }
+            } else if 
("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName()))
 {
+                pushMonitorHandler = threadedHandler;
+                if ( monitorMode == null || monitorMode == 
MonitorMode.XSEDE_AMQP_SUBSCRIBE) {
+                    pushMonitorHandler.invoke(jobExecutionContext);
+                } else {
+                    log.error("Currently we only support Pull and Push 
monitoring and monitorMode should be PUSH" +
+                            " to handle by the GridPushMonitorHandler");
+                }
+            }
+            // have to handle the GridPushMonitorHandler logic
+        }
+        if (pullMonitorHandler == null && pushMonitorHandler == null && 
ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode()))
 {
+            log.error("No Daemon handler is configured in gfac-config.xml, 
either pull or push, so monitoring will not invoked" +
+                    ", execution is configured as asynchronous, so Outhandler 
will not be invoked");
+        }*/
+    }
+
+    /**
+     * No-op: this provider does nothing on dispose.
+     *
+     * @param jobExecutionContext context of the job; unused here
+     */
+    public void dispose(JobExecutionContext jobExecutionContext) throws 
GFacProviderException, GFacException {
+        // Empty body.
+    }
+
+    public boolean cancelJob(JobExecutionContext jobExecutionContext) throws 
GFacProviderException,GFacException {
+        //To change body of implemented methods use File | Settings | File 
Templates.
+        log.info("canceling the job status in GSISSHProvider!!!!!");
+        JobDetails jobDetails = jobExecutionContext.getJobDetails();
+        String hostName = jobExecutionContext.getHostName();
+        try {
+            Cluster cluster = null;
+            if (jobExecutionContext.getSecurityContext(hostName) == null) {
+                GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
+            }
+            cluster = ((GSISecurityContext) 
jobExecutionContext.getSecurityContext(hostName)).getPbsCluster();
+            if (cluster == null) {
+                throw new GFacProviderException("Security context is not set 
properly");
+            } else {
+                log.info("Successfully retrieved the Security Context");
+            }
+            // This installed path is a mandetory field, because this could 
change based on the computing resource
+            if(jobDetails == null) {
+                log.error("There is not JobDetails so cancelations cannot 
perform !!!");
+                return false;
+            }
+            if (jobDetails.getJobID() != null) {
+                // if this operation success without any exceptions, we can 
assume cancel operation succeeded.
+                cluster.cancelJob(jobDetails.getJobID());
+            } else {
+                log.error("No Job Id is set, so cannot perform the cancel 
operation !!!");
+                return false;
+            }
+            GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, 
JobState.CANCELED);
+            return true;
+            // we know this host is type GsiSSHHostType
+        } catch (SSHApiException e) {
+            String error = "Error submitting the job to host " + 
jobExecutionContext.getHostName() + " message: " + e.getMessage();
+            log.error(error);
+            jobDetails.setJobID("none");
+            GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, 
JobState.FAILED);
+            GFacUtils.saveErrorDetails(jobExecutionContext,  
e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, 
ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+            throw new GFacProviderException(error, e);
+        } catch (Exception e) {
+            String error = "Error submitting the job to host " + 
jobExecutionContext.getHostName() + " message: " + e.getMessage();
+            log.error(error);
+            jobDetails.setJobID("none");
+            GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, 
JobState.FAILED);
+            GFacUtils.saveErrorDetails(jobExecutionContext,  
e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, 
ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+            throw new GFacProviderException(error, e);
+        }
+    }
+
+    public void recover(JobExecutionContext jobExecutionContext) throws 
GFacProviderException,GFacException {
+        // have to implement the logic to recover a gfac failure
+        log.info("Invoking Recovering for the Experiment: " + 
jobExecutionContext.getExperimentID());
+        ComputeResourceDescription computeResourceDescription = 
jobExecutionContext.getApplicationContext()
+                .getComputeResourceDescription();
+        String hostName = jobExecutionContext.getHostName();
+        String jobId = "";
+        String jobDesc = "";
+        try {
+            String pluginData = GFacUtils.getHandlerData(jobExecutionContext, 
this.getClass().getName());
+            String[] split = pluginData.split(",");
+            if (split.length < 2) {
+                try {
+                    this.execute(jobExecutionContext);
+                } catch (GFacException e) {
+                    log.error("Error while  recovering provider", e);
+                    throw new GFacProviderException("Error recovering 
provider", e);
+                }
+                return;
+            }
+            jobDesc = split[0].substring(7);
+            jobId = split[1].substring(6);
+
+            log.info("Following data have recovered: ");
+            log.info("Job Description: " + jobDesc);
+            log.info("Job Id: " + jobId);
+            if (jobId == null || "none".equals(jobId) ||
+                    "".equals(jobId)) {
+                try {
+                    this.execute(jobExecutionContext);
+                } catch (GFacException e) {
+                    log.error("Error while  recovering provider", e);
+                    throw new GFacProviderException("Error recovering 
provider", e);
+                }
+                return;
+            }
+        } catch (Exception e) {
+            log.error("Error while  recovering provider", e);
+        }
+        try {
+            // Now we are we have enough data to recover
+            JobDetails jobDetails = new JobDetails();
+            jobDetails.setJobDescription(jobDesc);
+            jobDetails.setJobID(jobId);
+            jobExecutionContext.setJobDetails(jobDetails);
+            if (jobExecutionContext.getSecurityContext(hostName) == null) {
+                try {
+                    GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
+                } catch (ApplicationSettingsException e) {
+                    log.error(e.getMessage());
+                    throw new GFacHandlerException("Error while creating 
SSHSecurityContext", e, e.getLocalizedMessage());
+                }
+            }
+            monitor(jobExecutionContext);
+        } catch (Exception e) {
+            log.error("Error while recover the job", e);
+            throw new GFacProviderException("Error delegating already ran job 
to Monitoring", e);
+        }
+    }
+
+    @Override
+    public void monitor(JobExecutionContext jobExecutionContext) throws 
GFacProviderException, GFacException {
+        String jobSubmissionInterfaceId = 
jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId();
+        SSHJobSubmission sshJobSubmission = null;
+        try {
+            sshJobSubmission = 
jobExecutionContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId);
+        } catch (AppCatalogException e) {
+            throw new GFacException("Error while reading compute resource", e);
+        }
+        if (jobExecutionContext.getPreferredJobSubmissionProtocol() == 
JobSubmissionProtocol.SSH) {
+            MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
+            if (monitorMode != null && monitorMode == 
MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) {
+                try {
+                    EmailBasedMonitor emailBasedMonitor = 
EmailMonitorFactory.getEmailBasedMonitor(
+                            
sshJobSubmission.getResourceJobManager().getResourceJobManagerType());
+                    emailBasedMonitor.addToJobMonitorMap(jobExecutionContext);
+                } catch (AiravataException e) {
+                    throw new GFacHandlerException("Error while activating 
email job monitoring ", e);
+                }
+                return;
+            }
+        }
+/*
+        // if email monitor is not activeated or not configure we use pull or 
push monitor
+        List<ThreadedHandler> daemonHandlers = 
BetterGfacImpl.getDaemonHandlers();
+        if (daemonHandlers == null) {
+            daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+        }
+        ThreadedHandler pullMonitorHandler = null;
+        ThreadedHandler pushMonitorHandler = null;
+        MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
+        String jobID = jobExecutionContext.getJobDetails().getJobID();
+        for (ThreadedHandler threadedHandler : daemonHandlers) {
+            if 
("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName()))
 {
+                pullMonitorHandler = threadedHandler;
+                if (monitorMode == null || monitorMode == 
MonitorMode.POLL_JOB_MANAGER) {
+                    log.info("Job is launched successfully now parsing it to 
monitoring in pull mode, JobID Returned:  " + jobID);
+                    pullMonitorHandler.invoke(jobExecutionContext);
+                } else {
+                    log.error("Currently we only support Pull and Push 
monitoring and monitorMode should be PULL" +
+                            " to handle by the GridPullMonitorHandler");
+                }
+            } else if 
("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName()))
 {
+                pushMonitorHandler = threadedHandler;
+                if (monitorMode == null || monitorMode == 
MonitorMode.XSEDE_AMQP_SUBSCRIBE) {
+                    log.info("Job is launched successfully now parsing it to 
monitoring in push mode, JobID Returned:  " + jobID);
+                    pushMonitorHandler.invoke(jobExecutionContext);
+                } else {
+                    log.error("Currently we only support Pull and Push 
monitoring and monitorMode should be PUSH" +
+                            " to handle by the GridPushMonitorHandler");
+                }
+            }
+            // have to handle the GridPushMonitorHandler logic
+        }
+        if (pullMonitorHandler == null && pushMonitorHandler == null && 
ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode()))
 {
+            log.error("No Daemon handler is configured in gfac-config.xml, 
either pull or push, so monitoring will not invoked" +
+                    ", execution is configured as asynchronous, so Outhandler 
will not be invoked");
+
+        }*/
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java
new file mode 100644
index 0000000..85e9e29
--- /dev/null
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.gsissh.security;
+
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.gfac.AbstractSecurityContext;
+import org.apache.airavata.gfac.RequestData;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.globus.gsi.X509Credential;
+import org.globus.gsi.gssapi.GlobusGSSCredentialImpl;
+import org.globus.gsi.provider.GlobusProvider;
+import org.globus.myproxy.GetParams;
+import org.globus.myproxy.MyProxy;
+import org.globus.myproxy.MyProxyException;
+import org.gridforum.jgss.ExtendedGSSCredential;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles GRID related security.
+ * <p>
+ * Besides what {@link AbstractSecurityContext} provides, this context carries the
+ * {@link Cluster} instance that callers obtain through {@link #getPbsCluster()}.
+ */
+public class GSISecurityContext extends AbstractSecurityContext {
+
+    protected static final Logger log = LoggerFactory.getLogger(GSISecurityContext.class);
+
+    /** Cluster associated with this context; {@code null} until one is supplied. */
+    private Cluster pbsCluster = null;
+
+    /**
+     * Creates a context with a credential reader, request data and a cluster.
+     *
+     * @param credentialReader passed to the superclass constructor
+     * @param requestData      passed to the superclass constructor
+     * @param pbsCluster       cluster to associate with this context
+     */
+    public GSISecurityContext(CredentialReader credentialReader, RequestData requestData, Cluster pbsCluster) {
+        super(credentialReader, requestData);
+        this.pbsCluster = pbsCluster;
+    }
+
+    /**
+     * Creates a context without a cluster; one can be supplied later with
+     * {@link #setPbsCluster(Cluster)}.
+     *
+     * @param credentialReader passed to the superclass constructor
+     * @param requestData      passed to the superclass constructor
+     */
+    public GSISecurityContext(CredentialReader credentialReader, RequestData requestData) {
+        super(credentialReader, requestData);
+    }
+
+    /**
+     * Creates a context that only carries a cluster.
+     *
+     * @param pbsCluster cluster to associate with this context
+     */
+    public GSISecurityContext(Cluster pbsCluster) {
+        // Assign the field directly: calling the overridable setter from a constructor would
+        // run subclass code before the subclass is initialized.
+        this.pbsCluster = pbsCluster;
+    }
+
+    /**
+     * @return the cluster associated with this context, or {@code null} if none was set
+     */
+    public Cluster getPbsCluster() {
+        return pbsCluster;
+    }
+
+    /**
+     * @param pbsCluster cluster to associate with this context
+     */
+    public void setPbsCluster(Cluster pbsCluster) {
+        this.pbsCluster = pbsCluster;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java
new file mode 100644
index 0000000..a3e0241
--- /dev/null
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java
@@ -0,0 +1,304 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.gsissh.security;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.credential.store.credential.Credential;
+import 
org.apache.airavata.credential.store.credential.impl.certificate.CertificateCredential;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.RequestData;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.core.authentication.GSIAuthenticationInfo;
+import org.globus.gsi.X509Credential;
+import org.globus.gsi.gssapi.GlobusGSSCredentialImpl;
+import org.globus.gsi.provider.GlobusProvider;
+import org.globus.myproxy.GetParams;
+import org.globus.myproxy.MyProxy;
+import org.globus.myproxy.MyProxyException;
+import org.gridforum.jgss.ExtendedGSSCredential;
+import org.ietf.jgss.GSSCredential;
+import org.ietf.jgss.GSSException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.security.Security;
+import java.security.cert.X509Certificate;
+
+public class TokenizedMyProxyAuthInfo extends GSIAuthenticationInfo {
+    protected static final Logger log = 
LoggerFactory.getLogger(TokenizedMyProxyAuthInfo.class);
+
+    // Renewal threshold (10 * 90 = 900): getCredentials() renews the cached credential when
+    // GSSCredential.getRemainingLifetime() drops below this value. That API reports seconds.
+    public static int CREDENTIAL_RENEWING_THRESH_HOLD = 10 * 90;
+
+    // Cached credential; stays null until getCredentials() obtains one.
+    private GSSCredential gssCredentials = null;
+
+
+    // Reader for the credential store; may be null when the single-argument constructor is used.
+    private CredentialReader credentialReader;
+
+    // Request data supplying the token id and gateway id used for credential lookup.
+    private RequestData requestData;
+
+    // Key under which the constructors store the trusted certificate location in 'properties'.
+    public static final String X509_CERT_DIR = "X509_CERT_DIR";
+
+
+    // Runs once at class load: registers the Globus security provider and sets up the
+    // trusted certificate path from the server settings.
+    static {
+        Security.addProvider(new GlobusProvider());
+        try {
+            setUpTrustedCertificatePath();
+        } catch (ApplicationSettingsException e) {
+            // Only a settings failure is logged and tolerated here; the RuntimeException that
+            // setUpTrustedCertificatePath(String) throws for an unreadable path is not caught.
+            log.error(e.getLocalizedMessage(), e);
+        }
+    }
+
+    /**
+     * Sets the {@code Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY} system property to the
+     * absolute form of the given path.
+     *
+     * @param trustedCertificatePath location of the trusted certificates
+     * @throws RuntimeException if the path does not exist or cannot be read; the current
+     *                          working directory is logged first to help diagnosis
+     */
+    public static void setUpTrustedCertificatePath(String trustedCertificatePath) {
+        final File certLocation = new File(trustedCertificatePath);
+        final boolean usable = certLocation.exists() && certLocation.canRead();
+
+        if (usable) {
+            System.setProperty(Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY, certLocation.getAbsolutePath());
+            return;
+        }
+
+        final File workingDir = new File(".");
+        log.info("Current directory " + workingDir.getAbsolutePath());
+        throw new RuntimeException("Cannot read trusted certificate path " + trustedCertificatePath);
+    }
+
+    /**
+     * Reads {@code Constants.TRUSTED_CERT_LOCATION} from the server settings and passes it to
+     * {@link #setUpTrustedCertificatePath(String)}.
+     *
+     * @throws ApplicationSettingsException if the setting cannot be read
+     */
+    private static void setUpTrustedCertificatePath() throws ApplicationSettingsException {
+        setUpTrustedCertificatePath(ServerSettings.getSetting(Constants.TRUSTED_CERT_LOCATION));
+    }
+
+    public TokenizedMyProxyAuthInfo(CredentialReader credentialReader, 
RequestData requestData) {
+        this.credentialReader = credentialReader;
+        this.requestData = requestData;
+        try {
+            properties.setProperty(X509_CERT_DIR, 
ServerSettings.getSetting(Constants.TRUSTED_CERT_LOCATION));
+        } catch (ApplicationSettingsException e) {
+            log.error("Error while  reading server properties", e);
+        };
+    }
+
+    public TokenizedMyProxyAuthInfo(RequestData requestData) {
+           this.requestData = requestData;
+           try {
+               properties.setProperty(X509_CERT_DIR, 
ServerSettings.getSetting(Constants.TRUSTED_CERT_LOCATION));
+           } catch (ApplicationSettingsException e) {
+               log.error("Error while  reading server properties", e);
+           };
+       }
+
+    public GSSCredential getCredentials() throws SecurityException {
+
+        if (gssCredentials == null) {
+
+            try {
+                gssCredentials = getCredentialsFromStore();
+            } catch (Exception e) {
+                log.error("An exception occurred while retrieving credentials 
from the credential store. " +
+                        "Will continue with my proxy user name and password. 
Provided TokenId:" + requestData.getTokenId(), e);
+            }
+
+            if (gssCredentials == null) {
+                System.out.println("Authenticating with provided token failed, 
so falling back to authenticate with defaultCredentials");
+                try {
+                    gssCredentials = getDefaultCredentials();
+                } catch (Exception e) {
+                    throw new SecurityException("Error retrieving my proxy 
using username password");
+                }
+            }
+            // if still null, throw an exception
+            if (gssCredentials == null) {
+                throw new SecurityException("Unable to retrieve my proxy 
credentials to continue operation.");
+            }
+        } else {
+            try {
+                if (gssCredentials.getRemainingLifetime() < 
CREDENTIAL_RENEWING_THRESH_HOLD) {
+                    try {
+                        return renewCredentials();
+                    } catch (Exception e) {
+                        throw new SecurityException("Error renewing 
credentials", e);
+                    }
+                }
+            } catch (GSSException e) {
+                throw new SecurityException("Unable to retrieve remaining life 
time from credentials.", e);
+            }
+        }
+
+        return gssCredentials;
+    }
+
+
+    /**
+     * Reads the credentials from credential store.
+     * <p>
+     * If no credential reader has been set, one is obtained from
+     * {@code GFacUtils.getCredentialReader()} and kept for later calls. The credential is looked
+     * up by the gateway id and token id of the request data. The credential material is never
+     * written to stdout or the log.
+     *
+     * @return If the token is found in the credential store and is a certificate credential,
+     *         returns a GSS credential built from it. Returns null when no credential reader is
+     *         available, the token is not found, or the credential is of another type.
+     * @throws Exception If an error occurred while retrieving credentials.
+     */
+    public GSSCredential getCredentialsFromStore() throws Exception {
+
+        if (getCredentialReader() == null) {
+            credentialReader = GFacUtils.getCredentialReader();
+            if (credentialReader == null) {
+                return null;
+            }
+        }
+
+        Credential credential = getCredentialReader().getCredential(getRequestData().getGatewayId(),
+                getRequestData().getTokenId());
+
+        if (credential == null) {
+            log.info("Could not find credentials for token - " + getRequestData().getTokenId() + " and "
+                    + "gateway id - " + getRequestData().getGatewayId());
+            return null;
+        }
+
+        if (!(credential instanceof CertificateCredential)) {
+            log.info("Credential type is not CertificateCredential. Cannot create mapping globus credentials. " +
+                    "Credential type - " + credential.getClass().getName());
+            return null;
+        }
+
+        log.info("Successfully found credentials for token id - " + getRequestData().getTokenId() +
+                " gateway id - " + getRequestData().getGatewayId());
+
+        CertificateCredential certificateCredential = (CertificateCredential) credential;
+
+        X509Certificate[] certificates = certificateCredential.getCertificates();
+        X509Credential newCredential = new X509Credential(certificateCredential.getPrivateKey(), certificates);
+
+        // The exported credential used to be printed to System.out here; that leaked the
+        // credential to the process output and has been removed.
+        return new GlobusGSSCredentialImpl(newCredential, GSSCredential.INITIATE_AND_ACCEPT);
+    }
+
+    /**
+     * Renew GSSCredentials.
+     * Before executing we need to add current host as a trusted renewer. Note 
to renew credentials
+     * we dont need user name and password.
+     * To do that execute following command
+     * > myproxy-logon -t <LIFETIME></LIFETIME> -s <MY PROXY SERVER> -l <USER 
NAME>
+     * E.g :- > myproxy-logon -t 264 -s myproxy.teragrid.org -l us3
+     * Enter MyProxy pass phrase:
+     * A credential has been received for user us3 in /tmp/x509up_u501.
+     * > myproxy-init -A --cert /tmp/x509up_u501 --key /tmp/x509up_u501 -l 
ogce -s myproxy.teragrid.org
+     *
+     * @return Renewed credentials.
+     * @throws org.apache.airavata.gfac.GFacException                          
  If an error occurred while renewing credentials.
+     * @throws 
org.apache.airavata.common.exception.ApplicationSettingsException
+     */
+    public GSSCredential renewCredentialsAsATrustedHost() throws 
GFacException, ApplicationSettingsException {
+        MyProxy myproxy = new MyProxy(getRequestData().getMyProxyServerUrl(), 
getRequestData().getMyProxyPort());
+        GetParams getParams = new GetParams();
+        getParams.setAuthzCreds(gssCredentials);
+        getParams.setUserName(getRequestData().getMyProxyUserName());
+        getParams.setLifetime(getRequestData().getMyProxyLifeTime());
+        try {
+            return myproxy.get(gssCredentials, getParams);
+        } catch (MyProxyException e) {
+            throw new GFacException("An error occurred while renewing security 
credentials.", e);
+        }
+    }
+
+
+    /**
+     * Gets the default proxy certificate.
+     *
+     * @return Default my proxy credentials.
+     * @throws org.apache.airavata.gfac.GFacException                          
  If an error occurred while retrieving credentials.
+     * @throws 
org.apache.airavata.common.exception.ApplicationSettingsException
+     */
+    public GSSCredential getDefaultCredentials() throws GFacException, 
ApplicationSettingsException {
+        MyProxy myproxy = new MyProxy(getRequestData().getMyProxyServerUrl(), 
getRequestData().getMyProxyPort());
+        try {
+            return myproxy.get(getRequestData().getMyProxyUserName(), 
getRequestData().getMyProxyPassword(),
+                    getRequestData().getMyProxyLifeTime());
+        } catch (MyProxyException e) {
+            throw new GFacException("An error occurred while retrieving 
default security credentials.", e);
+        }
+    }
+
+
+    /**
+     * Renews credentials. First try to renew credentials as a trusted 
renewer. If that failed
+     * use user name and password to renew credentials.
+     *
+     * @return Renewed credentials.
+     * @throws org.apache.airavata.gfac.GFacException                          
  If an error occurred while renewing credentials.
+     * @throws 
org.apache.airavata.common.exception.ApplicationSettingsException
+     */
+    public GSSCredential renewCredentials() throws GFacException, 
ApplicationSettingsException {
+
+        // First try to renew credentials as a trusted renewer
+        try {
+            gssCredentials = renewCredentialsAsATrustedHost();
+        } catch (Exception e) {
+            log.warn("Renewing credentials as a trusted renewer failed", e);
+            gssCredentials = getDefaultCredentials();
+        }
+
+        return gssCredentials;
+    }
+
+    /**
+     * Gets a new proxy certificate given current credentials.
+     *
+     * @return The short lived GSSCredentials
+     * @throws org.apache.airavata.gfac.GFacException                          
  If an error is occurred while retrieving credentials.
+     * @throws 
org.apache.airavata.common.exception.ApplicationSettingsException
+     */
+    public GSSCredential getProxyCredentials() throws GFacException, 
ApplicationSettingsException {
+
+        MyProxy myproxy = new MyProxy(getRequestData().getMyProxyServerUrl(), 
getRequestData().getMyProxyPort());
+        try {
+            return myproxy.get(gssCredentials, 
getRequestData().getMyProxyUserName(), getRequestData().getMyProxyPassword(),
+                    getRequestData().getMyProxyLifeTime());
+        } catch (MyProxyException e) {
+            throw new GFacException("An error occurred while renewing security 
credentials using user/password.", e);
+        }
+    }
+
+    /**
+     * Sets the GSS credential cached by this instance.
+     *
+     * @param gssCredentials credential to cache
+     */
+    public void setGssCredentials(GSSCredential gssCredentials) {
+        this.gssCredentials = gssCredentials;
+    }
+
+    /**
+     * Returns the credential reader currently held by this instance.
+     *
+     * @return the credential reader field as-is
+     */
+    public CredentialReader getCredentialReader() {
+        return credentialReader;
+    }
+
+    /**
+     * Sets the credential reader held by this instance.
+     *
+     * @param credentialReader reader to store
+     */
+    public void setCredentialReader(CredentialReader credentialReader) {
+        this.credentialReader = credentialReader;
+    }
+
+    /**
+     * Returns the request data held by this instance.
+     *
+     * @return the request data field as-is
+     */
+    public RequestData getRequestData() {
+        return requestData;
+    }
+
+    /**
+     * Sets the request data held by this instance.
+     *
+     * @param requestData request data to store
+     */
+    public void setRequestData(RequestData requestData) {
+        this.requestData = requestData;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
new file mode 100644
index 0000000..c3978b1
--- /dev/null
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
@@ -0,0 +1,367 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.gsissh.util;
+
+import org.airavata.appcatalog.cpi.AppCatalog;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import 
org.apache.airavata.credential.store.credential.impl.certificate.CertificateCredential;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.RequestData;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.context.MessageContext;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
+import org.apache.airavata.gfac.gsissh.security.TokenizedMyProxyAuthInfo;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.apache.airavata.gfac.ssh.api.ServerInfo;
+import org.apache.airavata.gfac.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gfac.ssh.api.job.JobManagerConfiguration;
+import org.apache.airavata.gfac.ssh.impl.GSISSHAbstractCluster;
+import org.apache.airavata.gfac.ssh.impl.PBSCluster;
+import org.apache.airavata.gfac.ssh.util.CommonUtils;
+import 
org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import 
org.apache.airavata.model.appcatalog.appdeployment.ApplicationParallelismType;
+import org.apache.airavata.model.appcatalog.appinterface.DataType;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.appcatalog.computeresource.*;
+import 
org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import 
org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.*;
+
+
+public class GFACGSISSHUtils {
+    // Class-wide SLF4J logger.
+    private final static Logger logger = 
LoggerFactory.getLogger(GFACGSISSHUtils.class);
+
+    // Job manager names; addSecurityContext compares them case-insensitively against the
+    // string form of the resource job manager type to choose a job manager configuration.
+    public static final String PBS_JOB_MANAGER = "pbs";
+    public static final String SLURM_JOB_MANAGER = "slurm";
+    public static final String SUN_GRID_ENGINE_JOB_MANAGER = "UGE";
+    public static final String LSF_JOB_MANAGER = "lsf";
+
+    // Number of pooled connections per key before addSecurityContext starts re-using them.
+    // Mutable: addSecurityContext increments it when a pooled connection fails its probe.
+    public static int maxClusterCount = 5;
+    // Pool of cluster connections, keyed by MyProxy user name + host name + SSH port.
+    // addSecurityContext reads and modifies it inside synchronized (clusters).
+    // NOTE(review): public and mutable, so other code could touch it without that lock - confirm.
+    public static Map<String, List<Cluster>> clusters = new HashMap<String, 
List<Cluster>>();
+
+    public static void addSecurityContext(JobExecutionContext 
jobExecutionContext) throws GFacException, ApplicationSettingsException {
+        JobSubmissionInterface jobSubmissionInterface = 
jobExecutionContext.getPreferredJobSubmissionInterface();
+        JobSubmissionProtocol jobProtocol = 
jobSubmissionInterface.getJobSubmissionProtocol();
+        try {
+            AppCatalog appCatalog = jobExecutionContext.getAppCatalog();
+            SSHJobSubmission sshJobSubmission = 
appCatalog.getComputeResource().getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
+            if (jobProtocol == JobSubmissionProtocol.GLOBUS || jobProtocol == 
JobSubmissionProtocol.UNICORE
+                    || jobProtocol == JobSubmissionProtocol.CLOUD || 
jobProtocol == JobSubmissionProtocol.LOCAL) {
+                logger.error("This is a wrong method to invoke to non ssh host 
types,please check your gfac-config.xml");
+            } else if (jobProtocol == JobSubmissionProtocol.SSH && 
sshJobSubmission.getSecurityProtocol() == SecurityProtocol.GSI) {
+                String credentialStoreToken = 
jobExecutionContext.getCredentialStoreToken(); // this is set by the framework
+                RequestData requestData = new 
RequestData(jobExecutionContext.getGatewayID());
+                requestData.setTokenId(credentialStoreToken);
+                PBSCluster pbsCluster = null;
+                GSISecurityContext context = null;
+
+                TokenizedMyProxyAuthInfo tokenizedMyProxyAuthInfo = new 
TokenizedMyProxyAuthInfo(requestData);
+                CredentialReader credentialReader = 
GFacUtils.getCredentialReader();
+                if (credentialReader != null) {
+                    CertificateCredential credential = null;
+                    try {
+                        credential = (CertificateCredential) 
credentialReader.getCredential(jobExecutionContext.getGatewayID(), 
credentialStoreToken);
+                        
requestData.setMyProxyUserName(credential.getCommunityUser().getUserName());
+                    } catch (Exception e) {
+                        logger.error(e.getLocalizedMessage());
+                    }
+                }
+
+                String key = requestData.getMyProxyUserName() + 
jobExecutionContext.getHostName()+
+                        sshJobSubmission.getSshPort();
+                boolean recreate = false;
+                synchronized (clusters) {
+                    if (clusters.containsKey(key) && clusters.get(key).size() 
< maxClusterCount) {
+                        recreate = true;
+                    } else if (clusters.containsKey(key)) {
+                        int i = new Random().nextInt(Integer.MAX_VALUE) % 
maxClusterCount;
+                        if 
(clusters.get(key).get(i).getSession().isConnected()) {
+                            pbsCluster = (PBSCluster) clusters.get(key).get(i);
+                        } else {
+                            clusters.get(key).remove(i);
+                            recreate = true;
+                        }
+                        if (!recreate) {
+                            try {
+                                pbsCluster.listDirectory("~/"); // its hard to 
trust isConnected method, so we try to connect if it works we are good,else we 
recreate
+                            } catch (Exception e) {
+                                clusters.get(key).remove(i);
+                                logger.info("Connection found the connection 
map is expired, so we create from the scratch");
+                                maxClusterCount++;
+                                recreate = true; // we make the pbsCluster to 
create again if there is any exception druing connection
+                            }
+                            logger.info("Re-using the same connection used 
with the connection string:" + key);
+                            context = new 
GSISecurityContext(tokenizedMyProxyAuthInfo.getCredentialReader(), requestData, 
pbsCluster);
+                        }
+                    } else {
+                        recreate = true;
+                    }
+
+                    if (recreate) {
+                        ServerInfo serverInfo = new 
ServerInfo(requestData.getMyProxyUserName(), jobExecutionContext.getHostName(),
+                                sshJobSubmission.getSshPort());
+
+                        JobManagerConfiguration jConfig = null;
+                        String installedParentPath = 
sshJobSubmission.getResourceJobManager().getJobManagerBinPath();
+                        String jobManager = 
sshJobSubmission.getResourceJobManager().getResourceJobManagerType().toString();
+                        if (jobManager == null) {
+                            logger.error("No Job Manager is configured, so we 
are picking pbs as the default job manager");
+                            jConfig = 
CommonUtils.getPBSJobManager(installedParentPath);
+                        } else {
+                            if (PBS_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+                                jConfig = 
CommonUtils.getPBSJobManager(installedParentPath);
+                            } else if 
(SLURM_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+                                jConfig = 
CommonUtils.getSLURMJobManager(installedParentPath);
+                            } else if 
(SUN_GRID_ENGINE_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+                                jConfig = 
CommonUtils.getUGEJobManager(installedParentPath);
+                            }else 
if(LSF_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+                                jConfig = 
CommonUtils.getLSFJobManager(installedParentPath);
+                            }
+                        }
+                        pbsCluster = new PBSCluster(serverInfo, 
tokenizedMyProxyAuthInfo, jConfig);
+                        context = new 
GSISecurityContext(tokenizedMyProxyAuthInfo.getCredentialReader(), requestData, 
pbsCluster);
+                        List<Cluster> pbsClusters = null;
+                        if (!(clusters.containsKey(key))) {
+                            pbsClusters = new ArrayList<Cluster>();
+                        } else {
+                            pbsClusters = clusters.get(key);
+                        }
+                        pbsClusters.add(pbsCluster);
+                        clusters.put(key, pbsClusters);
+                    }
+                }
+
+                
jobExecutionContext.addSecurityContext(jobExecutionContext.getHostName(), 
context);
+            }
+        } catch (Exception e) {
+            throw new GFacException("An error occurred while creating GSI 
security context", e);
+        }
+    }
+
+    public static JobDescriptor createJobDescriptor(JobExecutionContext 
jobExecutionContext, Cluster cluster) {
+        JobDescriptor jobDescriptor = new JobDescriptor();
+        TaskDetails taskData = jobExecutionContext.getTaskData();
+        ResourceJobManager resourceJobManager = 
jobExecutionContext.getResourceJobManager();
+        try {
+                       
if(ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_ENABLE).equalsIgnoreCase("true")){
+                               
jobDescriptor.setMailOptions(ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_FLAGS));
+                               String emailids = 
ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_EMAILIDS);
+
+                               
if(jobExecutionContext.getTaskData().isSetEmailAddresses()){
+                                       List<String> emailList = 
jobExecutionContext.getTaskData().getEmailAddresses();
+                                       String elist = 
GFacUtils.listToCsv(emailList, ',');
+                                       if(emailids != null && 
!emailids.isEmpty()){
+                                               emailids = emailids +"," + 
elist;
+                                       }else{
+                                               emailids = elist;
+                                       }
+                               }
+                               if(emailids != null && !emailids.isEmpty()){
+                                       logger.info("Email list: "+ emailids);
+                                       jobDescriptor.setMailAddress(emailids);
+                               }
+                       }
+               } catch (ApplicationSettingsException e) {
+                        logger.error("ApplicationSettingsException : " 
+e.getLocalizedMessage());
+               }
+        // this is common for any application descriptor
+        jobDescriptor.setCallBackIp(ServerSettings.getIp());
+        
jobDescriptor.setCallBackPort(ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.GFAC_SERVER_PORT,
 "8950"));
+        jobDescriptor.setInputDirectory(jobExecutionContext.getInputDir());
+        jobDescriptor.setOutputDirectory(jobExecutionContext.getOutputDir());
+        
jobDescriptor.setExecutablePath(jobExecutionContext.getExecutablePath());
+        
jobDescriptor.setStandardOutFile(jobExecutionContext.getStandardOutput());
+        
jobDescriptor.setStandardErrorFile(jobExecutionContext.getStandardError());
+        String computationalProjectAccount = 
taskData.getTaskScheduling().getComputationalProjectAccount();
+        taskData.getEmailAddresses();
+        if (computationalProjectAccount == null){
+            ComputeResourcePreference computeResourcePreference = 
jobExecutionContext.getApplicationContext().getComputeResourcePreference();
+            if (computeResourcePreference != null) {
+                computationalProjectAccount = 
computeResourcePreference.getAllocationProjectNumber();
+            }
+        }
+        if (computationalProjectAccount != null) {
+            jobDescriptor.setAcountString(computationalProjectAccount);
+        }
+
+        Random random = new Random();
+        int i = random.nextInt(Integer.MAX_VALUE); // We always set the job 
name
+        jobDescriptor.setJobName("A" + String.valueOf(i+99999999));
+        jobDescriptor.setWorkingDirectory(jobExecutionContext.getWorkingDir());
+
+        List<String> inputValues = new ArrayList<String>();
+        MessageContext input = jobExecutionContext.getInMessageContext();
+        // sort the inputs first and then build the command List
+        Comparator<InputDataObjectType> inputOrderComparator = new 
Comparator<InputDataObjectType>() {
+            @Override
+            public int compare(InputDataObjectType inputDataObjectType, 
InputDataObjectType t1) {
+                return inputDataObjectType.getInputOrder() - 
t1.getInputOrder();
+            }
+        };
+        Set<InputDataObjectType> sortedInputSet = new 
TreeSet<InputDataObjectType>(inputOrderComparator);
+        for (Object object : input.getParameters().values()) {
+            if (object instanceof InputDataObjectType) {
+                InputDataObjectType inputDOT = (InputDataObjectType) object;
+                sortedInputSet.add(inputDOT);
+            }
+        }
+        for (InputDataObjectType inputDataObjectType : sortedInputSet) {
+            if (!inputDataObjectType.isRequiredToAddedToCommandLine()) {
+                continue;
+            }
+            if (inputDataObjectType.getApplicationArgument() != null
+                    && 
!inputDataObjectType.getApplicationArgument().equals("")) {
+                inputValues.add(inputDataObjectType.getApplicationArgument());
+            }
+
+            if (inputDataObjectType.getValue() != null
+                    && !inputDataObjectType.getValue().equals("")) {
+                if (inputDataObjectType.getType() == DataType.URI) {
+                    // set only the relative path
+                    String filePath = inputDataObjectType.getValue();
+                    filePath = 
filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, 
filePath.length());
+                    inputValues.add(filePath);
+                }else {
+                    inputValues.add(inputDataObjectType.getValue());
+                }
+
+            }
+        }
+
+        Map<String, Object> outputParams = 
jobExecutionContext.getOutMessageContext().getParameters();
+        for (Object outputParam : outputParams.values()) {
+            if (outputParam instanceof OutputDataObjectType) {
+                OutputDataObjectType output = (OutputDataObjectType) 
outputParam;
+                if (output.getApplicationArgument() != null
+                        && !output.getApplicationArgument().equals("")) {
+                    inputValues.add(output.getApplicationArgument());
+                }
+                if (output.getValue() != null && !output.getValue().equals("") 
&& output.isRequiredToAddedToCommandLine()) {
+                    if (output.getType() == DataType.URI){
+                        String filePath = output.getValue();
+                        filePath = 
filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, 
filePath.length());
+                        inputValues.add(filePath);
+                    }
+                }
+            }
+        }
+        jobDescriptor.setInputValues(inputValues);
+
+        jobDescriptor.setUserName(((GSISSHAbstractCluster) 
cluster).getServerInfo().getUserName());
+        jobDescriptor.setShellName("/bin/bash");
+        jobDescriptor.setAllEnvExport(true);
+        jobDescriptor.setOwner(((PBSCluster) 
cluster).getServerInfo().getUserName());
+
+        ComputationalResourceScheduling taskScheduling = 
taskData.getTaskScheduling();
+        if (taskScheduling != null) {
+            int totalNodeCount = taskScheduling.getNodeCount();
+            int totalCPUCount = taskScheduling.getTotalCPUCount();
+
+//        
jobDescriptor.setJobSubmitter(applicationDeploymentType.getJobSubmitterCommand());
+            if (taskScheduling.getComputationalProjectAccount() != null) {
+                
jobDescriptor.setAcountString(taskScheduling.getComputationalProjectAccount());
+            }
+            if (taskScheduling.getQueueName() != null) {
+                jobDescriptor.setQueueName(taskScheduling.getQueueName());
+            }
+
+            if (totalNodeCount > 0) {
+                jobDescriptor.setNodes(totalNodeCount);
+            }
+            if (taskScheduling.getComputationalProjectAccount() != null) {
+                
jobDescriptor.setAcountString(taskScheduling.getComputationalProjectAccount());
+            }
+            if (taskScheduling.getQueueName() != null) {
+                jobDescriptor.setQueueName(taskScheduling.getQueueName());
+            }
+            if (totalCPUCount > 0) {
+                int ppn = totalCPUCount / totalNodeCount;
+                jobDescriptor.setProcessesPerNode(ppn);
+                jobDescriptor.setCPUCount(totalCPUCount);
+            }
+            if (taskScheduling.getWallTimeLimit() > 0) {
+                
jobDescriptor.setMaxWallTime(String.valueOf(taskScheduling.getWallTimeLimit()));
+                
if(resourceJobManager.getResourceJobManagerType().equals(ResourceJobManagerType.LSF)){
+                    
jobDescriptor.setMaxWallTimeForLSF(String.valueOf(taskScheduling.getWallTimeLimit()));
+                }
+            }
+
+            if (taskScheduling.getTotalPhysicalMemory() > 0) {
+                
jobDescriptor.setUsedMemory(taskScheduling.getTotalPhysicalMemory() + "");
+            }
+        } else {
+            logger.error("Task scheduling cannot be null at this point..");
+        }
+
+        ApplicationDeploymentDescription appDepDescription = 
jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
+        List<String> moduleCmds = appDepDescription.getModuleLoadCmds();
+        if (moduleCmds != null) {
+            for (String moduleCmd : moduleCmds) {
+                jobDescriptor.addModuleLoadCommands(moduleCmd);
+            }
+        }
+        List<String> preJobCommands = appDepDescription.getPreJobCommands();
+        if (preJobCommands != null) {
+            for (String preJobCommand : preJobCommands) {
+                jobDescriptor.addPreJobCommand(parseCommand(preJobCommand, 
jobExecutionContext));
+            }
+        }
+
+        List<String> postJobCommands = appDepDescription.getPostJobCommands();
+        if (postJobCommands != null) {
+            for (String postJobCommand : postJobCommands) {
+                jobDescriptor.addPostJobCommand(parseCommand(postJobCommand, 
jobExecutionContext));
+            }
+        }
+
+        ApplicationParallelismType parallelism = 
appDepDescription.getParallelism();
+        if (parallelism != null){
+            if (parallelism == ApplicationParallelismType.MPI || parallelism 
== ApplicationParallelismType.OPENMP || parallelism == 
ApplicationParallelismType.OPENMP_MPI){
+                Map<JobManagerCommand, String> jobManagerCommands = 
resourceJobManager.getJobManagerCommands();
+                if (jobManagerCommands != null && 
!jobManagerCommands.isEmpty()) {
+                    for (JobManagerCommand command : 
jobManagerCommands.keySet()) {
+                        if (command == JobManagerCommand.SUBMISSION) {
+                            String commandVal = 
jobManagerCommands.get(command);
+                            jobDescriptor.setJobSubmitter(commandVal);
+                        }
+                    }
+                }
+            }
+        }
+        return jobDescriptor;
+    }
+
+    /**
+     * Expands the {@code $workingDir}, {@code $inputDir} and {@code $outputDir} placeholders in a
+     * pre/post job command with the corresponding directories of the job execution context.
+     * <p>
+     * The substitution is literal: {@code String.replace} is used rather than {@code replaceAll},
+     * because {@code replaceAll} interprets {@code $} and {@code \} in the replacement text as
+     * group references and would fail or corrupt directories containing those characters.
+     *
+     * @param value               command text that may contain the placeholders
+     * @param jobExecutionContext context supplying the working, input and output directories
+     * @return the command with every placeholder occurrence replaced
+     */
+    private static String parseCommand(String value, JobExecutionContext jobExecutionContext) {
+        String parsedValue = value.replace("$workingDir", jobExecutionContext.getWorkingDir());
+        parsedValue = parsedValue.replace("$inputDir", jobExecutionContext.getInputDir());
+        parsedValue = parsedValue.replace("$outputDir", jobExecutionContext.getOutputDir());
+        return parsedValue;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataJobStatusUpdator.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataJobStatusUpdator.java
new file mode 100644
index 0000000..e7c6572
--- /dev/null
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataJobStatusUpdator.java
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.impl;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.registry.cpi.CompositeIdentifier;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+
+/**
+ * Activity listener that persists job status changes in the registry and re-publishes them.
+ * <p>
+ * For every {@link JobStatusChangeRequestEvent} that carries a state, the listener updates the job
+ * status held by the {@link Registry}, then emits a {@link JobStatusChangeEvent} on the
+ * {@link MonitorPublisher} and, wrapped in a {@link MessageContext}, on the messaging
+ * {@link Publisher}. The collaborators are supplied through {@link #setup(Object...)}; the registry
+ * can also be set with {@link #setAiravataRegistry(Registry)}.
+ */
+public class AiravataJobStatusUpdator implements AbstractActivityListener {
+    private final static Logger logger = LoggerFactory.getLogger(AiravataJobStatusUpdator.class);
+    private Registry airavataRegistry;
+
+    private MonitorPublisher monitorPublisher;
+    private Publisher publisher;
+
+
+    /**
+     * @return the registry used to read and update job details, or {@code null} if none was set
+     */
+    public Registry getAiravataRegistry() {
+        return airavataRegistry;
+    }
+
+    /**
+     * @param airavataRegistry registry used to read and update job details
+     */
+    public void setAiravataRegistry(Registry airavataRegistry) {
+        this.airavataRegistry = airavataRegistry;
+    }
+
+
+    /**
+     * Handles a job status change request: stores the new status in the registry and publishes the
+     * resulting {@link JobStatusChangeEvent} to both publishers. Requests without a state are ignored.
+     *
+     * @param jobStatus the requested status change, including the job identity it applies to
+     * @throws Exception if the registry update or one of the publish steps fails; the original
+     *                   failure is attached as the cause
+     */
+    @Subscribe
+    public void updateRegistry(JobStatusChangeRequestEvent jobStatus) throws Exception{
+        JobState state = jobStatus.getState();
+        if (state != null) {
+            try {
+                String taskID = jobStatus.getJobIdentity().getTaskId();
+                String jobID = jobStatus.getJobIdentity().getJobId();
+                String expId = jobStatus.getJobIdentity().getExperimentId();
+                updateJobStatus(expId, taskID, jobID, state);
+                logger.debug("expId - {}: Publishing job status for " + jobID + ":" + state.toString(), expId);
+                JobStatusChangeEvent event = new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity());
+                monitorPublisher.publish(event);
+                String messageId = AiravataUtils.getId("JOB");
+                MessageContext msgCntxt = new MessageContext(event, MessageType.JOB, messageId, jobStatus.getJobIdentity().getGatewayId());
+                msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+                publisher.publish(msgCntxt);
+            } catch (Exception e) {
+                logger.error("expId - " + jobStatus.getJobIdentity().getExperimentId() + ": Error persisting data"
+                        + e.getLocalizedMessage(), e);
+                throw new Exception("Error persisting job status..", e);
+            }
+        }
+    }
+
+    /**
+     * Writes a new job status for the given task/job pair to the registry.
+     * <p>
+     * If the stored job is already {@link JobState#CANCELED} or {@link JobState#CANCELING}, that state
+     * is kept and the requested one is ignored (presumably so that a late status report cannot undo a
+     * cancellation). In every case the time of the state change is refreshed.
+     *
+     * @param expId  experiment id, used for logging only
+     * @param taskId id of the task that owns the job
+     * @param jobID  id of the job whose status is updated
+     * @param state  the requested new state
+     * @throws Exception if reading from or updating the registry fails
+     */
+    public  void updateJobStatus(String expId, String taskId, String jobID, JobState state) throws Exception {
+        logger.info("expId - {}: Updating job status for " + jobID + ":" + state.toString(), expId);
+        CompositeIdentifier ids = new CompositeIdentifier(taskId, jobID);
+        JobDetails details = (JobDetails) airavataRegistry.get(RegistryModelType.JOB_DETAIL, ids);
+        if (details == null) {
+            details = new JobDetails();
+        }
+        // The details may carry no status yet (e.g. when they were just created above), so read the
+        // previous state null-safely instead of dereferencing getJobStatus() directly.
+        org.apache.airavata.model.workspace.experiment.JobStatus previousStatus = details.getJobStatus();
+        JobState previousState = previousStatus == null ? null : previousStatus.getJobState();
+
+        org.apache.airavata.model.workspace.experiment.JobStatus status = new org.apache.airavata.model.workspace.experiment.JobStatus();
+        if (JobState.CANCELED.equals(previousState) || JobState.CANCELING.equals(previousState)) {
+            status.setJobState(previousState);
+        } else {
+            status.setJobState(state);
+        }
+        status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+        details.setJobStatus(status);
+        details.setJobID(jobID);
+        logger.debug("expId - {}: Updated job status for " + jobID + ":" + details.getJobStatus().toString(), expId);
+        airavataRegistry.update(RegistryModelType.JOB_STATUS, status, ids);
+    }
+
+    /**
+     * Picks the collaborators this listener needs out of the supplied configuration objects.
+     * A {@link Registry}, a {@link MonitorPublisher} and a {@link Publisher} are recognised; any other
+     * object is ignored. If several objects of the same kind are passed, the last one wins.
+     *
+     * @param configurations candidate configuration objects
+     */
+    public void setup(Object... configurations) {
+        for (Object configuration : configurations) {
+            if (configuration instanceof Registry){
+                this.airavataRegistry=(Registry)configuration;
+            } else if (configuration instanceof MonitorPublisher){
+                this.monitorPublisher=(MonitorPublisher) configuration;
+            } else if (configuration instanceof Publisher){
+                this.publisher=(Publisher) configuration;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataTaskStatusUpdator.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataTaskStatusUpdator.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataTaskStatusUpdator.java
new file mode 100644
index 0000000..94029be
--- /dev/null
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataTaskStatusUpdator.java
@@ -0,0 +1,166 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.impl;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeRequestEvent;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+
+/**
+ * Activity listener that persists task status changes in the registry and re-publishes them.
+ * <p>
+ * It reacts to three kinds of events:
+ * <ul>
+ *   <li>{@link TaskStatusChangeRequestEvent} - stores the requested task state and publishes a
+ *       {@link TaskStatusChangeEvent};</li>
+ *   <li>{@link JobStatusChangeEvent} - maps the job state onto a task state, stores it and publishes a
+ *       {@link TaskStatusChangeEvent};</li>
+ *   <li>{@link TaskOutputChangeEvent} - forwards the event to the messaging {@link Publisher}.</li>
+ * </ul>
+ * The collaborators are supplied through {@link #setup(Object...)}; the registry can also be set
+ * with {@link #setAiravataRegistry(Registry)}.
+ */
+public class AiravataTaskStatusUpdator implements AbstractActivityListener {
+    private final static Logger logger = LoggerFactory.getLogger(AiravataTaskStatusUpdator.class);
+    private Registry airavataRegistry;
+    private MonitorPublisher monitorPublisher;
+    private Publisher publisher;
+
+    /**
+     * @return the registry used to read and update task details, or {@code null} if none was set
+     */
+    public Registry getAiravataRegistry() {
+        return airavataRegistry;
+    }
+
+    /**
+     * @param airavataRegistry registry used to read and update task details
+     */
+    public void setAiravataRegistry(Registry airavataRegistry) {
+        this.airavataRegistry = airavataRegistry;
+    }
+
+    /**
+     * Handles an explicit task status change request: stores the requested state in the registry and
+     * publishes a {@link TaskStatusChangeEvent} to both publishers.
+     *
+     * @param taskStatus the requested status change, including the task identity it applies to
+     * @throws Exception if the registry update or one of the publish steps fails; the original
+     *                   failure is attached as the cause
+     */
+    @Subscribe
+    public void setupTaskStatus(TaskStatusChangeRequestEvent taskStatus) throws Exception{
+        try {
+            updateTaskStatus(taskStatus.getTaskIdentity().getTaskId(), taskStatus.getState());
+            logger.debug("expId - {}: Publishing task status for " + taskStatus.getTaskIdentity().getTaskId() + ":"
+                    + taskStatus.getState().toString(), taskStatus.getTaskIdentity().getExperimentId());
+            TaskStatusChangeEvent event = new TaskStatusChangeEvent(taskStatus.getState(), taskStatus.getTaskIdentity());
+            monitorPublisher.publish(event);
+            String messageId = AiravataUtils.getId("TASK");
+            MessageContext msgCntxt = new MessageContext(event, MessageType.TASK, messageId, taskStatus.getTaskIdentity().getGatewayId());
+            msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+            publisher.publish(msgCntxt);
+        } catch (Exception e) {
+            String msg = "Error persisting data task status to database...";
+            logger.error(msg + e.getLocalizedMessage(), e);
+            throw new Exception(msg, e);
+        }
+    }
+
+    /**
+     * Derives a task state from a job status change, stores it in the registry and publishes a
+     * {@link TaskStatusChangeEvent} to both publishers. Job states without a mapping are ignored.
+     *
+     * @param jobStatus the job status change, including the job identity it applies to
+     * @throws Exception if the registry update or one of the publish steps fails; the original
+     *                   failure is attached as the cause
+     */
+    @Subscribe
+    public void setupTaskStatus(JobStatusChangeEvent jobStatus) throws Exception{
+        final TaskState state;
+        switch(jobStatus.getState()){
+            case ACTIVE:
+                state=TaskState.EXECUTING; break;
+            case CANCELED:
+                state=TaskState.CANCELED; break;
+            case COMPLETE: case FAILED:
+                // The job itself is over either way; the task moves on to post-processing.
+                state=TaskState.POST_PROCESSING; break;
+            case HELD: case SUSPENDED: case QUEUED:
+                state=TaskState.WAITING; break;
+            case SETUP:
+                state=TaskState.PRE_PROCESSING; break;
+            case SUBMITTED:
+                state=TaskState.STARTED; break;
+            case UN_SUBMITTED:
+                state=TaskState.CANCELED; break;
+            case CANCELING:
+                state=TaskState.CANCELING; break;
+            default:
+                // No task state corresponds to this job state: nothing to store or publish.
+                return;
+        }
+        try {
+            updateTaskStatus(jobStatus.getJobIdentity().getTaskId(), state);
+            logger.debug("expId - {}: Publishing task status for " + jobStatus.getJobIdentity().getTaskId() + ":"
+                    + state.toString(), jobStatus.getJobIdentity().getExperimentId());
+            TaskIdentifier taskIdentity = new TaskIdentifier(jobStatus.getJobIdentity().getTaskId(),
+                                                         jobStatus.getJobIdentity().getWorkflowNodeId(),
+                                                         jobStatus.getJobIdentity().getExperimentId(),
+                                                         jobStatus.getJobIdentity().getGatewayId());
+            TaskStatusChangeEvent event = new TaskStatusChangeEvent(state, taskIdentity);
+            monitorPublisher.publish(event);
+            String messageId = AiravataUtils.getId("TASK");
+            MessageContext msgCntxt = new MessageContext(event, MessageType.TASK, messageId,jobStatus.getJobIdentity().getGatewayId());
+            msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+            publisher.publish(msgCntxt);
+
+        }  catch (Exception e) {
+            logger.error("expId - " + jobStatus.getJobIdentity().getExperimentId() + ": Error persisting data" + e.getLocalizedMessage(), e);
+            throw new Exception("Error persisting task status..", e);
+        }
+    }
+
+    /**
+     * Writes a new status for the given task to the registry.
+     * <p>
+     * If the stored task is already {@link TaskState#CANCELED} or {@link TaskState#CANCELING}, that
+     * state is kept and the requested one is ignored (presumably so that a late status report cannot
+     * undo a cancellation). In every case the time of the state change is refreshed.
+     *
+     * @param taskId id of the task whose status is updated
+     * @param state  the requested new state
+     * @return the state that was actually stored
+     * @throws Exception if the task is unknown to the registry, or if reading from or updating the
+     *                   registry fails
+     */
+    public  TaskState updateTaskStatus(String taskId, TaskState state) throws Exception {
+        TaskDetails details = (TaskDetails)airavataRegistry.get(RegistryModelType.TASK_DETAIL, taskId);
+        if(details == null) {
+            logger.error("Task details cannot be null at this point");
+            throw new Exception("Task details cannot be null at this point");
+        }
+        // The task may carry no status yet, so read the previous state null-safely instead of
+        // dereferencing getTaskStatus() directly.
+        org.apache.airavata.model.workspace.experiment.TaskStatus previousStatus = details.getTaskStatus();
+        TaskState previousState = previousStatus == null ? null : previousStatus.getExecutionState();
+
+        org.apache.airavata.model.workspace.experiment.TaskStatus status = new org.apache.airavata.model.workspace.experiment.TaskStatus();
+        if(!TaskState.CANCELED.equals(previousState)
+                && !TaskState.CANCELING.equals(previousState)){
+            status.setExecutionState(state);
+        }else{
+            status.setExecutionState(previousState);
+        }
+        status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+        details.setTaskStatus(status);
+        logger.debug("Updating task status for "+taskId+":"+details.getTaskStatus().toString());
+
+        airavataRegistry.update(RegistryModelType.TASK_STATUS, status, taskId);
+        return status.getExecutionState();
+    }
+
+    /**
+     * Picks the collaborators this listener needs out of the supplied configuration objects.
+     * A {@link Registry}, a {@link MonitorPublisher} and a {@link Publisher} are recognised; any other
+     * object is ignored. If several objects of the same kind are passed, the last one wins.
+     *
+     * @param configurations candidate configuration objects
+     */
+    public void setup(Object... configurations) {
+        for (Object configuration : configurations) {
+            if (configuration instanceof Registry){
+                this.airavataRegistry=(Registry)configuration;
+            } else if (configuration instanceof MonitorPublisher){
+                this.monitorPublisher=(MonitorPublisher) configuration;
+            } else if (configuration instanceof Publisher){
+                this.publisher=(Publisher) configuration;
+            }
+        }
+    }
+
+
+    /**
+     * Forwards a task output change to the messaging {@link Publisher}, using the task id as the
+     * message id. The registry is not touched here.
+     *
+     * @param taskOutputEvent the output change event to forward
+     * @throws AiravataException if publishing the message fails
+     */
+    @Subscribe
+    public void taskOutputChanged(TaskOutputChangeEvent taskOutputEvent) throws AiravataException {
+        String taskId = taskOutputEvent.getTaskIdentity().getTaskId();
+        logger.debug("Task Output changed event received for workflow node : " +
+                taskOutputEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId);
+        // TODO - do we need to update the output to the registry? , we do it in the workflowInterpreter too.
+        MessageContext messageContext = new MessageContext(taskOutputEvent, MessageType.TASKOUTPUT, taskId, taskOutputEvent.getTaskIdentity().getGatewayId());
+        messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+        publisher.publish(messageContext);
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataWorkflowNodeStatusUpdator.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataWorkflowNodeStatusUpdator.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataWorkflowNodeStatusUpdator.java
new file mode 100644
index 0000000..092774b
--- /dev/null
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataWorkflowNodeStatusUpdator.java
@@ -0,0 +1,129 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.impl;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.WorkflowIdentifier;
+import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+
+/**
+ * Activity listener that mirrors task status changes onto the workflow node owning the task.
+ * <p>
+ * Each {@link TaskStatusChangeEvent} with a mapped state results in a workflow node status update in
+ * the {@link Registry} and a {@link WorkflowNodeStatusChangeEvent} published on both the
+ * {@link MonitorPublisher} and the messaging {@link Publisher}. The collaborators are supplied
+ * through {@link #setup(Object...)}; the registry can also be set with
+ * {@link #setAiravataRegistry(Registry)}.
+ */
+public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListener {
+    private final static Logger logger = LoggerFactory.getLogger(AiravataWorkflowNodeStatusUpdator.class);
+
+    private Registry airavataRegistry;
+    private MonitorPublisher monitorPublisher;
+    private Publisher publisher;
+
+    /**
+     * @return the registry used to read and update workflow node details, or {@code null} if unset
+     */
+    public Registry getAiravataRegistry() {
+        return airavataRegistry;
+    }
+
+    /**
+     * @param airavataRegistry registry used to read and update workflow node details
+     */
+    public void setAiravataRegistry(Registry airavataRegistry) {
+        this.airavataRegistry = airavataRegistry;
+    }
+
+    /**
+     * Translates a task status change into a workflow node state, stores it in the registry and
+     * publishes the resulting {@link WorkflowNodeStatusChangeEvent}. Task states without a mapping
+     * are ignored.
+     *
+     * @param taskStatus the task status change, including the task identity it applies to
+     * @throws Exception if the registry update or one of the publish steps fails; the original
+     *                   failure is attached as the cause
+     */
+    @Subscribe
+    public void setupWorkflowNodeStatus(TaskStatusChangeEvent taskStatus) throws Exception{
+        final WorkflowNodeState nodeState;
+        switch (taskStatus.getState()) {
+            case CANCELED:
+                nodeState = WorkflowNodeState.CANCELED;
+                break;
+            case CANCELING:
+                nodeState = WorkflowNodeState.CANCELING;
+                break;
+            case COMPLETED:
+                nodeState = WorkflowNodeState.COMPLETED;
+                break;
+            case FAILED:
+                nodeState = WorkflowNodeState.FAILED;
+                break;
+            case CONFIGURING_WORKSPACE:
+            case STARTED:
+                nodeState = WorkflowNodeState.INVOKED;
+                break;
+            case EXECUTING:
+            case WAITING:
+            case PRE_PROCESSING:
+            case POST_PROCESSING:
+            case OUTPUT_DATA_STAGING:
+            case INPUT_DATA_STAGING:
+                nodeState = WorkflowNodeState.EXECUTING;
+                break;
+            default:
+                // Unmapped task state: leave the workflow node untouched.
+                return;
+        }
+
+        try {
+            String experimentId = taskStatus.getTaskIdentity().getExperimentId();
+            String nodeId = taskStatus.getTaskIdentity().getWorkflowNodeId();
+            updateWorkflowNodeStatus(experimentId, nodeId, nodeState);
+            logger.debug("expId - {}: Publishing workflow node status for " + nodeId
+                    + ":" + nodeState.toString(), experimentId);
+
+            String gatewayId = taskStatus.getTaskIdentity().getGatewayId();
+            WorkflowIdentifier workflowIdentity = new WorkflowIdentifier(nodeId, experimentId, gatewayId);
+            WorkflowNodeStatusChangeEvent event = new WorkflowNodeStatusChangeEvent(nodeState, workflowIdentity);
+            monitorPublisher.publish(event);
+
+            String messageId = AiravataUtils.getId("WFNODE");
+            MessageContext msgCntxt = new MessageContext(event, MessageType.WORKFLOWNODE, messageId, gatewayId);
+            msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+            publisher.publish(msgCntxt);
+        } catch (Exception e) {
+            logger.error("expId - " + taskStatus.getTaskIdentity().getExperimentId() + ": Error persisting data"
+                    + e.getLocalizedMessage(), e);
+            throw new Exception("Error persisting workflow node status..", e);
+        }
+    }
+
+    /**
+     * Writes a new status, stamped with the current time, for the given workflow node to the registry.
+     *
+     * @param experimentId   experiment id, used for logging only
+     * @param workflowNodeId id of the workflow node whose status is updated
+     * @param state          the new workflow node state
+     * @throws Exception if reading from or updating the registry fails
+     */
+    public  void updateWorkflowNodeStatus(String experimentId, String workflowNodeId, WorkflowNodeState state) throws Exception {
+        logger.info("expId - {}: Updating workflow node status for "+workflowNodeId+":"+state.toString(), experimentId);
+
+        WorkflowNodeDetails nodeDetails =
+                (WorkflowNodeDetails) airavataRegistry.get(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeId);
+        if (nodeDetails == null) {
+            nodeDetails = new WorkflowNodeDetails();
+            nodeDetails.setNodeInstanceId(workflowNodeId);
+        }
+
+        WorkflowNodeStatus newStatus = new WorkflowNodeStatus();
+        newStatus.setWorkflowNodeState(state);
+        newStatus.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+        nodeDetails.setWorkflowNodeStatus(newStatus);
+
+        // Only the status record is written back; the details object is not persisted here.
+        airavataRegistry.update(RegistryModelType.WORKFLOW_NODE_STATUS, newStatus, workflowNodeId);
+    }
+
+    /**
+     * Picks the collaborators this listener needs out of the supplied configuration objects.
+     * A {@link Registry}, a {@link MonitorPublisher} and a {@link Publisher} are recognised, checked
+     * in that order; any other object is ignored.
+     *
+     * @param configurations candidate configuration objects
+     */
+    public void setup(Object... configurations) {
+        for (Object candidate : configurations) {
+            if (candidate instanceof Registry) {
+                this.airavataRegistry = (Registry) candidate;
+                continue;
+            }
+            if (candidate instanceof MonitorPublisher) {
+                this.monitorPublisher = (MonitorPublisher) candidate;
+                continue;
+            }
+            if (candidate instanceof Publisher) {
+                this.publisher = (Publisher) candidate;
+            }
+        }
+    }
+}

Reply via email to