Repository: airavata
Updated Branches:
  refs/heads/develop f66043474 -> 6e5d1c6ee


User user compute resource preference if user provided


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6e5d1c6e
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6e5d1c6e
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6e5d1c6e

Branch: refs/heads/develop
Commit: 6e5d1c6eee8cc6772ca4381da2ec5d2a8ac58a7d
Parents: f660434
Author: Shameera Rathnayaka <[email protected]>
Authored: Fri Nov 11 19:40:12 2016 -0500
Committer: Shameera Rathnayaka <[email protected]>
Committed: Fri Nov 11 19:40:12 2016 -0500

----------------------------------------------------------------------
 .../core/utils/OrchestratorUtils.java           | 231 +++++++++++++------
 .../cpi/impl/SimpleOrchestratorImpl.java        | 146 +++++++-----
 2 files changed, 250 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/6e5d1c6e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
index 83c9273..61f7188 100644
--- 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
+++ 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
@@ -23,6 +23,7 @@ package org.apache.airavata.orchestrator.core.utils;
 import java.io.IOException;
 import java.util.*;
 
+import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
 import 
org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
@@ -31,11 +32,13 @@ import 
org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterfa
 import 
org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
 import 
org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
 import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
+import 
org.apache.airavata.model.appcatalog.userresourceprofile.UserComputeResourcePreference;
 import org.apache.airavata.model.data.movement.DataMovementInterface;
 import org.apache.airavata.model.data.movement.DataMovementProtocol;
 import org.apache.airavata.model.data.movement.SCPDataMovement;
 import org.apache.airavata.model.data.movement.SecurityProtocol;
 import org.apache.airavata.model.process.ProcessModel;
+import 
org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
 import org.apache.airavata.orchestrator.core.OrchestratorConfiguration;
 import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
 import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
