Repository: airavata
Updated Branches:
  refs/heads/master 4568832b3 -> e290cfe17


Fixed the master branch so that job submission with a local file works.


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e290cfe1
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e290cfe1
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e290cfe1

Branch: refs/heads/master
Commit: e290cfe17a5c4f6f0422f1830067637cb81b0494
Parents: 4568832
Author: Shameera Rathanyaka <[email protected]>
Authored: Wed Jul 15 18:01:23 2015 -0400
Committer: Shameera Rathanyaka <[email protected]>
Committed: Wed Jul 15 18:01:23 2015 -0400

----------------------------------------------------------------------
 .../airavata/common/utils/ServerSettings.java   |  10 +-
 .../server/src/main/resources/gfac-config.yaml  |  15 +-
 .../gfac/core/config/GFacYamlConfigruation.java |   2 +-
 .../gfac/core/context/ProcessContext.java       |  29 ++++
 .../airavata/gfac/core/context/TaskContext.java |   4 +
 .../org/apache/airavata/gfac/impl/Factory.java  | 145 +++++++++---------
 .../airavata/gfac/impl/GFacEngineImpl.java      |  63 +++++++-
 .../apache/airavata/gfac/impl/GFacWorker.java   | 150 +++++++++++--------
 .../gfac/impl/task/SCPDataStageTask.java        |  87 +++++++++++
 .../gfac/impl/task/SCPInputDataStageTask.java   |  24 +--
 .../gfac/monitor/email/EmailBasedMonitor.java   |  84 +++++------
 .../airavata/gfac/server/GfacServerHandler.java |   1 +
 12 files changed, 414 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/e290cfe1/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
----------------------------------------------------------------------
diff --git 
a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
 