@@ -50,69 +53,99 @@ import org.slf4j.LoggerFactory;
 public class OrchestratorUtils {
     private final static Logger logger = 
LoggerFactory.getLogger(OrchestratorUtils.class);
 
-    public static OrchestratorConfiguration loadOrchestratorConfiguration() 
throws OrchestratorException, IOException, NumberFormatException, 
ApplicationSettingsException {
+    public static OrchestratorConfiguration loadOrchestratorConfiguration()
+            throws OrchestratorException, IOException, NumberFormatException, 
ApplicationSettingsException {
+
         OrchestratorConfiguration orchestratorConfiguration = new 
OrchestratorConfiguration();
-        
orchestratorConfiguration.setSubmitterInterval(Integer.parseInt((String) 
ServerSettings.getSetting(OrchestratorConstants.SUBMIT_INTERVAL)));
-        orchestratorConfiguration.setThreadPoolSize(Integer.parseInt((String) 
ServerSettings.getSetting(OrchestratorConstants.THREAD_POOL_SIZE)));
-        
orchestratorConfiguration.setStartSubmitter(Boolean.valueOf(ServerSettings.getSetting(OrchestratorConstants.START_SUBMITTER)));
-        
orchestratorConfiguration.setEmbeddedMode(Boolean.valueOf(ServerSettings.getSetting(OrchestratorConstants.EMBEDDED_MODE)));
-        
orchestratorConfiguration.setEnableValidation(Boolean.valueOf(ServerSettings.getSetting(OrchestratorConstants.ENABLE_VALIDATION)));
+        orchestratorConfiguration.setSubmitterInterval(
+                
Integer.parseInt(ServerSettings.getSetting(OrchestratorConstants.SUBMIT_INTERVAL)));
+        orchestratorConfiguration.setThreadPoolSize(
+                
Integer.parseInt(ServerSettings.getSetting(OrchestratorConstants.THREAD_POOL_SIZE)));
+        orchestratorConfiguration.setStartSubmitter(
+                
Boolean.valueOf(ServerSettings.getSetting(OrchestratorConstants.START_SUBMITTER)));
+        orchestratorConfiguration.setEmbeddedMode(
+                
Boolean.valueOf(ServerSettings.getSetting(OrchestratorConstants.EMBEDDED_MODE)));
+        orchestratorConfiguration.setEnableValidation(
+                
Boolean.valueOf(ServerSettings.getSetting(OrchestratorConstants.ENABLE_VALIDATION)));
         if (orchestratorConfiguration.isEnableValidation()) {
-            
orchestratorConfiguration.setValidatorClasses(Arrays.asList(ServerSettings.getSetting(OrchestratorConstants.JOB_VALIDATOR).split(",")));
+            orchestratorConfiguration.setValidatorClasses(
+                    
Arrays.asList(ServerSettings.getSetting(OrchestratorConstants.JOB_VALIDATOR).split(",")));
         }
         return orchestratorConfiguration;
     }
 
-    public static JobSubmissionProtocol 
getPreferredJobSubmissionProtocol(OrchestratorContext context, ProcessModel 
model, String gatewayId) throws RegistryException {
+    public static JobSubmissionProtocol 
getPreferredJobSubmissionProtocol(OrchestratorContext context,
+                                                                          
ProcessModel model,
+                                                                          
String gatewayId) throws RegistryException {
         try {
-            GwyResourceProfile gatewayProfile = 
context.getRegistry().getAppCatalog().getGatewayProfile();
             String resourceHostId = model.getComputeResourceId();
-            ComputeResourcePreference preference = 
gatewayProfile.getComputeResourcePreference(gatewayId
-                    , resourceHostId);
-            return preference.getPreferredJobSubmissionProtocol();
+            return getComputeResourcePreference(context, gatewayId, 
resourceHostId).getPreferredJobSubmissionProtocol();
         } catch (AppCatalogException e) {
             logger.error("Error occurred while initializing app catalog", e);
             throw new RegistryException("Error occurred while initializing app 
catalog", e);
         }
     }
 
-    public static String getApplicationInterfaceName(OrchestratorContext 
context, ProcessModel model) throws RegistryException {
+    public static ComputeResourcePreference 
getComputeResourcePreference(OrchestratorContext context,
+                                                                         
String gatewayId,
+                                                                         
String resourceHostId)
+            throws AppCatalogException, RegistryException {
+
+        GwyResourceProfile gatewayProfile = getGatewayProfile(context);
+        return gatewayProfile.getComputeResourcePreference(gatewayId
+                , resourceHostId);
+    }
+
+    public static GwyResourceProfile getGatewayProfile(OrchestratorContext 
context)
+            throws AppCatalogException, RegistryException {
+        return context.getRegistry().getAppCatalog().getGatewayProfile();
+    }
+
+    public static UsrResourceProfile 
getUserResourceProfile(OrchestratorContext context)
+            throws RegistryException, AppCatalogException {
+        return context.getRegistry().getAppCatalog().getUserResourceProfile();
+    }
+
+    public static String getApplicationInterfaceName(OrchestratorContext 
context, ProcessModel model)
+            throws RegistryException {
         try {
             ApplicationInterface applicationInterface = 
context.getRegistry().getAppCatalog().getApplicationInterface();
-            ApplicationInterfaceDescription appInterface = 
applicationInterface.getApplicationInterface(model.getApplicationInterfaceId());
+            ApplicationInterfaceDescription appInterface =
+                    
applicationInterface.getApplicationInterface(model.getApplicationInterfaceId());
             return appInterface.getApplicationName();
         } catch (AppCatalogException e) {
             throw new RegistryException("Error while retrieving application 
interface", e);
         }
     }
 
-    public static DataMovementProtocol 
getPreferredDataMovementProtocol(OrchestratorContext context, ProcessModel 
model, String gatewayId) throws RegistryException {
+    public static DataMovementProtocol 
getPreferredDataMovementProtocol(OrchestratorContext context,
+                                                                        
ProcessModel model,
+                                                                        String 
gatewayId) throws RegistryException {
         try {
-            GwyResourceProfile gatewayProfile = 
context.getRegistry().getAppCatalog().getGatewayProfile();
             String resourceHostId = model.getComputeResourceId();
-            ComputeResourcePreference preference = 
gatewayProfile.getComputeResourcePreference(gatewayId
-                    , resourceHostId);
-            return preference.getPreferredDataMovementProtocol();
+            return getComputeResourcePreference(context, gatewayId, 
resourceHostId).getPreferredDataMovementProtocol();
         } catch (AppCatalogException e) {
             logger.error("Error occurred while initializing app catalog", e);
             throw new RegistryException("Error occurred while initializing app 
catalog", e);
         }
     }
 
-    public static ComputeResourcePreference 
getComputeResourcePreference(OrchestratorContext context, ProcessModel 
processModel, String gatewayId) throws RegistryException {
+    public static ComputeResourcePreference 
getComputeResourcePreference(OrchestratorContext context,
+                                                                         
ProcessModel processModel,
+                                                                         
String gatewayId) throws RegistryException {
         try {
-            GwyResourceProfile gatewayProfile = 
context.getRegistry().getAppCatalog().getGatewayProfile();
-            String resourceHostId = processModel.getComputeResourceId();
-            return gatewayProfile.getComputeResourcePreference(gatewayId, 
resourceHostId);
+            return getComputeResourcePreference(context, gatewayId, 
processModel.getComputeResourceId());
         } catch (AppCatalogException e) {
             logger.error("Error occurred while initializing app catalog", e);
             throw new RegistryException("Error occurred while initializing app 
catalog", e);
         }
     }
 
-    public static StoragePreference getStoragePreference(OrchestratorContext 
context, ProcessModel processModel, String gatewayId) throws RegistryException {
+    public static StoragePreference getStoragePreference(OrchestratorContext 
context,
+                                                         ProcessModel 
processModel,
+                                                         String gatewayId) 
throws RegistryException {
         try {
-            GwyResourceProfile gatewayProfile = 
context.getRegistry().getAppCatalog().getGatewayProfile();
+            GwyResourceProfile gatewayProfile = getGatewayProfile(context);
             String resourceHostId = processModel.getComputeResourceId();
             return gatewayProfile.getStoragePreference(gatewayId, 
resourceHostId);
         } catch (AppCatalogException e) {
@@ -121,46 +154,103 @@ public class OrchestratorUtils {
         }
     }
 
-    public static String getLoginUserName(OrchestratorContext context, 
ProcessModel processModel, String gatewayId) throws RegistryException {
+    public static String getLoginUserName(OrchestratorContext context,
+                                          ProcessModel processModel,
+                                          String gatewayId) throws 
RegistryException, AiravataException {
         try {
-            String loginUserName = null;
-            String overrideLoginUserName = 
processModel.getProcessResourceSchedule().getOverrideLoginUserName();
-            if (overrideLoginUserName != null && 
!overrideLoginUserName.equals("")) {
-                loginUserName = overrideLoginUserName;
-            } else {
-                GwyResourceProfile gatewayProfile = 
context.getRegistry().getAppCatalog().getGatewayProfile();
-                loginUserName = 
gatewayProfile.getComputeResourcePreference(gatewayId, 
processModel.getComputeResourceId()).getLoginUserName();
+            ComputeResourcePreference computeResourcePreference = 
getComputeResourcePreference(context, gatewayId,
+                    processModel.getComputeResourceId());
+            ComputationalResourceSchedulingModel processResourceSchedule = 
processModel.getProcessResourceSchedule();
+            if (processModel.isUseUserCRPref()) {
+                UsrResourceProfile userResourceProfile = 
getUserResourceProfile(context);
+                UserComputeResourcePreference userComputeResourcePreference = 
userResourceProfile
+                        
.getUserComputeResourcePreference(processModel.getUserName(), gatewayId,
+                                processModel.getComputeResourceId());
+                if (isValid(userComputeResourcePreference.getLoginUserName())) 
{
+                    return userComputeResourcePreference.getLoginUserName();
+                } else if 
(isValid(processResourceSchedule.getOverrideLoginUserName())) {
+                    logger.warn("User computer resource preference doesn't 
have valid user login name, using computer " +
+                            "resource scheduling login name " +  
processResourceSchedule.getOverrideLoginUserName());
+                    return processResourceSchedule.getOverrideLoginUserName();
+                } else if 
(isValid(computeResourcePreference.getLoginUserName())) {
+                    logger.warn("Either User computer resource preference or 
computer resource scheduling " +
+                            "doesn't have valid user login name, using  
gateway computer resource preference login name "
+                            +  computeResourcePreference.getLoginUserName());
+                    return computeResourcePreference.getLoginUserName();
+                }else {
+                    throw new AiravataException("Login name is not found");
+                }
+            }else {
+                if 
(isValid(processResourceSchedule.getOverrideLoginUserName())) {
+                    return processResourceSchedule.getOverrideLoginUserName();
+                } else if 
(isValid(computeResourcePreference.getLoginUserName())) {
+                    logger.warn("Process compute resource scheduling doesn't 
have valid user login name, " +
+                            "using  gateway computer resource preference login 
name "
+                            + computeResourcePreference.getLoginUserName());
+                    return computeResourcePreference.getLoginUserName();
+                }else {
+                    throw new AiravataException("Login name is not found");
+                }
             }
-            return loginUserName;
         } catch (AppCatalogException e) {
             logger.error("Error occurred while initializing app catalog to 
fetch login username", e);
             throw new RegistryException("Error occurred while initializing app 
catalog to fetch login username", e);
         }
     }
 
-    public static String getScratchLocation(OrchestratorContext context, 
ProcessModel processModel, String gatewayId) throws RegistryException {
+    public static String getScratchLocation(OrchestratorContext context,
+                                            ProcessModel processModel,
+                                            String gatewayId) throws 
RegistryException, AiravataException {
         try {
-            String scratchLocation = null;
-            String overrideScratchLocation = 
processModel.getProcessResourceSchedule().getOverrideScratchLocation();
-            if (overrideScratchLocation != null && 
!overrideScratchLocation.equals("")) {
-                scratchLocation = overrideScratchLocation;
-            } else {
-                GwyResourceProfile gatewayProfile = 
context.getRegistry().getAppCatalog().getGatewayProfile();
-                scratchLocation = 
gatewayProfile.getComputeResourcePreference(gatewayId, 
processModel.getComputeResourceId()).getScratchLocation();
+            ComputeResourcePreference computeResourcePreference = 
getComputeResourcePreference(context, gatewayId,
+                    processModel.getComputeResourceId());
+            ComputationalResourceSchedulingModel processResourceSchedule = 
processModel.getProcessResourceSchedule();
+            if (processModel.isUseUserCRPref()) {
+                UsrResourceProfile userResourceProfile = 
getUserResourceProfile(context);
+                UserComputeResourcePreference userComputeResourcePreference = 
userResourceProfile
+                        
.getUserComputeResourcePreference(processModel.getUserName(), gatewayId,
+                                processModel.getComputeResourceId());
+                if 
(isValid(userComputeResourcePreference.getScratchLocation())) {
+                    return userComputeResourcePreference.getScratchLocation();
+                } else if 
(isValid(processResourceSchedule.getOverrideScratchLocation())) {
+                    logger.warn("User computer resource preference doesn't 
have valid scratch location, using computer " +
+                            "resource scheduling scratch location " +  
processResourceSchedule.getOverrideScratchLocation());
+                    return 
processResourceSchedule.getOverrideScratchLocation();
+                } else if 
(isValid(computeResourcePreference.getScratchLocation())) {
+                    logger.warn("Either User computer resource preference or 
computer resource scheduling doesn't have " +
+                            "valid scratch location, using  gateway computer 
resource preference scratch location"
+                            +  computeResourcePreference.getScratchLocation());
+                    return computeResourcePreference.getScratchLocation();
+                }else {
+                    throw new AiravataException("Scratch location is not 
found");
+                }
+            }else {
+                if 
(isValid(processResourceSchedule.getOverrideScratchLocation())) {
+                    return 
processResourceSchedule.getOverrideScratchLocation();
+                } else if 
(isValid(computeResourcePreference.getScratchLocation())) {
+                    logger.warn("Process compute resource scheduling doesn't 
have valid scratch location, " +
+                            "using  gateway computer resource preference 
scratch location"
+                            + computeResourcePreference.getScratchLocation());
+                    return computeResourcePreference.getScratchLocation();
+                }else {
+                    throw new AiravataException("Scratch location is not 
found");
+                }
             }
-            return scratchLocation;
         } catch (AppCatalogException e) {
             logger.error("Error occurred while initializing app catalog to 
fetch scratch location", e);
             throw new RegistryException("Error occurred while initializing app 
catalog to fetch scratch location", e);
         }
     }
 
-    public static JobSubmissionInterface 
getPreferredJobSubmissionInterface(OrchestratorContext context, ProcessModel 
processModel, String gatewayId) throws RegistryException {
+    public static JobSubmissionInterface 
getPreferredJobSubmissionInterface(OrchestratorContext context,
+                                                                            
ProcessModel processModel,
+                                                                            
String gatewayId) throws RegistryException {
         try {
             String resourceHostId = processModel.getComputeResourceId();
             ComputeResourcePreference resourcePreference = 
getComputeResourcePreference(context, processModel, gatewayId);
             JobSubmissionProtocol preferredJobSubmissionProtocol = 
resourcePreference.getPreferredJobSubmissionProtocol();
-            ComputeResourceDescription resourceDescription = 
context.getRegistry().getAppCatalog().getComputeResource().getComputeResource(resourceHostId);
+            ComputeResourceDescription resourceDescription =
+                    
context.getRegistry().getAppCatalog().getComputeResource().getComputeResource(resourceHostId);
             List<JobSubmissionInterface> jobSubmissionInterfaces = 
resourceDescription.getJobSubmissionInterfaces();
             Map<JobSubmissionProtocol, List<JobSubmissionInterface>> 
orderedInterfaces = new HashMap<>();
             List<JobSubmissionInterface> interfaces = new ArrayList<>();
@@ -178,21 +268,14 @@ public class OrchestratorUtils {
                             }
                         }
                     }else {
-                        Collections.sort(jobSubmissionInterfaces, new 
Comparator<JobSubmissionInterface>() {
-                            @Override
-                            public int compare(JobSubmissionInterface 
jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) {
-                                return 
jobSubmissionInterface.getPriorityOrder() - 
jobSubmissionInterface2.getPriorityOrder();
-                            }
-                        });
+                        Collections.sort(jobSubmissionInterfaces,
+                                (jobSubmissionInterface, 
jobSubmissionInterface2) ->
+                                        
jobSubmissionInterface.getPriorityOrder() - 
jobSubmissionInterface2.getPriorityOrder());
                     }
                 }
                 interfaces = 
orderedInterfaces.get(preferredJobSubmissionProtocol);
-                Collections.sort(interfaces, new 
Comparator<JobSubmissionInterface>() {
-                    @Override
-                    public int compare(JobSubmissionInterface 
jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) {
-                        return jobSubmissionInterface.getPriorityOrder() - 
jobSubmissionInterface2.getPriorityOrder();
-                    }
-                });
+                Collections.sort(interfaces, (jobSubmissionInterface, 
jobSubmissionInterface2) ->
+                        jobSubmissionInterface.getPriorityOrder() - 
jobSubmissionInterface2.getPriorityOrder());
             } else {
                 throw new RegistryException("Compute resource should have at 
least one job submission interface defined...");
             }
@@ -202,12 +285,15 @@ public class OrchestratorUtils {
         }
     }
 
-    public static DataMovementInterface 
getPrefferredDataMovementInterface(OrchestratorContext context, ProcessModel 
processModel, String gatewayId) throws RegistryException {
+    public static DataMovementInterface 
getPrefferredDataMovementInterface(OrchestratorContext context,
+                                                                           
ProcessModel processModel,
+                                                                           
String gatewayId) throws RegistryException {
         try {
             String resourceHostId = processModel.getComputeResourceId();
             ComputeResourcePreference resourcePreference = 
getComputeResourcePreference(context, processModel, gatewayId);
             DataMovementProtocol preferredDataMovementProtocol = 
resourcePreference.getPreferredDataMovementProtocol();
-            ComputeResourceDescription resourceDescription = 
context.getRegistry().getAppCatalog().getComputeResource().getComputeResource(resourceHostId);
+            ComputeResourceDescription resourceDescription =
+                    
context.getRegistry().getAppCatalog().getComputeResource().getComputeResource(resourceHostId);
             List<DataMovementInterface> dataMovementInterfaces = 
resourceDescription.getDataMovementInterfaces();
             if (dataMovementInterfaces != null && 
!dataMovementInterfaces.isEmpty()) {
                 for (DataMovementInterface dataMovementInterface : 
dataMovementInterfaces){
@@ -226,7 +312,9 @@ public class OrchestratorUtils {
         return null;
     }
 
-    public static int getDataMovementPort(OrchestratorContext context, 
ProcessModel processModel, String gatewayId) throws RegistryException{
+    public static int getDataMovementPort(OrchestratorContext context,
+                                          ProcessModel processModel,
+                                          String gatewayId) throws 
RegistryException{
         try {
             DataMovementProtocol protocol = 
getPreferredDataMovementProtocol(context, processModel, gatewayId);
             DataMovementInterface dataMovementInterface = 
getPrefferredDataMovementInterface(context, processModel, gatewayId);
@@ -243,7 +331,9 @@ public class OrchestratorUtils {
     }
 
 
-    public static SecurityProtocol getSecurityProtocol(OrchestratorContext 
context, ProcessModel processModel, String gatewayId) throws RegistryException{
+    public static SecurityProtocol getSecurityProtocol(OrchestratorContext 
context,
+                                                       ProcessModel 
processModel,
+                                                       String gatewayId) 
throws RegistryException{
         try {
             JobSubmissionProtocol submissionProtocol = 
getPreferredJobSubmissionProtocol(context, processModel, gatewayId);
             JobSubmissionInterface jobSubmissionInterface = 
getPreferredJobSubmissionInterface(context, processModel, gatewayId);
@@ -274,7 +364,8 @@ public class OrchestratorUtils {
         return null;
     }
 
-    public static LOCALSubmission getLocalJobSubmission(OrchestratorContext 
context, String submissionId) throws RegistryException {
+    public static LOCALSubmission getLocalJobSubmission(OrchestratorContext 
context,
+                                                        String submissionId) 
throws RegistryException {
         try {
             AppCatalog appCatalog = context.getRegistry().getAppCatalog();
             return 
appCatalog.getComputeResource().getLocalJobSubmission(submissionId);
@@ -285,7 +376,8 @@ public class OrchestratorUtils {
         }
     }
 
-    public static UnicoreJobSubmission 
getUnicoreJobSubmission(OrchestratorContext context, String submissionId) 
throws RegistryException {
+    public static UnicoreJobSubmission 
getUnicoreJobSubmission(OrchestratorContext context,
+                                                               String 
submissionId) throws RegistryException {
         try {
             AppCatalog appCatalog = context.getRegistry().getAppCatalog();
             return 
appCatalog.getComputeResource().getUNICOREJobSubmission(submissionId);
@@ -296,7 +388,8 @@ public class OrchestratorUtils {
         }
     }
 
-    public static SSHJobSubmission getSSHJobSubmission(OrchestratorContext 
context, String submissionId) throws RegistryException {
+    public static SSHJobSubmission getSSHJobSubmission(OrchestratorContext 
context,
+                                                       String submissionId) 
throws RegistryException {
         try {
             AppCatalog appCatalog = context.getRegistry().getAppCatalog();
             return 
appCatalog.getComputeResource().getSSHJobSubmission(submissionId);
@@ -307,7 +400,8 @@ public class OrchestratorUtils {
         }
     }
 
-    public static CloudJobSubmission getCloudJobSubmission(OrchestratorContext 
context, String submissionId) throws RegistryException {
+    public static CloudJobSubmission getCloudJobSubmission(OrchestratorContext 
context,
+                                                           String 
submissionId) throws RegistryException {
         try {
             AppCatalog appCatalog = context.getRegistry().getAppCatalog();
             return 
appCatalog.getComputeResource().getCloudJobSubmission(submissionId);
@@ -318,7 +412,8 @@ public class OrchestratorUtils {
         }
     }
 
-    public static SCPDataMovement getSCPDataMovement(OrchestratorContext 
context, String dataMoveId) throws RegistryException {
+    public static SCPDataMovement getSCPDataMovement(OrchestratorContext 
context,
+                                                     String dataMoveId) throws 
RegistryException {
         try {
             AppCatalog appCatalog = context.getRegistry().getAppCatalog();
             return 
appCatalog.getComputeResource().getSCPDataMovement(dataMoveId);
@@ -328,4 +423,8 @@ public class OrchestratorUtils {
             throw new RegistryException(errorMsg, e);
         }
     }
+
+    private static boolean isValid(String str) {
+        return (str != null && str.trim().isEmpty());
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/6e5d1c6e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index b97e79a..19a3521 100644
--- 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -20,6 +20,7 @@
 */
 package org.apache.airavata.orchestrator.cpi.impl;
 
+import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.gfac.core.task.TaskException;
@@ -93,15 +94,18 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator{
         }
     }
 
-    public ValidationResults validateExperiment(ExperimentModel experiment) 
throws OrchestratorException,LaunchValidationException {
-        org.apache.airavata.model.error.ValidationResults validationResults = 
new org.apache.airavata.model.error.ValidationResults();
+    public ValidationResults validateExperiment(ExperimentModel experiment)
+            throws OrchestratorException,LaunchValidationException {
+        org.apache.airavata.model.error.ValidationResults validationResults =
+                new org.apache.airavata.model.error.ValidationResults();
         validationResults.setValidationState(true); // initially making it to 
success, if atleast one failed them simply mark it failed.
         String errorMsg = "Validation Errors : ";
         if (this.orchestratorConfiguration.isEnableValidation()) {
             List<String> validatorClasses = 
this.orchestratorContext.getOrchestratorConfiguration().getValidatorClasses();
             for (String validator : validatorClasses) {
                 try {
-                    Class<? extends JobMetadataValidator> vClass = 
Class.forName(validator.trim()).asSubclass(JobMetadataValidator.class);
+                    Class<? extends JobMetadataValidator> vClass =
+                            
Class.forName(validator.trim()).asSubclass(JobMetadataValidator.class);
                     JobMetadataValidator jobMetadataValidator = 
vClass.newInstance();
                     validationResults = 
jobMetadataValidator.validate(experiment, null);
                     if (validationResults.isValidationState()) {
@@ -116,14 +120,15 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator{
                                 }
                             }
                         }
-                        logger.error("Validation of " + validator + " for 
experiment Id " + experiment.getExperimentId() + " is FAILED:[error]. " + 
errorMsg);
+                        logger.error("Validation of " + validator + " for 
experiment Id " +
+                                experiment.getExperimentId() + " is 
FAILED:[error]. " + errorMsg);
                         validationResults.setValidationState(false);
                         try {
                             ErrorModel details = new ErrorModel();
                             details.setActualErrorMessage(errorMsg);
                             
details.setCreationTime(Calendar.getInstance().getTimeInMillis());
-                            
orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.EXPERIMENT_ERROR,
 details,
-                                    experiment.getExperimentId());
+                            
orchestratorContext.getRegistry().getExperimentCatalog()
+                                    .add(ExpCatChildDataType.EXPERIMENT_ERROR, 
details, experiment.getExperimentId());
                         } catch (RegistryException e) {
                             logger.error("Error while saving error details to 
registry", e);
                         }
@@ -147,12 +152,15 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator{
             //atleast one validation has failed, so we throw an exception
             LaunchValidationException launchValidationException = new 
LaunchValidationException();
             launchValidationException.setValidationResult(validationResults);
-            launchValidationException.setErrorMessage("Validation failed refer 
the validationResults list for detail error. Validation errors : " + errorMsg);
+            launchValidationException.setErrorMessage("Validation failed refer 
the validationResults list for " +
+                    "detail error. Validation errors : " + errorMsg);
             throw launchValidationException;
         }
     }
 
-    public ValidationResults validateProcess(ExperimentModel experiment, 
ProcessModel processModel) throws 
OrchestratorException,LaunchValidationException {
+    public ValidationResults validateProcess(ExperimentModel experiment, 
ProcessModel processModel)
+            throws OrchestratorException, LaunchValidationException {
+
         org.apache.airavata.model.error.ValidationResults validationResults = 
new org.apache.airavata.model.error.ValidationResults();
         validationResults.setValidationState(true); // initially making it to 
success, if atleast one failed them simply mark it failed.
         String errorMsg = "Validation Errors : ";
@@ -167,46 +175,42 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator{
                         logger.info("Validation of " + validator + " is 
SUCCESSFUL");
                     } else {
                         List<ValidatorResult> validationResultList = 
validationResults.getValidationResultList();
-                        for (ValidatorResult result : validationResultList){
-                            if (!result.isResult()){
+                        for (ValidatorResult result : validationResultList) {
+                            if (!result.isResult()) {
                                 String validationError = 
result.getErrorDetails();
-                                if (validationError != null){
+                                if (validationError != null) {
                                     errorMsg += validationError + " ";
                                 }
                             }
                         }
-                        logger.error("Validation of " + validator + " for 
experiment Id " + experiment.getExperimentId() + " is FAILED:[error]. " + 
errorMsg);
+                        logger.error("Validation of " + validator + " for 
experiment Id " +
+                                experiment.getExperimentId() + " is 
FAILED:[error]. " + errorMsg);
                         validationResults.setValidationState(false);
                         try {
                             ErrorModel details = new ErrorModel();
                             details.setActualErrorMessage(errorMsg);
                             
details.setCreationTime(Calendar.getInstance().getTimeInMillis());
-                            
orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.PROCESS_ERROR,
 details,
-                                    processModel.getProcessId());
+                            
orchestratorContext.getRegistry().getExperimentCatalog()
+                                    .add(ExpCatChildDataType.PROCESS_ERROR, 
details, processModel.getProcessId());
                         } catch (RegistryException e) {
                             logger.error("Error while saving error details to 
registry", e);
                         }
                         break;
                     }
-                } catch (ClassNotFoundException e) {
-                    logger.error("Error loading the validation class: ", 
validator, e);
-                    validationResults.setValidationState(false);
-                } catch (InstantiationException e) {
-                    logger.error("Error loading the validation class: ", 
validator, e);
-                    validationResults.setValidationState(false);
-                } catch (IllegalAccessException e) {
+                } catch (ClassNotFoundException | InstantiationException | 
IllegalAccessException e) {
                     logger.error("Error loading the validation class: ", 
validator, e);
                     validationResults.setValidationState(false);
                 }
             }
         }
-        if(validationResults.isValidationState()){
+        if (validationResults.isValidationState()) {
             return validationResults;
-        }else {
+        } else {
             //atleast one validation has failed, so we throw an exception
             LaunchValidationException launchValidationException = new 
LaunchValidationException();
             launchValidationException.setValidationResult(validationResults);
-            launchValidationException.setErrorMessage("Validation failed refer 
the validationResults list for detail error. Validation errors : " + errorMsg);
+            launchValidationException.setErrorMessage("Validation failed refer 
the validationResults " +
+                    "list for detail error. Validation errors : " + errorMsg);
             throw launchValidationException;
         }
     }
@@ -253,7 +257,8 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator{
         try {
             Registry registry = orchestratorContext.getRegistry();
             ExperimentModel experimentModel = 
(ExperimentModel)registry.getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT,
 experimentId);
-            List<Object> processList = 
registry.getExperimentCatalog().get(ExperimentCatalogModelType.PROCESS, 
Constants.FieldConstants.ExperimentConstants.EXPERIMENT_ID, experimentId);
+            List<Object> processList = registry.getExperimentCatalog()
+                    .get(ExperimentCatalogModelType.PROCESS, 
Constants.FieldConstants.ExperimentConstants.EXPERIMENT_ID, experimentId);
             if (processList != null && !processList.isEmpty()) {
                 for (Object processObject : processList) {
                     ProcessModel processModel = (ProcessModel)processObject;
@@ -283,8 +288,10 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator{
                 throw new OrchestratorException("Compute Resource Id cannot be 
null at this point");
             }
             ComputeResourceDescription computeResource = 
appCatalog.getComputeResource().getComputeResource(resourceHostId);
-            JobSubmissionInterface preferredJobSubmissionInterface = 
OrchestratorUtils.getPreferredJobSubmissionInterface(orchestratorContext, 
processModel, gatewayId);
-            ComputeResourcePreference resourcePreference = 
OrchestratorUtils.getComputeResourcePreference(orchestratorContext, 
processModel, gatewayId);
+            JobSubmissionInterface preferredJobSubmissionInterface =
+                    
OrchestratorUtils.getPreferredJobSubmissionInterface(orchestratorContext, 
processModel, gatewayId);
+            ComputeResourcePreference resourcePreference =
+                    
OrchestratorUtils.getComputeResourcePreference(orchestratorContext, 
processModel, gatewayId);
             List<String> taskIdList = new ArrayList<>();
 
             if (resourcePreference.getPreferredJobSubmissionProtocol() == 
JobSubmissionProtocol.UNICORE) {
@@ -303,19 +310,22 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator{
                                 // need to create more job submissions
                                 int numOfMaxWallTimeJobs = ((int) 
Math.floor(userGivenWallTime / maxRunTime));
                                 for (int i = 1; i <= numOfMaxWallTimeJobs; 
i++) {
-                                    
taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface,
 processModel, maxRunTime));
+                                    taskIdList.addAll(
+                                            
createAndSaveSubmissionTasks(gatewayId, preferredJobSubmissionInterface, 
processModel, maxRunTime));
                                 }
                                 int leftWallTime = userGivenWallTime % 
maxRunTime;
                                 if (leftWallTime != 0) {
-                                    
taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface,
 processModel, leftWallTime));
+                                    taskIdList.addAll(
+                                            
createAndSaveSubmissionTasks(gatewayId, preferredJobSubmissionInterface, 
processModel, leftWallTime));
                                 }
                             } else {
-                                
taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface,
 processModel, userGivenWallTime));
+                                taskIdList.addAll(
+                                        
createAndSaveSubmissionTasks(gatewayId, preferredJobSubmissionInterface, 
processModel, userGivenWallTime));
                             }
                         }
                     }
                 } else {
-                    
taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface,
 processModel, userGivenWallTime));
+                    taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId, 
preferredJobSubmissionInterface, processModel, userGivenWallTime));
                 }
                 
taskIdList.addAll(createAndSaveOutputDataStagingTasks(processModel, gatewayId));
             }
@@ -342,7 +352,7 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator{
     private List<String> createAndSaveEnvSetupTask(String gatewayId,
                                                    ProcessModel processModel,
                                                    ExperimentCatalog 
experimentCatalog)
-            throws RegistryException, TException {
+            throws RegistryException, TException, AiravataException {
         List<String> envTaskIds = new ArrayList<>();
         TaskModel envSetupTask = new TaskModel();
         envSetupTask.setTaskType(TaskTypes.ENV_SETUP);
@@ -363,7 +373,9 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator{
         return envTaskIds;
     }
 
-    public List<String> createAndSaveInputDataStagingTasks(ProcessModel 
processModel, String gatewayId) throws RegistryException {
+    public List<String> createAndSaveInputDataStagingTasks(ProcessModel 
processModel, String gatewayId)
+            throws RegistryException, AiravataException {
+
         List<String> dataStagingTaskIds = new ArrayList<>();
         List<InputDataObjectType> processInputs = 
processModel.getProcessInputs();
 
@@ -380,8 +392,8 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator{
                     case URI_COLLECTION:
                         try {
                             TaskModel inputDataStagingTask = 
getInputDataStagingTask(processModel, processInput, gatewayId);
-                            String taskId = (String) 
orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK,
 inputDataStagingTask,
-                                    processModel.getProcessId());
+                            String taskId = (String) 
orchestratorContext.getRegistry().getExperimentCatalog()
+                                    .add(ExpCatChildDataType.TASK, 
inputDataStagingTask, processModel.getProcessId());
                             inputDataStagingTask.setTaskId(taskId);
                             
dataStagingTaskIds.add(inputDataStagingTask.getTaskId());
                         } catch (TException | AppCatalogException | 
TaskException e) {
@@ -397,7 +409,9 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator{
         return dataStagingTaskIds;
     }
 
-    public List<String> createAndSaveOutputDataStagingTasks(ProcessModel 
processModel, String gatewayId) throws RegistryException {
+    public List<String> createAndSaveOutputDataStagingTasks(ProcessModel 
processModel, String gatewayId)
+            throws RegistryException, AiravataException {
+
         List<String> dataStagingTaskIds = new ArrayList<>();
         List<OutputDataObjectType> processOutputs = 
processModel.getProcessOutputs();
         String appName = 
OrchestratorUtils.getApplicationInterfaceName(orchestratorContext, 
processModel);
@@ -405,14 +419,14 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator{
             for (OutputDataObjectType processOutput : processOutputs) {
                 DataType type = processOutput.getType();
                 switch (type) {
-                    case STDOUT :
-                        if(null == processOutput.getValue() || 
processOutput.getValue().trim().isEmpty()){
+                    case STDOUT:
+                        if (null == processOutput.getValue() || 
processOutput.getValue().trim().isEmpty()) {
                             processOutput.setValue(appName + ".stdout");
                         }
                         createOutputDataSatagingTasks(processModel, gatewayId, 
dataStagingTaskIds, processOutput);
                         break;
                     case STDERR:
-                        if(null == processOutput.getValue() || 
processOutput.getValue().trim().isEmpty()){
+                        if (null == processOutput.getValue() || 
processOutput.getValue().trim().isEmpty()) {
                             processOutput.setValue(appName + ".stderr");
                         }
                         createOutputDataSatagingTasks(processModel, gatewayId, 
dataStagingTaskIds, processOutput);
@@ -439,29 +453,35 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator{
 
     private boolean isArchive(ProcessModel processModel, String gatewayId) 
throws AppCatalogException {
         AppCatalog appCatalog = RegistryFactory.getAppCatalog();
-        ApplicationInterfaceDescription appInterface = 
appCatalog.getApplicationInterface().getApplicationInterface(processModel.getApplicationInterfaceId());
+        ApplicationInterfaceDescription appInterface = 
appCatalog.getApplicationInterface()
+                
.getApplicationInterface(processModel.getApplicationInterfaceId());
         return appInterface.isArchiveWorkingDirectory();
     }
 
-    private void createArchiveDataStatgingTask(ProcessModel processModel, 
String gatewayId, List<String> dataStagingTaskIds) throws RegistryException {
+    private void createArchiveDataStatgingTask(ProcessModel processModel,
+                                               String gatewayId,
+                                               List<String> 
dataStagingTaskIds) throws RegistryException, AiravataException {
         TaskModel archiveTask = null;
         try {
             archiveTask = getOutputDataStagingTask(processModel, null, 
gatewayId);
         } catch (TException e) {
             throw new RegistryException("Error! DataStaging sub task 
serialization failed");
         }
-        String taskId = (String) 
orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK,
 archiveTask,
-                processModel.getProcessId());
+        String taskId = (String) 
orchestratorContext.getRegistry().getExperimentCatalog()
+                .add(ExpCatChildDataType.TASK, archiveTask, 
processModel.getProcessId());
         archiveTask.setTaskId(taskId);
         dataStagingTaskIds.add(archiveTask.getTaskId());
 
     }
 
-    private void createOutputDataSatagingTasks(ProcessModel processModel, 
String gatewayId, List<String> dataStagingTaskIds, OutputDataObjectType 
processOutput) throws RegistryException {
+    private void createOutputDataSatagingTasks(ProcessModel processModel,
+                                               String gatewayId,
+                                               List<String> dataStagingTaskIds,
+                                               OutputDataObjectType 
processOutput) throws RegistryException, AiravataException {
         try {
             TaskModel outputDataStagingTask = 
getOutputDataStagingTask(processModel, processOutput, gatewayId);
-            String taskId = (String) 
orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK,
 outputDataStagingTask,
-                    processModel.getProcessId());
+            String taskId = (String) 
orchestratorContext.getRegistry().getExperimentCatalog()
+                    .add(ExpCatChildDataType.TASK, outputDataStagingTask, 
processModel.getProcessId());
             outputDataStagingTask.setTaskId(taskId);
             dataStagingTaskIds.add(outputDataStagingTask.getTaskId());
         } catch (TException e) {
@@ -469,7 +489,10 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator{
         }
     }
 
-    private List<String> createAndSaveSubmissionTasks(String gatewayId, 
JobSubmissionInterface jobSubmissionInterface, ProcessModel processModel, int 
wallTime)
+    private List<String> createAndSaveSubmissionTasks(String gatewayId,
+                                                      JobSubmissionInterface 
jobSubmissionInterface,
+                                                      ProcessModel 
processModel,
+                                                      int wallTime)
             throws TException, RegistryException, OrchestratorException {
 
         JobSubmissionProtocol jobSubmissionProtocol = 
jobSubmissionInterface.getJobSubmissionProtocol();
@@ -539,7 +562,7 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator{
         });
     }
 
-    private TaskModel getInputDataStagingTask(ProcessModel processModel, 
InputDataObjectType processInput, String gatewayId) throws RegistryException, 
TException, AppCatalogException, TaskException {
+    private TaskModel getInputDataStagingTask(ProcessModel processModel, 
InputDataObjectType processInput, String gatewayId) throws RegistryException, 
TException, AppCatalogException, TaskException, AiravataException {
         // create new task model for this task
         TaskModel taskModel = new TaskModel();
         taskModel.setParentProcessId(processModel.getProcessId());
@@ -551,19 +574,20 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator{
         taskModel.setTaskType(TaskTypes.DATA_STAGING);
         // create data staging sub task model
         DataStagingTaskModel submodel = new DataStagingTaskModel();
-        ComputeResourcePreference computeResourcePreference = 
OrchestratorUtils.getComputeResourcePreference(orchestratorContext, 
processModel, gatewayId);
-        ComputeResourceDescription computeResource = 
orchestratorContext.getRegistry().getAppCatalog().getComputeResource().getComputeResource(processModel.getComputeResourceId());
-        String remoteOutputDir = 
OrchestratorUtils.getScratchLocation(orchestratorContext,processModel, 
gatewayId) + File.separator + processModel.getProcessId();
-        remoteOutputDir = remoteOutputDir.endsWith("/") ? remoteOutputDir : 
remoteOutputDir + "/";
+        ComputeResourceDescription computeResource = 
orchestratorContext.getRegistry().getAppCatalog()
+                
.getComputeResource().getComputeResource(processModel.getComputeResourceId());
+        String workingDir = 
OrchestratorUtils.getScratchLocation(orchestratorContext,processModel, 
gatewayId) +
+                File.separator + processModel.getProcessId() + File.separator;
         URI destination = null;
         try {
-            DataMovementProtocol dataMovementProtocol = 
OrchestratorUtils.getPreferredDataMovementProtocol(orchestratorContext, 
processModel, gatewayId);
+            DataMovementProtocol dataMovementProtocol =
+                    
OrchestratorUtils.getPreferredDataMovementProtocol(orchestratorContext, 
processModel, gatewayId);
             String loginUserName = 
OrchestratorUtils.getLoginUserName(orchestratorContext, processModel, 
gatewayId);
             destination = new URI(dataMovementProtocol.name(),
                     loginUserName,
                     computeResource.getHostName(),
                     OrchestratorUtils.getDataMovementPort(orchestratorContext, 
processModel, gatewayId),
-                    remoteOutputDir , null, null);
+                    workingDir , null, null);
         } catch (URISyntaxException e) {
             throw new TaskException("Error while constructing destination file 
URI");
         }
@@ -575,7 +599,7 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator{
         return taskModel;
     }
 
-    private TaskModel getOutputDataStagingTask(ProcessModel processModel, 
OutputDataObjectType processOutput, String gatewayId) throws RegistryException, 
TException {
+    private TaskModel getOutputDataStagingTask(ProcessModel processModel, 
OutputDataObjectType processOutput, String gatewayId) throws RegistryException, 
TException, AiravataException {
         try {
 
             // create new task model for this task
@@ -587,11 +611,11 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator{
             
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
             taskModel.setTaskStatuses(Arrays.asList(taskStatus));
             taskModel.setTaskType(TaskTypes.DATA_STAGING);
-            ComputeResourcePreference computeResourcePreference = 
OrchestratorUtils.getComputeResourcePreference(orchestratorContext, 
processModel, gatewayId);
-            ComputeResourceDescription computeResource = 
orchestratorContext.getRegistry().getAppCatalog().getComputeResource().getComputeResource(processModel.getComputeResourceId());
+            ComputeResourceDescription computeResource = 
orchestratorContext.getRegistry().getAppCatalog()
+                    
.getComputeResource().getComputeResource(processModel.getComputeResourceId());
 
-            String remoteOutputDir = 
OrchestratorUtils.getScratchLocation(orchestratorContext,processModel, 
gatewayId) + File.separator + processModel.getProcessId();
-            remoteOutputDir = remoteOutputDir.endsWith("/") ? remoteOutputDir 
: remoteOutputDir + "/";
+            String workingDir = 
OrchestratorUtils.getScratchLocation(orchestratorContext,processModel, 
gatewayId)
+                    + File.separator + processModel.getProcessId() + 
File.separator;
             DataStagingTaskModel submodel = new DataStagingTaskModel();
             DataMovementProtocol dataMovementProtocol = 
OrchestratorUtils.getPreferredDataMovementProtocol(orchestratorContext, 
processModel, gatewayId);
             URI source = null;
@@ -604,7 +628,7 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator{
                             loginUserName,
                             computeResource.getHostName(),
                             
OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, 
gatewayId),
-                            remoteOutputDir + processOutput.getValue(), null, 
null);
+                            workingDir + processOutput.getValue(), null, null);
                 } else {
                     // archive
                     submodel.setType(DataStageType.ARCHIVE_OUTPUT);
@@ -612,7 +636,7 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator{
                             loginUserName,
                             computeResource.getHostName(),
                             
OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, 
gatewayId),
-                            remoteOutputDir, null, null);
+                            workingDir, null, null);
                 }
             } catch (URISyntaxException e) {
                 throw new TaskException("Error while constructing source file 
URI");

Reply via email to