b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index 37521f6..b898d96 100644
--- 
a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ 
b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -34,7 +34,8 @@ public class ServerSettings extends ApplicationSettings {
 
     private static final String DEFAULT_USER = "default.registry.user";
     private static final String DEFAULT_USER_PASSWORD = 
"default.registry.password";
-    private static final String DEFAULT_USER_GATEWAY = 
"default.registry.gateway";
+       private static final String DEFAULT_USER_GATEWAY = 
"default.registry.gateway";
+       private static final String OUTPUT_LOCATION = "out.location";
 
     private static final String SERVER_CONTEXT_ROOT = "server.context-root";
     public static final String IP = "ip";
@@ -98,8 +99,9 @@ public class ServerSettings extends ApplicationSettings {
 
     private static boolean stopAllThreads = false;
     private static boolean emailBaseNotificationEnable;
+       private static String outputLocation;
 
-    public static String getDefaultUser() throws ApplicationSettingsException {
+       public static String getDefaultUser() throws 
ApplicationSettingsException {
         return getSetting(DEFAULT_USER);
     }
 
@@ -346,4 +348,8 @@ public class ServerSettings extends ApplicationSettings {
     public static String getSecurityManagerClassName() throws 
ApplicationSettingsException {
         return getSetting(Constants.SECURITY_MANAGER_CLASS);
     }
+
+       public static String getOutputLocation() {
+               return getSetting(OUTPUT_LOCATION, 
System.getProperty("java.io.tmpdir"));
+       }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e290cfe1/modules/configuration/server/src/main/resources/gfac-config.yaml
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/gfac-config.yaml 
b/modules/configuration/server/src/main/resources/gfac-config.yaml
index 46ece9c..31fd63a 100644
--- a/modules/configuration/server/src/main/resources/gfac-config.yaml
+++ b/modules/configuration/server/src/main/resources/gfac-config.yaml
@@ -41,12 +41,15 @@ commonTasks:
 
 fileTransferTasks:
   - transferProtocol: SCP
-    taskClass: org.apache.airavata.gfac.impl.task.SCPInputDataStageTask
-    properties:
-     - password: pwd123
-       passPhrase: test
-       privateKey: key
-       publicKey: pubkey
+    taskClass: org.apache.airavata.gfac.impl.task.SCPDataStageTask
+
+#  - transferProtocol: SCP
+#    taskClass: org.apache.airavata.gfac.impl.task.SCPInputDataStageTask
+#    properties:
+#     - password: pwd123
+#       passPhrase: test
+#       privateKey: key
+#       publicKey: pubkey
 
   #- transferProtocol: SFTP
   #  taskClass: org.apache.airavata.task.adapters.SFTPFileTransferTask

http://git-wip-us.apache.org/repos/asf/airavata/blob/e290cfe1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/config/GFacYamlConfigruation.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/config/GFacYamlConfigruation.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/config/GFacYamlConfigruation.java
index fa8ce32..5101b41 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/config/GFacYamlConfigruation.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/config/GFacYamlConfigruation.java
@@ -45,7 +45,7 @@ public class GFacYamlConfigruation {
        private static final String JOB_MANAGER_TYPE = "jobManagerType";
        private static final String COMMAND_OUTPUT_PARSER = 
"commandOutputParser";
        private static final String EMAIL_PARSER = "emailParser";
-       private static final String RESOURCE_EMAIL_ADDRESS = 
"resourceEmailAddress";
+       private static final String RESOURCE_EMAIL_ADDRESS = 
"resourceEmailAddresses";
        private static final String PROPERTIES = "properties";
 
        private List<JobSubmitterTaskConfig> jobSubmitters = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/airavata/blob/e290cfe1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index b7b02c1..dc8dace 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -29,6 +29,8 @@ import 
org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDes
 import 
org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
 import 
org.apache.airavata.model.appcatalog.computeresource.DataMovementProtocol;
 import 
org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
 import 
org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
 import 
org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
 import org.apache.airavata.model.job.JobModel;
@@ -55,6 +57,7 @@ public class ProcessContext {
        private String workingDir;
        private String inputDir;
        private String outputDir;
+       private String localWorkingDir;
        private List<TaskContext> taskChain;
        private GatewayResourceProfile gatewayResourceProfile;
        private ComputeResourceDescription computeResourceDescription;
@@ -68,6 +71,8 @@ public class ProcessContext {
        private DataMovementProtocol dataMovementProtocol;
        private JobModel jobModel;
        private ComputeResourcePreference computeResourcePreference;
+       private MonitorMode monitorMode;
+       private ResourceJobManager resourceJobManager;
 
        /**
         * Note: process context property use lazy loading approach. In runtime 
you will see some properties as null
@@ -292,4 +297,28 @@ public class ProcessContext {
        public String getComputeResourceId() {
                return getComputeResourceDescription().getComputeResourceId();
        }
+
+       public void setMonitorMode(MonitorMode monitorMode) {
+               this.monitorMode = monitorMode;
+       }
+
+       public MonitorMode getMonitorMode() {
+               return monitorMode;
+       }
+
+       public void setResourceJobManager(ResourceJobManager 
resourceJobManager) {
+               this.resourceJobManager = resourceJobManager;
+       }
+
+       public ResourceJobManager getResourceJobManager() {
+               return resourceJobManager;
+       }
+
+       public String getLocalWorkingDir() {
+               return localWorkingDir;
+       }
+
+       public void setLocalWorkingDir(String localWorkingDir) {
+               this.localWorkingDir = localWorkingDir;
+       }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e290cfe1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
index 1be5142..95d2fb9 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
@@ -72,4 +72,8 @@ public class TaskContext {
        public String getTaskId() {
                return taskModel.getTaskId();
        }
+
+       public String getLocalWorkingDir() {
+               return getParentProcessContext().getLocalWorkingDir();
+       }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e290cfe1/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index b8a3d4e..51db6f3 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -30,6 +30,7 @@ import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.JobManagerConfiguration;
 import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
 import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication;
+import org.apache.airavata.gfac.core.cluster.OutputParser;
 import org.apache.airavata.gfac.core.cluster.RemoteCluster;
 import org.apache.airavata.gfac.core.cluster.ServerInfo;
 import org.apache.airavata.gfac.core.config.DataTransferTaskConfig;
@@ -56,6 +57,7 @@ import 
org.apache.airavata.model.appcatalog.computeresource.DataMovementProtocol
 import 
org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
 import 
org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
 import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
 import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
 import 
org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
 import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
@@ -68,8 +70,11 @@ import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.util.Calendar;
 import java.util.Date;
 import java.util.HashMap;
@@ -78,16 +83,24 @@ import java.util.Map;
 
 public abstract class Factory {
 
+       private static final Logger log = 
LoggerFactory.getLogger(Factory.class);
+/*     static{
+               try {
+                       loadConfiguration();
+               } catch (GFacException e) {
+                       log.error("Error while loading configurations");
+               }
+       }*/
+
        private static GFacEngine engine;
        private static Publisher statusPublisher;
        private static CuratorFramework curatorClient;
        private static EmailBasedMonitor emailBasedMonitor;
-       private static Date startMonitorDate = Calendar.getInstance().getTime();
        private static Map<String, RemoteCluster> remoteClusterMap = new 
HashMap<>();
        private static Map<JobSubmissionProtocol, JobSubmissionTask> 
jobSubmissionTask = new HashMap<>();
        private static Map<DataMovementProtocol, Task> dataMovementTask = new 
HashMap<>();
        private static Map<ResourceJobManagerType, ResourceConfig> resources = 
new HashMap<>();
-       private static boolean readConfig = false;
+       private static Map<MonitorMode, JobMonitor> jobMonitorServices = new 
HashMap<>();
 
        public static GFacEngine getGFacEngine() throws GFacException {
                if (engine == null) {
@@ -132,41 +145,36 @@ public abstract class Factory {
                return curatorClient;
        }
 
-       public static JobMonitor getJobMonitor(ResourceJobManagerType 
resourceJobManagerType) throws AiravataException {
-               if (resourceJobManagerType == ResourceJobManagerType.FORK) {
-                       return null; // TODO write a job monitor for this.
-               } else {
-                       if (emailBasedMonitor == null) {
-                               synchronized (EmailBasedMonitor.class) {
-                                       if (emailBasedMonitor == null) {
-                                               emailBasedMonitor = new 
EmailBasedMonitor(resourceJobManagerType);
-                                               
emailBasedMonitor.setDate(startMonitorDate);
-                                               new 
Thread(emailBasedMonitor).start();
-                                       }
-                               }
-                       }
-                       return emailBasedMonitor;
+       public static JobManagerConfiguration 
getJobManagerConfiguration(ResourceJobManager resourceJobManager) throws 
GFacException {
+               ResourceConfig resourceConfig = 
Factory.getResourceConfig(resourceJobManager.getResourceJobManagerType());
+               OutputParser outputParser;
+               try {
+                       Class<? extends OutputParser> aClass = 
Class.forName(resourceConfig.getCommandOutputParser()).asSubclass
+                                       (OutputParser.class);
+                       outputParser = aClass.getConstructor().newInstance();
+               } catch (Exception e) {
+                       throw new GFacException("Error while instantiating 
output parser for " + resourceJobManager
+                                       .getResourceJobManagerType().name());
                }
-       }
 
-       public static JobManagerConfiguration 
getJobManagerConfiguration(ResourceJobManager resourceJobManager) {
                switch (resourceJobManager.getResourceJobManagerType()) {
+
                        case PBS:
                                return new 
PBSJobConfiguration("PBSTemplate.xslt", ".pbs", 
resourceJobManager.getJobManagerBinPath(),
-                                               
resourceJobManager.getJobManagerCommands(), new PBSOutputParser());
+                                               
resourceJobManager.getJobManagerCommands(), outputParser);
                        case SLURM:
                                return new 
SlurmJobConfiguration("SLURMTemplate.xslt", ".slurm", resourceJobManager
-                                               .getJobManagerBinPath(), 
resourceJobManager.getJobManagerCommands(), new SlurmOutputParser());
+                                               .getJobManagerBinPath(), 
resourceJobManager.getJobManagerCommands(), outputParser);
                        case LSF:
                                return new 
LSFJobConfiguration("LSFTemplate.xslt", ".lsf", 
resourceJobManager.getJobManagerBinPath(),
-                                               
resourceJobManager.getJobManagerCommands(), new LSFOutputParser());
+                                               
resourceJobManager.getJobManagerCommands(), outputParser);
                        case UGE:
                                return new 
UGEJobConfiguration("UGETemplate.xslt", ".pbs", 
resourceJobManager.getJobManagerBinPath(),
-                                               
resourceJobManager.getJobManagerCommands(), new UGEOutputParser());
-
+                                               
resourceJobManager.getJobManagerCommands(), outputParser);
                        default:
                                return null;
                }
+
        }
 
        public static HostScheduler getHostScheduler() {
@@ -174,45 +182,27 @@ public abstract class Factory {
        }
 
 
-       public static RemoteCluster getRemoteCluster(ProcessContext processCtx) 
throws GFacException,
+       /**
+        * The Factory class manages the remote cluster map; this will solve too many 
connections/sessions issues with cluster
+        * communications.
+        * @param jobSubmissionProtocol
+        * @param computeResourceId
+        * @param resourceJobManager
+        * @return
+        * @throws GFacException
+        * @throws AppCatalogException
+        * @throws AiravataException
+        */
+       public static RemoteCluster getRemoteCluster(JobSubmissionProtocol 
jobSubmissionProtocol, String computeResourceId,
+                                                    ResourceJobManager 
resourceJobManager) throws GFacException,
                        AppCatalogException, AiravataException {
 
-               String key = processCtx.getJobSubmissionProtocol().toString() + 
":" + processCtx.getComputeResourceId();
+               String key = jobSubmissionProtocol.toString() + ":" + 
computeResourceId;
                RemoteCluster remoteCluster = remoteClusterMap.get(key);
                if (remoteCluster == null) {
-                       String hostName = 
Factory.getDefaultAppCatalog().getComputeResource().getComputeResource(processCtx
-                                       .getComputeResourceId()).getHostName();
+                       String hostName = 
Factory.getDefaultAppCatalog().getComputeResource().getComputeResource(computeResourceId).getHostName();
                        // fixme - read login user name from 
computeResourcePreference
                        ServerInfo serverInfo = new 
ServerInfo(ServerSettings.getSetting("ssh.username"), hostName);
-                       List<JobSubmissionInterface> jobSubmissionInterfaces = 
Factory.getDefaultAppCatalog().getComputeResource()
-                                       
.getComputeResource(processCtx.getComputeResourceId()) 
.getJobSubmissionInterfaces();
-
-                       ResourceJobManager resourceJobManager = null;
-                       JobSubmissionInterface jsInterface = null;
-                       for (JobSubmissionInterface jobSubmissionInterface : 
jobSubmissionInterfaces) {
-                               if 
(jobSubmissionInterface.getJobSubmissionProtocol() == 
processCtx.getJobSubmissionProtocol()) {
-                                       jsInterface = jobSubmissionInterface;
-                                       break;
-                               }
-                       }
-                       if (jsInterface == null) {
-                               // TODO: throw an exception.
-                       } else if (jsInterface.getJobSubmissionProtocol() == 
JobSubmissionProtocol.SSH) {
-                               SSHJobSubmission sshJobSubmission = 
getDefaultAppCatalog().getComputeResource().getSSHJobSubmission
-                                               
(jsInterface.getJobSubmissionInterfaceId());
-                               resourceJobManager = 
sshJobSubmission.getResourceJobManager();
-                       } else if (jsInterface.getJobSubmissionProtocol() == 
JobSubmissionProtocol.LOCAL) {
-                               LOCALSubmission localSubmission = 
getDefaultAppCatalog().getComputeResource().getLocalJobSubmission
-                                               
(jsInterface.getJobSubmissionInterfaceId());
-                               resourceJobManager = 
localSubmission.getResourceJobManager();
-                       } else {
-                               // TODO : throw an not supported jobsubmission 
protocol exception. we only support SSH and LOCAL
-                       }
-
-                       if (resourceJobManager == null) {
-                               // TODO throw an exception
-                       }
-
                        JobManagerConfiguration jobManagerConfiguration = 
getJobManagerConfiguration(resourceJobManager);
                        AuthenticationInfo authenticationInfo = 
getSSHKeyAuthentication();
                        remoteCluster = new HPCRemoteCluster(serverInfo, 
jobManagerConfiguration, authenticationInfo);
@@ -236,30 +226,23 @@ public abstract class Factory {
                return sshKA;
        }
 
-       public static JobSubmissionTask 
getJobSubmissionTask(JobSubmissionProtocol jobSubmissionProtocol) throws
-                       GFacException {
-               if (!readConfig) {
-                       loadConfiguration();
-               }
+       public static JobSubmissionTask 
getJobSubmissionTask(JobSubmissionProtocol jobSubmissionProtocol) {
                return jobSubmissionTask.get(jobSubmissionProtocol);
        }
 
-       public static Task getDataMovementTask(DataMovementProtocol 
dataMovementProtocol) throws GFacException {
-               if (!readConfig) {
-                       loadConfiguration();
-               }
+       public static Task getDataMovementTask(DataMovementProtocol 
dataMovementProtocol){
                return dataMovementTask.get(dataMovementProtocol);
        }
 
-       public static ResourceConfig getResourceConfig(ResourceJobManagerType 
resourceJobManagerType) throws
-                       GFacException {
-               if (!readConfig) {
-                       loadConfiguration();
-               }
+       public static ResourceConfig getResourceConfig(ResourceJobManagerType 
resourceJobManagerType) {
                return resources.get(resourceJobManagerType);
        }
 
-       private static void loadConfiguration() throws GFacException {
+       public static Map<ResourceJobManagerType, ResourceConfig> 
getResourceConfig() {
+               return resources;
+       }
+
+       public static void loadConfiguration() throws GFacException {
                GFacYamlConfigruation config = new GFacYamlConfigruation();
                try {
                        for (JobSubmitterTaskConfig jobSubmitterTaskConfig : 
config.getJobSbumitters()) {
@@ -283,10 +266,28 @@ public abstract class Factory {
                        for (ResourceConfig resourceConfig : 
config.getResourceConfiguration()) {
                                
resources.put(resourceConfig.getJobManagerType(), resourceConfig);
                        }
-                       readConfig = true;
                } catch (Exception e) {
                        throw new GFacException("Gfac config issue", e);
                }
        }
 
+       public static JobMonitor getMonitorService(MonitorMode monitorMode) 
throws AiravataException {
+               JobMonitor jobMonitor = jobMonitorServices.get(monitorMode);
+               if (jobMonitor == null) {
+                       synchronized (JobMonitor.class) {
+                               jobMonitor = 
jobMonitorServices.get(monitorMode);
+                               if (jobMonitor == null) {
+                                       switch (monitorMode) {
+                                               case 
JOB_EMAIL_NOTIFICATION_MONITOR:
+                                                       EmailBasedMonitor 
emailBasedMonitor = new EmailBasedMonitor(Factory.getResourceConfig());
+                                                       
jobMonitorServices.put(MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR, 
emailBasedMonitor);
+                                                       jobMonitor = 
((JobMonitor) emailBasedMonitor);
+                                                       new 
Thread(emailBasedMonitor).start();
+                                       }
+                               }
+                       }
+               }
+               return jobMonitor;
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e290cfe1/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 3bea455..c7eba99 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -23,6 +23,7 @@ package org.apache.airavata.gfac.impl;
 
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.gfac.core.GFacEngine;
 import org.apache.airavata.gfac.core.GFacException;
@@ -33,6 +34,11 @@ import org.apache.airavata.gfac.core.task.JobSubmissionTask;
 import org.apache.airavata.gfac.core.task.Task;
 import org.apache.airavata.gfac.core.task.TaskException;
 import org.apache.airavata.gfac.impl.task.SSHEnvironmentSetupTask;
+import 
org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import 
org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
 import 
org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
 import org.apache.airavata.model.application.io.DataType;
 import org.apache.airavata.model.application.io.InputDataObjectType;
@@ -55,6 +61,7 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -80,6 +87,7 @@ public class GFacEngineImpl implements GFacEngine {
                        processContext.setExperimentCatalog(expCatalog);
                        
processContext.setCuratorClient(Factory.getCuratorClient());
                        
processContext.setStatusPublisher(Factory.getStatusPublisher());
+
                        ProcessModel processModel = (ProcessModel) 
expCatalog.get(ExperimentCatalogModelType.PROCESS, processId);
                        processContext.setProcessModel(processModel);
                        GatewayResourceProfile gatewayProfile = 
appCatalog.getGatewayProfile().getGatewayProfile(gatewayId);
@@ -92,9 +100,15 @@ public class GFacEngineImpl implements GFacEngine {
                                        
.getApplicationDeployement(processModel.getApplicationDeploymentId()));
                        
processContext.setApplicationInterfaceDescription(appCatalog.getApplicationInterface()
                                        
.getApplicationInterface(processModel.getApplicationInterfaceId()));
-                       
processContext.setRemoteCluster(Factory.getRemoteCluster(processContext));
+                       
processContext.setResourceJobManager(getResourceJobManager(processContext));
+                       
processContext.setRemoteCluster(Factory.getRemoteCluster(processContext.getJobSubmissionProtocol(),
+                                       processContext.getComputeResourceId(), 
processContext.getResourceJobManager()));
 
-                       //
+                       String inputPath = ServerSettings.getOutputLocation();
+                       if (inputPath != null) {
+                               
processContext.setLocalWorkingDir((inputPath.endsWith("/") ? inputPath : 
inputPath + "/") +
+                                               processContext.getProcessId());
+                       }
                        return processContext;
                } catch (AppCatalogException e) {
                        throw new GFacException("App catalog access exception 
", e);
@@ -134,6 +148,7 @@ public class GFacEngineImpl implements GFacEngine {
                                                } catch (TException e) {
                                                        throw new 
GFacException("Error while serializing data staging sub task model");
                                                }
+                                               saveTaskModel(taskCtx);
                                                
GFacUtils.saveAndPublishTaskStatus(taskCtx);
                                                Task dMoveTask = 
Factory.getDataMovementTask(processContext.getDataMovementProtocol());
                                                executeTask(taskCtx, dMoveTask);
@@ -195,7 +210,7 @@ public class GFacEngineImpl implements GFacEngine {
                // create data staging sub task model
                DataStagingTaskModel submodel = new DataStagingTaskModel();
                submodel.setSource(processInput.getValue());
-               submodel.setDestination(processContext.getWorkingDir());
+               
submodel.setDestination(processContext.getDataMovementProtocol().name() + ":" + 
processContext.getWorkingDir());
                
taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
                taskCtx.setTaskModel(taskModel);
                return taskCtx;
@@ -212,9 +227,13 @@ public class GFacEngineImpl implements GFacEngine {
                taskModel.setTaskStatus(new TaskStatus(TaskState.CREATED));
                taskModel.setTaskType(TaskTypes.DATA_STAGING);
                // create data staging sub task model
+               String remoteOutputDir = processContext.getOutputDir();
+               remoteOutputDir = remoteOutputDir.endsWith("/") ? 
remoteOutputDir : remoteOutputDir + "/";
                DataStagingTaskModel submodel = new DataStagingTaskModel();
-               submodel.setSource(processOutput.getValue());
-               submodel.setDestination(processContext.getWorkingDir());
+               
submodel.setSource(processContext.getDataMovementProtocol().name() + ":" + 
remoteOutputDir + processOutput
+                               .getValue());
+               String localWorkingDir = processContext.getLocalWorkingDir();
+               submodel.setDestination("file://" + localWorkingDir);
                
taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
                taskCtx.setTaskModel(taskModel);
                return taskCtx;
@@ -271,6 +290,9 @@ public class GFacEngineImpl implements GFacEngine {
                                        } catch (TException e) {
                                                throw new GFacException("Thrift 
model to byte[] convertion issue", e);
                                        }
+                                       File localWorkingdir = new 
File(taskCtx.getLocalWorkingDir());
+                                       localWorkingdir.mkdirs(); // make local 
dir if it does not exist
+                                       saveTaskModel(taskCtx);
                                        
GFacUtils.saveAndPublishTaskStatus(taskCtx);
                                        Task dMoveTask = 
Factory.getDataMovementTask(processContext.getDataMovementProtocol());
                                        executeTask(taskCtx, dMoveTask);
@@ -307,5 +329,36 @@ public class GFacEngineImpl implements GFacEngine {
                });
        }
 
+       public static ResourceJobManager getResourceJobManager(ProcessContext 
processCtx) throws AppCatalogException {
+               List<JobSubmissionInterface> jobSubmissionInterfaces = 
Factory.getDefaultAppCatalog().getComputeResource()
+                               
.getComputeResource(processCtx.getComputeResourceId()) 
.getJobSubmissionInterfaces();
+
+               ResourceJobManager resourceJobManager = null;
+               JobSubmissionInterface jsInterface = null;
+               for (JobSubmissionInterface jobSubmissionInterface : 
jobSubmissionInterfaces) {
+                       if (jobSubmissionInterface.getJobSubmissionProtocol() 
== processCtx.getJobSubmissionProtocol()) {
+                               jsInterface = jobSubmissionInterface;
+                               break;
+                       }
+               }
+               if (jsInterface == null) {
+                       // TODO: throw an exception.
+               } else if (jsInterface.getJobSubmissionProtocol() == 
JobSubmissionProtocol.SSH) {
+                       SSHJobSubmission sshJobSubmission = 
Factory.getDefaultAppCatalog().getComputeResource().getSSHJobSubmission
+                                       
(jsInterface.getJobSubmissionInterfaceId());
+                       
processCtx.setMonitorMode(sshJobSubmission.getMonitorMode()); // fixme - Move 
this to populate process context method.
+                       resourceJobManager = 
sshJobSubmission.getResourceJobManager();
+               } else if (jsInterface.getJobSubmissionProtocol() == 
JobSubmissionProtocol.LOCAL) {
+                       LOCALSubmission localSubmission = 
Factory.getDefaultAppCatalog().getComputeResource().getLocalJobSubmission
+                                       
(jsInterface.getJobSubmissionInterfaceId());
+                       resourceJobManager = 
localSubmission.getResourceJobManager();
+               } else {
+                       // TODO: throw an unsupported job submission protocol 
exception; only SSH and LOCAL are supported.
+               }
 
+               if (resourceJobManager == null) {
+                       // TODO throw an exception
+               }
+               return resourceJobManager;
+       }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e290cfe1/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index 51aa8f8..899f684 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -21,9 +21,14 @@
 
 package org.apache.airavata.gfac.impl;
 
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.gfac.core.GFac;
 import org.apache.airavata.gfac.core.GFacEngine;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.monitor.JobMonitor;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.ProcessStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,22 +44,19 @@ public class GFacWorker implements Runnable {
 
        /**
         * This will be called by monitoring service.
-        * @param processContext
-        * @throws GFacException
         */
-    public   GFacWorker(ProcessContext processContext) throws GFacException {
-        if (processContext == null) {
-            throw new GFacException("Worker must initialize with valide 
processContext, Process context is null");
-        }
-           this.processContext = processContext;
-    }
+       public GFacWorker(ProcessContext processContext) throws GFacException {
+               if (processContext == null) {
+                       throw new GFacException("Worker must initialize with 
valide processContext, Process context is null");
+               }
+               this.processId = processContext.getProcessId();
+               this.gatewayId = processContext.getGatewayId();
+               this.tokenId = processContext.getTokenId();
+               this.processContext = processContext;
+       }
 
        /**
         * This constructor will be called when new or recovery request comes.
-        * @param processId
-        * @param gatewayId
-        * @param tokenId
-        * @throws GFacException
         */
        public GFacWorker(String processId, String gatewayId, String tokenId) 
throws GFacException {
                this.processId = processId;
@@ -62,53 +64,83 @@ public class GFacWorker implements Runnable {
                this.tokenId = tokenId;
        }
 
-    @Override
-    public void run() {
-           try {
-                   GFacEngine engine = Factory.getGFacEngine();
-                   if (processContext == null) {
-                           processContext = 
engine.populateProcessContext(processId, gatewayId, tokenId);
-                           isProcessContextPopulated = true;
-                   }
-                   ProcessType type = getProcessType(processContext);
-                   try {
-                           switch (type) {
-                                   case NEW:
-                                           
engine.executeProcess(processContext);
-                                           break;
-                                   case RECOVER:
-                                           // recover the process
-//                                         
engine.recoverProcess(processContext);
-                                           
engine.executeProcess(processContext);
-                                           break;
-                                   case OUTFLOW:
-                                           // run the outflow task
-                                           
engine.runProcessOutflow(processContext);
-                                           break;
-                                   case RECOVER_OUTFLOW:
-                                           // recover  outflow task;
-                                           
engine.recoverProcessOutflow(processContext);
-                           }
-                   } catch (GFacException e) {
-                           switch (type) {
-                                   case NEW:
-                                           log.error("Process execution 
error", e);
-                                           break;
-                                   case RECOVER:
-                                           log.error("Process recover error ", 
e);
-                                           break;
-                                   case OUTFLOW:
-                                           log.error("Process outflow 
execution error", e);
-                                           break;
-                                   case RECOVER_OUTFLOW:
-                                           log.error("Process outflow recover 
error", e);
-                                           break;
-                           }
-                   }
-           } catch (GFacException e) {
-                   log.error("GFac Worker throws an exception", e);
-           }
-    }
+       @Override
+       public void run() {
+               try {
+                       GFacEngine engine = Factory.getGFacEngine();
+                       if (processContext == null) {
+                               processContext = 
engine.populateProcessContext(processId, gatewayId, tokenId);
+                               isProcessContextPopulated = true;
+                       }
+                       ProcessType type = getProcessType(processContext);
+                       try {
+                               switch (type) {
+                                       case NEW:
+                                               exectuteProcess(engine);
+                                               break;
+                                       case RECOVER:
+                                               recoverProcess(engine);
+                                               break;
+                                       case OUTFLOW:
+                                               // run the outflow task
+                                               
engine.runProcessOutflow(processContext);
+                                               
processContext.setProcessStatus(new ProcessStatus(ProcessState.COMPLETED));
+                                               break;
+                                       case RECOVER_OUTFLOW:
+                                               // recover  outflow task;
+                                               
engine.recoverProcessOutflow(processContext);
+                                               
processContext.setProcessStatus(new ProcessStatus(ProcessState.COMPLETED));
+                                               break;
+                                       default:
+                                               throw new 
GFacException("process Id : " + processId + " Couldn't identify process type");
+                               }
+                       } catch (GFacException e) {
+                               switch (type) {
+                                       case NEW:
+                                               log.error("Process execution 
error", e);
+                                               break;
+                                       case RECOVER:
+                                               log.error("Process recover 
error ", e);
+                                               break;
+                                       case OUTFLOW:
+                                               log.error("Process outflow 
execution error", e);
+                                               break;
+                                       case RECOVER_OUTFLOW:
+                                               log.error("Process outflow 
recover error", e);
+                                               break;
+                               }
+                               throw e;
+                       }
+               } catch (GFacException e) {
+                       log.error("GFac Worker throws an exception", e);
+               }
+       }
+
+       private void recoverProcess(GFacEngine engine) throws GFacException {
+               // recover the process
+               //      engine.recoverProcess(processContext);
+               exectuteProcess(engine); // TODO - implement recover process.
+       }
+
+       private void exectuteProcess(GFacEngine engine) throws GFacException {
+               engine.executeProcess(processContext);
+               if (processContext.getMonitorMode() == null) {
+                       engine.runProcessOutflow(processContext);
+               } else {
+                       try {
+                               JobMonitor monitorService = 
Factory.getMonitorService(processContext.getMonitorMode());
+                               if (monitorService != null) {
+                                       
monitorService.monitor(processContext.getJobModel().getJobId(), processContext);
+                                       processContext.setProcessStatus(new 
ProcessStatus(ProcessState.MONITORING));
+                               } else {
+                                       // we directly invoke outflow
+                                       
engine.runProcessOutflow(processContext);
+                               }
+                       } catch (AiravataException e) {
+                               throw new GFacException("Error while retrieving 
moniot service", e);
+                       }
+               }
+       }
 
        private ProcessType getProcessType(ProcessContext processContext) {
                // check the status and return correct type of process.

http://git-wip-us.apache.org/repos/asf/airavata/blob/e290cfe1/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
new file mode 100644
index 0000000..089535e
--- /dev/null
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl.task;
+
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.Task;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.SSHUtils;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+public class SCPDataStageTask implements Task {
+       @Override
+       public void init(Map<String, String> propertyMap) throws TaskException {
+
+       }
+
+       @Override
+       public TaskState execute(TaskContext taskContext) throws TaskException {
+
+               if (taskContext.getTaskModel().getTaskType() != 
TaskTypes.DATA_STAGING) {
+                       throw new TaskException("Invalid task call, expected " 
+ TaskTypes.DATA_STAGING.toString() + " but found "
+                                       + 
taskContext.getTaskModel().getTaskType().toString());
+               }
+               try {
+                       DataStagingTaskModel subTaskModel = 
(DataStagingTaskModel) ThriftUtils.getSubTaskModel(taskContext
+                                       .getTaskModel());
+                       URI sourceURI = new URI(subTaskModel.getSource());
+                       URI destinationURI = new 
URI(subTaskModel.getDestination());
+
+                       if (sourceURI.getScheme().equalsIgnoreCase("file")) {  
//  Airavata --> RemoteCluster
+                               
taskContext.getParentProcessContext().getRemoteCluster().scpTo(sourceURI.getPath(),
 destinationURI
+                                               .getPath());
+                       } else { // RemoteCluster --> Airavata
+                               
taskContext.getParentProcessContext().getRemoteCluster().scpFrom(sourceURI.getPath(),
 destinationURI
+                                               .getPath());
+                       }
+               } catch (SSHApiException e) {
+                       throw new TaskException("Scp attempt failed", e);
+               } catch (TException e) {
+                       throw new TaskException("Invalid task invocation");
+               } catch (URISyntaxException e) {
+                       throw new TaskException("source or destination is not a 
valid URI");
+               }
+               return null;
+       }
+
+       @Override
+       public TaskState recover(TaskContext taskContext) throws TaskException {
+               return null;
+       }
+
+       @Override
+       public TaskTypes getType() {
+               return null;
+       }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e290cfe1/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
index 76be7a0..332a0aa 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
@@ -23,10 +23,12 @@ package org.apache.airavata.gfac.impl.task;
 import com.jcraft.jsch.JSch;
 import com.jcraft.jsch.JSchException;
 import com.jcraft.jsch.Session;
+import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.gfac.core.SSHApiException;
 import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.Factory;
 import org.apache.airavata.gfac.impl.SSHUtils;
 import org.apache.airavata.model.status.TaskState;
 import org.apache.airavata.model.task.DataStagingTaskModel;
@@ -35,6 +37,8 @@ import org.apache.thrift.TException;
 
 import java.io.IOException;
 import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.net.URL;
 
 public class SCPInputDataStageTask extends AbstractSCPTask {
@@ -50,31 +54,33 @@ public class SCPInputDataStageTask extends AbstractSCPTask {
                                        + 
taskContext.getTaskModel().getTaskType().toString());
                }
                try {
-                       DataStagingTaskModel subTaskModel = 
(DataStagingTaskModel) ThriftUtils.getSubTaskModel(taskContext.getTaskModel());
-                       URL sourceURL = new URL(subTaskModel.getSource());
-                       URL destinationURL = new 
URL(subTaskModel.getDestination());
+                       DataStagingTaskModel subTaskModel = 
(DataStagingTaskModel) ThriftUtils.getSubTaskModel(taskContext
+                                       .getTaskModel());
+                       URI sourceURI = new URI(subTaskModel.getSource());
+                       URI destinationURI = new 
URI(subTaskModel.getDestination());
 
-                       if (sourceURL.getProtocol().equalsIgnoreCase("file")) { 
 //  local --> Airavata --> RemoteCluster
-                               
taskContext.getParentProcessContext().getRemoteCluster().scpTo(sourceURL.getPath(),
+                       if (sourceURI.getScheme().equalsIgnoreCase("file")) {  
//  local --> Airavata --> RemoteCluster
+                               
taskContext.getParentProcessContext().getRemoteCluster().scpTo(sourceURI.getPath(),
                                                subTaskModel.getDestination());
                        } else { // PGA(client) --> Airavata --> RemoteCluster
                                // PGA(client) --> Airavata
                                JSch jsch = new JSch();
                                jsch.addIdentity(privateKeyPath, passPhrase);
                                Session session = jsch.getSession(userName, 
hostName, DEFAULT_SSH_PORT);
-                               SSHUtils.scpFrom(sourceURL.getPath(), 
inputPath, session);
+                               SSHUtils.scpFrom(sourceURI.getPath(), 
taskContext.getLocalWorkingDir() , session);
 
                                // Airavata --> RemoteCluster
-                               
taskContext.getParentProcessContext().getRemoteCluster().scpTo(destinationURL.getPath(),
 inputPath);
+                               
taskContext.getParentProcessContext().getRemoteCluster().scpTo(destinationURI.getPath(),
+                                               
taskContext.getLocalWorkingDir());
                        }
-               } catch (MalformedURLException e) {
-                       throw new TaskException("Wrong source or destination 
file path.", e);
                } catch (SSHApiException e) {
                        throw new TaskException("Scp attempt failed", e);
                } catch (JSchException | IOException e) {
                        throw new TaskException("Scp failed", e);
                } catch (TException e) {
                        throw new TaskException("Invalid task invocation");
+               } catch (URISyntaxException e) {
+                       e.printStackTrace();
                }
                return null;
        }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e290cfe1/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index 08f8423..9ecd700 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -24,6 +24,7 @@ import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacThreadPoolExecutor;
+import org.apache.airavata.gfac.core.config.ResourceConfig;
 import org.apache.airavata.gfac.core.context.ProcessContext;
 import org.apache.airavata.gfac.core.monitor.EmailParser;
 import org.apache.airavata.gfac.core.monitor.JobMonitor;
@@ -47,6 +48,7 @@ import javax.mail.Session;
 import javax.mail.Store;
 import javax.mail.search.FlagTerm;
 import javax.mail.search.SearchTerm;
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -71,12 +73,15 @@ public class EmailBasedMonitor implements JobMonitor, 
Runnable{
     private String host, emailAddress, password, storeProtocol, folderName ;
     private Date monitorStartDate;
     private Map<ResourceJobManagerType, EmailParser> emailParserMap = new 
HashMap<ResourceJobManagerType, EmailParser>();
+       private Map<String, ResourceJobManagerType> addressMap = new 
HashMap<>();
 
-    public EmailBasedMonitor(ResourceJobManagerType type) throws 
AiravataException {
-        init();
-    }
 
-    private void init() throws AiravataException {
+       public EmailBasedMonitor(Map<ResourceJobManagerType, ResourceConfig> 
resourceConfigs) throws AiravataException {
+               init();
+               populateAddressAndParserMap(resourceConfigs);
+       }
+
+       private void init() throws AiravataException {
         host = ServerSettings.getEmailBasedMonitorHost();
         emailAddress = ServerSettings.getEmailBasedMonitorAddress();
         password = ServerSettings.getEmailBasedMonitorPassword();
@@ -90,6 +95,24 @@ public class EmailBasedMonitor implements JobMonitor, 
Runnable{
         properties.put("mail.store.protocol", storeProtocol);
     }
 
+       private void populateAddressAndParserMap(Map<ResourceJobManagerType, 
ResourceConfig> resourceConfigs) throws AiravataException {
+               for (Map.Entry<ResourceJobManagerType, ResourceConfig> 
resourceConfigEntry : resourceConfigs.entrySet()) {
+                       ResourceJobManagerType type = 
resourceConfigEntry.getKey();
+                       ResourceConfig config = resourceConfigEntry.getValue();
+                       List<String> resourceEmailAddresses = 
config.getResourceEmailAddresses();
+                       for (String resourceEmailAddress : 
resourceEmailAddresses) {
+                               addressMap.put(resourceEmailAddress, type);
+                       }
+                       try {
+                               Class<? extends EmailParser> emailParserClass = 
Class.forName(config.getEmailParser()).asSubclass(EmailParser.class);
+                               EmailParser emailParser = 
emailParserClass.getConstructor().newInstance();
+                               emailParserMap.put(type, emailParser);
+                       } catch (Exception e) {
+                               throw new AiravataException("Error while 
instantiation email parsers", e);
+                       }
+               }
+
+       }
        @Override
        public void monitor(String jobId, ProcessContext processContext) {
                log.info("[EJM]: Added monitor Id : " + jobId + " to email 
based monitor map");
@@ -106,52 +129,21 @@ public class EmailBasedMonitor implements JobMonitor, 
Runnable{
         String addressStr = fromAddress.toString();
         ResourceJobManagerType jobMonitorType = getJobMonitorType(addressStr);
         EmailParser emailParser = emailParserMap.get(jobMonitorType);
-        if (emailParser == null) {
-            switch (jobMonitorType) {
-                case PBS:
-                    emailParser = new PBSEmailParser();
-                    break;
-                case SLURM:
-                    emailParser = new SLURMEmailParser();
-                    break;
-                case LSF:
-                    emailParser = new LSFEmailParser();
-                    break;
-                case UGE:
-                    emailParser = new UGEEmailParser();
-                    break;
-                default:
-                       throw new AiravataException("[EJM]: Un-handle resource 
job manager type: " + jobMonitorType
-                                       .toString() + " for email monitoring 
-->  " + addressStr);
-            }
-
-            emailParserMap.put(jobMonitorType, emailParser);
-        }
+           if (emailParser == null) {
+                   throw new AiravataException("[EJM]: Un-handle resource job 
manager type: " + jobMonitorType
+                                   .toString() + " for email monitoring -->  " 
+ addressStr);
+           }
         return emailParser.parseEmail(message);
     }
 
     private ResourceJobManagerType getJobMonitorType(String addressStr) throws 
AiravataException {
-        System.out.println("*********** address ******** : " + addressStr);
-        switch (addressStr) {
-            case "[email protected]":   // trestles , gordan
-            case "[email protected]":  // bigred2
-            case "root <[email protected]>": // bigred2
-            case "root <[email protected]>": // alamo
-                return ResourceJobManagerType.PBS;
-            case "SDSC Admin <[email protected]>": // comet
-            case "[email protected]": // stampede
-            case "slurm user <[email protected]>":
-                return ResourceJobManagerType.SLURM;
-//            case "lsf":
-//                return ResourceJobManagerType.LSF;
-            default:
-                if (addressStr.contains("ls4.tacc.utexas.edu>")) { // lonestar
-                    return ResourceJobManagerType.UGE;
-                } else {
-                    throw new AiravataException("[EJM]: Couldn't identify 
Resource job manager type from address " + addressStr);
-                }
-        }
-
+//        System.out.println("*********** address ******** : " + addressStr);
+           for (Map.Entry<String, ResourceJobManagerType> addressEntry : 
addressMap.entrySet()) {
+                   if (addressEntry.getKey().matches(addressStr)) {
+                           return addressEntry.getValue();
+                   }
+           }
+           throw new AiravataException("[EJM]: Couldn't identify Resource job 
manager type from address " + addressStr);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/airavata/blob/e290cfe1/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
 
b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index cda910e..6fa1288 100644
--- 
a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ 
b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -80,6 +80,7 @@ public class GfacServerHandler implements GfacService.Iface {
 
     public GfacServerHandler() throws AiravataStartupException {
         try {
+               Factory.loadConfiguration();
             startCuratorClient();
             initZkDataStructure();
             initAMQPClient();

Reply via email to