This is an automated email from the ASF dual-hosted git repository.

lahirujayathilake pushed a commit to branch airavata-aws
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/airavata-aws by this push:
     new 76004dc829 updated the simple orchestrator to support aws tasks 
creation, included the libs in distributions, and updated the participant with 
tasks
76004dc829 is described below

commit 76004dc8296e38a8bb70e6e9af3cc4b8c9bf9295
Author: lahiruj <[email protected]>
AuthorDate: Fri Jun 27 03:43:00 2025 -0400

    updated the simple orchestrator to support aws tasks creation, included the 
libs in distributions, and updated the participant with tasks
---
 modules/airavata-helix/helix-spectator/pom.xml     |  10 +
 .../helix/impl/participant/GlobalParticipant.java  |   4 +-
 .../airavata/helix/impl/task/AWSTaskFactory.java   |  15 +-
 .../helix/impl/task/aws/CreateEC2InstanceTask.java |   2 +
 .../helix/impl/task/aws/NoOperationTask.java       |  21 +
 .../src/main/assembly/participant-bin-assembly.xml |  13 +
 .../src/main/assembly/post-wm-bin-assembly.xml     |   9 +
 .../src/main/assembly/pre-wm-bin-assembly.xml      |   8 +
 .../cpi/impl/SimpleOrchestratorImpl.java           | 508 ++++++++++-----------
 .../AWSGroupComputeResourcePrefEntity.java         |   2 +
 .../group_resource_profile_model.thrift            |   9 +-
 11 files changed, 318 insertions(+), 283 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/pom.xml 
b/modules/airavata-helix/helix-spectator/pom.xml
index d6e4b060aa..e00e61cfb1 100644
--- a/modules/airavata-helix/helix-spectator/pom.xml
+++ b/modules/airavata-helix/helix-spectator/pom.xml
@@ -90,6 +90,16 @@ under the License.
             <artifactId>ec2</artifactId>
             <version>2.31.70</version>
         </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>auth</artifactId>
+            <version>2.31.70</version>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>retries</artifactId>
+            <version>2.31.70</version>
+        </dependency>
         <dependency>
             <groupId>com.github.docker-java</groupId>
             <artifactId>docker-java</artifactId>
diff --git 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
index eb1cdedc87..7e4df17034 100644
--- 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
+++ 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
@@ -48,7 +48,9 @@ public class GlobalParticipant extends 
HelixParticipant<AbstractTask> {
         "org.apache.airavata.helix.impl.task.cancel.CancelCompletingTask",
         "org.apache.airavata.helix.impl.task.parsing.DataParsingTask",
         "org.apache.airavata.helix.impl.task.parsing.ParsingTriggeringTask",
-        "org.apache.airavata.helix.impl.task.mock.MockTask"
+        "org.apache.airavata.helix.impl.task.mock.MockTask",
+        "org.apache.airavata.helix.impl.task.aws.CreateEC2InstanceTask",
+        "org.apache.airavata.helix.impl.task.aws.NoOperationTask"
     };
 
     @SuppressWarnings("WeakerAccess")
diff --git 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AWSTaskFactory.java
 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AWSTaskFactory.java
index a121d50e5a..75a466b03b 100644
--- 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AWSTaskFactory.java
+++ 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AWSTaskFactory.java
@@ -19,6 +19,7 @@
 package org.apache.airavata.helix.impl.task;
 
 import org.apache.airavata.helix.impl.task.aws.CreateEC2InstanceTask;
+import org.apache.airavata.helix.impl.task.aws.NoOperationTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,36 +35,36 @@ public class AWSTaskFactory implements HelixTaskFactory {
 
     @Override
     public AiravataTask createInputDataStagingTask(String processId) {
-        return null;
+        return new NoOperationTask();
     }
 
     @Override
     public AiravataTask createJobSubmissionTask(String processId) {
-        return null;
+        return new NoOperationTask();
     }
 
     @Override
     public AiravataTask createOutputDataStagingTask(String processId) {
-        return null;
+        return new NoOperationTask();
     }
 
     @Override
     public AiravataTask createArchiveTask(String processId) {
-        return null;
+        return new NoOperationTask();
     }
 
     @Override
     public AiravataTask createJobVerificationTask(String processId) {
-        return null;
+        return new NoOperationTask();
     }
 
     @Override
     public AiravataTask createCompletingTask(String processId) {
-        return null;
+        return new NoOperationTask();
     }
 
     @Override
     public AiravataTask createParsingTriggeringTask(String processId) {
-        return null;
+        return new NoOperationTask();
     }
 }
diff --git 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/aws/CreateEC2InstanceTask.java
 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/aws/CreateEC2InstanceTask.java
index 303e7051e5..fd9fe63f16 100644
--- 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/aws/CreateEC2InstanceTask.java
+++ 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/aws/CreateEC2InstanceTask.java
@@ -22,6 +22,7 @@ import org.apache.airavata.agents.api.AgentUtils;
 import org.apache.airavata.helix.impl.task.AiravataTask;
 import org.apache.airavata.helix.impl.task.TaskContext;
 import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
 import 
org.apache.airavata.model.appcatalog.groupresourceprofile.AwsComputeResourcePreference;
 import org.apache.airavata.model.credential.store.PasswordCredential;
 import org.apache.airavata.model.credential.store.SSHCredential;
@@ -43,6 +44,7 @@ import java.util.UUID;
 /**
  * Create all required AWS resources (SecurityGroup, KeyPair) and launches an 
EC2 instance
  */
+@TaskDef(name = "Create EC2 Instance Task")
 public class CreateEC2InstanceTask extends AiravataTask {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(CreateEC2InstanceTask.class);
diff --git 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/aws/NoOperationTask.java
 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/aws/NoOperationTask.java
new file mode 100644
index 0000000000..2c64bcd256
--- /dev/null
+++ 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/aws/NoOperationTask.java
@@ -0,0 +1,21 @@
+package org.apache.airavata.helix.impl.task.aws;
+
+import org.apache.airavata.helix.impl.task.AiravataTask;
+import org.apache.airavata.helix.impl.task.TaskContext;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.helix.task.TaskResult;
+
+@TaskDef(name = "No Operation Task")
+public class NoOperationTask extends AiravataTask  {
+
+    @Override
+    public TaskResult onRun(TaskHelper helper, TaskContext taskContext) {
+        return new TaskResult(TaskResult.Status.COMPLETED, "OK");
+    }
+
+    @Override
+    public void onCancel(TaskContext taskContext) {
+
+    }
+}
diff --git 
a/modules/distribution/src/main/assembly/participant-bin-assembly.xml 
b/modules/distribution/src/main/assembly/participant-bin-assembly.xml
index e350af809d..35f624dc0f 100644
--- a/modules/distribution/src/main/assembly/participant-bin-assembly.xml
+++ b/modules/distribution/src/main/assembly/participant-bin-assembly.xml
@@ -220,6 +220,19 @@
                 <include>io.prometheus:simpleclient_httpserver:jar</include>
                 <include>io.prometheus:simpleclient_common:jar</include>
                 <include>org.apache.commons:commons-lang3</include>
+
+                <include>software.amazon.awssdk:ec2:jar</include>
+                <include>software.amazon.awssdk:auth:jar</include>
+                <include>software.amazon.awssdk:identity-spi:jar</include>
+                <include>software.amazon.awssdk:sdk-core:jar</include>
+                <include>software.amazon.awssdk:utils:jar</include>
+                <include>software.amazon.awssdk:regions:jar</include>
+                <include>software.amazon.awssdk:http-client-spi:jar</include>
+                
<include>software.amazon.awssdk:url-connection-client:jar</include>
+                <include>software.amazon.awssdk:aws-core:jar</include>
+                <include>software.amazon.awssdk:retries:jar</include>
+                <include>software.amazon.awssdk:retries-spi:jar</include>
+                <include>software.amazon.awssdk:profiles:jar</include>
             </includes>
             <excludes>
                 <exclude>mysql:mysql-connector-java:jar</exclude>
diff --git a/modules/distribution/src/main/assembly/post-wm-bin-assembly.xml 
b/modules/distribution/src/main/assembly/post-wm-bin-assembly.xml
index 9e3343732e..854ef8c707 100644
--- a/modules/distribution/src/main/assembly/post-wm-bin-assembly.xml
+++ b/modules/distribution/src/main/assembly/post-wm-bin-assembly.xml
@@ -175,6 +175,15 @@
                 <include>io.prometheus:simpleclient_httpserver:jar</include>
                 <include>io.prometheus:simpleclient_common:jar</include>
                 <include>org.apache.commons:commons-lang3</include>
+
+                <include>software.amazon.awssdk:ec2:jar</include>
+                <include>software.amazon.awssdk:auth:jar</include>
+                <include>software.amazon.awssdk:identity-spi:jar</include>
+                <include>software.amazon.awssdk:sdk-core:jar</include>
+                <include>software.amazon.awssdk:utils:jar</include>
+                <include>software.amazon.awssdk:regions:jar</include>
+                <include>software.amazon.awssdk:http-client-spi:jar</include>
+                
<include>software.amazon.awssdk:url-connection-client:jar</include>
             </includes>
             <excludes>
                 <exclude>mysql:mysql-connector-java:jar</exclude>
diff --git a/modules/distribution/src/main/assembly/pre-wm-bin-assembly.xml 
b/modules/distribution/src/main/assembly/pre-wm-bin-assembly.xml
index a09dba9658..6de6e74999 100644
--- a/modules/distribution/src/main/assembly/pre-wm-bin-assembly.xml
+++ b/modules/distribution/src/main/assembly/pre-wm-bin-assembly.xml
@@ -174,6 +174,14 @@
                 <include>io.prometheus:simpleclient_httpserver:jar</include>
                 <include>io.prometheus:simpleclient_common:jar</include>
                 <include>org.apache.commons:commons-lang3</include>
+                <include>software.amazon.awssdk:ec2:jar</include>
+                <include>software.amazon.awssdk:auth:jar</include>
+                <include>software.amazon.awssdk:identity-spi:jar</include>
+                <include>software.amazon.awssdk:sdk-core:jar</include>
+                <include>software.amazon.awssdk:utils:jar</include>
+                <include>software.amazon.awssdk:regions:jar</include>
+                <include>software.amazon.awssdk:http-client-spi:jar</include>
+                
<include>software.amazon.awssdk:url-connection-client:jar</include>
             </includes>
             <excludes>
                 <exclude>mysql:mysql-connector-java:jar</exclude>
diff --git 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index f8ac69815a..1aae29d748 100644
--- 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -1,36 +1,36 @@
 /**
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.airavata.orchestrator.cpi.impl;
 
-import java.io.File;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
 import 
org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
-import org.apache.airavata.model.appcatalog.computeresource.*;
+import 
org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import 
org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import 
org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
+import 
org.apache.airavata.model.appcatalog.groupresourceprofile.GroupComputeResourcePreference;
+import org.apache.airavata.model.appcatalog.groupresourceprofile.ResourceType;
 import org.apache.airavata.model.application.io.DataType;
 import org.apache.airavata.model.application.io.InputDataObjectType;
 import org.apache.airavata.model.application.io.OutputDataObjectType;
@@ -44,9 +44,14 @@ import org.apache.airavata.model.process.ProcessModel;
 import 
org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
 import org.apache.airavata.model.status.TaskState;
 import org.apache.airavata.model.status.TaskStatus;
-import org.apache.airavata.model.task.*;
+import org.apache.airavata.model.task.DataStageType;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.apache.airavata.model.task.EnvironmentSetupTaskModel;
+import org.apache.airavata.model.task.JobSubmissionTaskModel;
+import org.apache.airavata.model.task.MonitorTaskModel;
+import org.apache.airavata.model.task.TaskModel;
+import org.apache.airavata.model.task.TaskTypes;
 import org.apache.airavata.model.util.ExperimentModelUtil;
-import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
 import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
 import org.apache.airavata.orchestrator.core.impl.GFACPassiveJobSubmitter;
 import org.apache.airavata.orchestrator.core.job.JobSubmitter;
@@ -60,6 +65,18 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
 public class SimpleOrchestratorImpl extends AbstractOrchestrator {
     private static final Logger logger = 
LoggerFactory.getLogger(SimpleOrchestratorImpl.class);
     private ExecutorService executor;
@@ -95,8 +112,8 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator {
 
     public ValidationResults validateExperiment(ExperimentModel experiment)
             throws OrchestratorException, LaunchValidationException {
-        org.apache.airavata.model.error.ValidationResults validationResults =
-                new org.apache.airavata.model.error.ValidationResults();
+        ValidationResults validationResults =
+                new ValidationResults();
         validationResults.setValidationState(
                 true); // initially making it to success, if atleast one 
failed them simply mark it failed.
         String errorMsg = "Validation Errors : ";
@@ -172,8 +189,8 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator {
     public ValidationResults validateProcess(ExperimentModel experiment, 
ProcessModel processModel)
             throws OrchestratorException, LaunchValidationException {
 
-        org.apache.airavata.model.error.ValidationResults validationResults =
-                new org.apache.airavata.model.error.ValidationResults();
+        ValidationResults validationResults =
+                new ValidationResults();
         validationResults.setValidationState(
                 true); // initially making it to success, if atleast one 
failed them simply mark it failed.
         String errorMsg = "Validation Errors : ";
@@ -277,7 +294,8 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator {
         this.jobSubmitter = jobSubmitter;
     }
 
-    public void initialize() throws OrchestratorException {}
+    public void initialize() throws OrchestratorException {
+    }
 
     public List<ProcessModel> createProcesses(String experimentId, String 
gatewayId) throws OrchestratorException {
         final RegistryService.Client registryClient = 
getRegistryServiceClient();
@@ -304,61 +322,31 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator {
     public String createAndSaveTasks(String gatewayId, ProcessModel 
processModel) throws OrchestratorException {
         final RegistryService.Client registryClient = 
getRegistryServiceClient();
         try {
+            GroupComputeResourcePreference preference = 
OrchestratorUtils.getGroupComputeResourcePreference(processModel);
+            ResourceType resourceType = preference.getResourceType();
+            logger.info("Determined resource type as {} for process {}", 
resourceType, processModel.getProcessId());
+
             ComputationalResourceSchedulingModel resourceSchedule = 
processModel.getProcessResourceSchedule();
-            String userGivenQueueName = resourceSchedule.getQueueName();
             int userGivenWallTime = resourceSchedule.getWallTimeLimit();
             String resourceHostId = resourceSchedule.getResourceHostId();
             if (resourceHostId == null) {
                 throw new OrchestratorException("Compute Resource Id cannot be 
null at this point");
             }
-            ComputeResourceDescription computeResource = 
registryClient.getComputeResource(resourceHostId);
-            JobSubmissionInterface preferredJobSubmissionInterface =
-                    
OrchestratorUtils.getPreferredJobSubmissionInterface(processModel, gatewayId);
-            JobSubmissionProtocol preferredJobSubmissionProtocol =
-                    
OrchestratorUtils.getPreferredJobSubmissionProtocol(processModel, gatewayId);
+
+            // TODO - handle for different resource types
+            JobSubmissionInterface preferredJobSubmissionInterface = 
OrchestratorUtils.getPreferredJobSubmissionInterface(processModel, gatewayId);
+            JobSubmissionProtocol preferredJobSubmissionProtocol = 
OrchestratorUtils.getPreferredJobSubmissionProtocol(processModel, gatewayId);
             List<String> taskIdList = new ArrayList<>();
 
             if (preferredJobSubmissionProtocol == 
JobSubmissionProtocol.UNICORE) {
                 // TODO - breakdown unicore all in one task to multiple tasks, 
then we don't need to handle UNICORE
                 // here.
-                taskIdList.addAll(createAndSaveSubmissionTasks(
-                        registryClient, gatewayId, 
preferredJobSubmissionInterface, processModel, userGivenWallTime));
+                taskIdList.addAll(createAndSaveSubmissionTasks(registryClient, 
preferredJobSubmissionInterface, processModel, userGivenWallTime));
             } else {
-                taskIdList.addAll(createAndSaveEnvSetupTask(registryClient, 
gatewayId, processModel));
-                
taskIdList.addAll(createAndSaveInputDataStagingTasks(processModel, gatewayId));
-                //                if (autoSchedule) {
-                //                    List<BatchQueue> definedBatchQueues = 
computeResource.getBatchQueues();
-                //                    for (BatchQueue batchQueue : 
definedBatchQueues) {
-                //                        if 
(batchQueue.getQueueName().equals(userGivenQueueName)) {
-                //                            int maxRunTime = 
batchQueue.getMaxRunTime();
-                //                            if (maxRunTime < 
userGivenWallTime) {
-                //                                
resourceSchedule.setWallTimeLimit(maxRunTime);
-                //                                // need to create more job 
submissions
-                //                                int numOfMaxWallTimeJobs = 
((int) Math.floor(userGivenWallTime /
-                // maxRunTime));
-                //                                for (int i = 1; i <= 
numOfMaxWallTimeJobs; i++) {
-                //                                    taskIdList.addAll(
-                //                                            
createAndSaveSubmissionTasks(registryClient, gatewayId,
-                // preferredJobSubmissionInterface, processModel, maxRunTime));
-                //                                }
-                //                                int leftWallTime = 
userGivenWallTime % maxRunTime;
-                //                                if (leftWallTime != 0) {
-                //                                    taskIdList.addAll(
-                //                                            
createAndSaveSubmissionTasks(registryClient, gatewayId,
-                // preferredJobSubmissionInterface, processModel, 
leftWallTime));
-                //                                }
-                //                            } else {
-                //                                taskIdList.addAll(
-                //                                        
createAndSaveSubmissionTasks(registryClient, gatewayId,
-                // preferredJobSubmissionInterface, processModel, 
userGivenWallTime));
-                //                            }
-                //                        }
-                //                    }
-                //                } else {
-                taskIdList.addAll(createAndSaveSubmissionTasks(
-                        registryClient, gatewayId, 
preferredJobSubmissionInterface, processModel, userGivenWallTime));
-                //                }
-                
taskIdList.addAll(createAndSaveOutputDataStagingTasks(processModel, gatewayId));
+                taskIdList.addAll(createAndSaveEnvSetupTask(registryClient, 
gatewayId, processModel, resourceType));
+                
taskIdList.addAll(createAndSaveInputDataStagingTasks(processModel, gatewayId, 
resourceType));
+                taskIdList.addAll(createAndSaveSubmissionTasks(registryClient, 
preferredJobSubmissionInterface, processModel, userGivenWallTime));
+                
taskIdList.addAll(createAndSaveOutputDataStagingTasks(processModel, gatewayId, 
resourceType));
             }
             // update process scheduling
             registryClient.updateProcess(processModel, 
processModel.getProcessId());
@@ -372,13 +360,12 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator {
         }
     }
 
-    public String createAndSaveIntermediateOutputFetchingTasks(
-            String gatewayId, ProcessModel processModel, ProcessModel 
parentProcess) throws OrchestratorException {
+    public String createAndSaveIntermediateOutputFetchingTasks(String 
gatewayId, ProcessModel processModel, ProcessModel parentProcess) throws 
OrchestratorException {
         final RegistryService.Client registryClient = 
getRegistryServiceClient();
         try {
-            List<String> taskIdList = new ArrayList<>();
-
-            
taskIdList.addAll(createAndSaveIntermediateOutputDataStagingTasks(processModel, 
gatewayId, parentProcess));
+            GroupComputeResourcePreference preference = 
OrchestratorUtils.getGroupComputeResourcePreference(processModel);
+            ResourceType resourceType = preference.getResourceType();
+            List<String> taskIdList = new 
ArrayList<>(createAndSaveIntermediateOutputDataStagingTasks(processModel, 
gatewayId, parentProcess, resourceType));
             // update process scheduling
             registryClient.updateProcess(processModel, 
processModel.getProcessId());
             return getTaskDag(taskIdList);
@@ -403,33 +390,40 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator {
         return dag.substring(0, dag.length() - 1); // remove last comma
     }
 
-    private List<String> createAndSaveEnvSetupTask(
-            RegistryService.Client registryClient, String gatewayId, 
ProcessModel processModel)
+    private List<String> createAndSaveEnvSetupTask(RegistryService.Client 
registryClient, String gatewayId, ProcessModel processModel, ResourceType 
resourceType)
             throws TException, AiravataException, OrchestratorException {
         List<String> envTaskIds = new ArrayList<>();
+
         TaskModel envSetupTask = new TaskModel();
         envSetupTask.setTaskType(TaskTypes.ENV_SETUP);
-        envSetupTask.setTaskStatuses(Arrays.asList(new 
TaskStatus(TaskState.CREATED)));
+        envSetupTask.setTaskStatuses(List.of(new 
TaskStatus(TaskState.CREATED)));
         
envSetupTask.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
         
envSetupTask.setLastUpdateTime(AiravataUtils.getCurrentTimestamp().getTime());
         envSetupTask.setParentProcessId(processModel.getProcessId());
+
         EnvironmentSetupTaskModel envSetupSubModel = new 
EnvironmentSetupTaskModel();
-        
envSetupSubModel.setProtocol(OrchestratorUtils.getSecurityProtocol(processModel,
 gatewayId));
-        String scratchLocation = 
OrchestratorUtils.getScratchLocation(processModel, gatewayId);
-        String workingDir = scratchLocation + File.separator + 
processModel.getProcessId();
-        envSetupSubModel.setLocation(workingDir);
+        
envSetupSubModel.setProtocol(OrchestratorUtils.getSecurityProtocol(processModel,
 gatewayId)); // TODO support for CLOUD (AWS)
+
+        if (resourceType == ResourceType.SLURM) {
+            String scratchLocation = 
OrchestratorUtils.getScratchLocation(processModel, gatewayId);
+            String workingDir = scratchLocation + File.separator + 
processModel.getProcessId();
+            envSetupSubModel.setLocation(workingDir);
+        } else if (resourceType == ResourceType.AWS) {
+            envSetupSubModel.setLocation(File.separator + "tmp" + 
File.separator + processModel.getProcessId());
+        }
+
         byte[] envSetupSub = 
ThriftUtils.serializeThriftObject(envSetupSubModel);
         envSetupTask.setSubTaskModel(envSetupSub);
         envSetupTask.setMaxRetry(3);
         envSetupTask.setCurrentRetry(0);
-        String envSetupTaskId = (String) registryClient.addTask(envSetupTask, 
processModel.getProcessId());
+        String envSetupTaskId = registryClient.addTask(envSetupTask, 
processModel.getProcessId());
         envSetupTask.setTaskId(envSetupTaskId);
         envTaskIds.add(envSetupTaskId);
+
         return envTaskIds;
     }
 
-    public List<String> createAndSaveInputDataStagingTasks(ProcessModel 
processModel, String gatewayId)
-            throws AiravataException, OrchestratorException {
+    public List<String> createAndSaveInputDataStagingTasks(ProcessModel 
processModel, String gatewayId, ResourceType resourceType) throws 
AiravataException {
 
         List<String> dataStagingTaskIds = new ArrayList<>();
         List<InputDataObjectType> processInputs = 
processModel.getProcessInputs();
@@ -445,18 +439,13 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator {
                         break;
                     case URI:
                     case URI_COLLECTION:
-                        if ((processInput.getValue() == null
-                                        || processInput.getValue().equals(""))
-                                && !processInput.isIsRequired()) {
-                            logger.debug(
-                                    "Skipping input data staging task for {} 
since value is empty and not required",
-                                    processInput.getName());
+                        if ((processInput.getValue() == null || 
processInput.getValue().isEmpty()) && !processInput.isIsRequired()) {
+                            logger.debug("Skipping input data staging task for 
{} since value is empty and not required", processInput.getName());
                             break;
                         }
                         final RegistryService.Client registryClient = 
getRegistryServiceClient();
                         try {
-                            TaskModel inputDataStagingTask =
-                                    getInputDataStagingTask(registryClient, 
processModel, processInput, gatewayId);
+                            TaskModel inputDataStagingTask = 
getInputDataStagingTask(registryClient, processModel, processInput, gatewayId, 
resourceType);
                             String taskId = 
registryClient.addTask(inputDataStagingTask, processModel.getProcessId());
                             inputDataStagingTask.setTaskId(taskId);
                             
dataStagingTaskIds.add(inputDataStagingTask.getTaskId());
@@ -477,8 +466,8 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator {
         return dataStagingTaskIds;
     }
 
-    public List<String> createAndSaveOutputDataStagingTasks(ProcessModel 
processModel, String gatewayId)
-            throws AiravataException, TException, OrchestratorException {
+    public List<String> createAndSaveOutputDataStagingTasks(ProcessModel 
processModel, String gatewayId,
+                                                            ResourceType 
resourceType) throws AiravataException, TException, OrchestratorException {
 
         final RegistryService.Client registryClient = 
getRegistryServiceClient();
         List<String> dataStagingTaskIds = new ArrayList<>();
@@ -490,25 +479,20 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator {
                     DataType type = processOutput.getType();
                     switch (type) {
                         case STDOUT:
-                            if (null == processOutput.getValue()
-                                    || 
processOutput.getValue().trim().isEmpty()) {
+                            if (null == processOutput.getValue() || 
processOutput.getValue().trim().isEmpty()) {
                                 processOutput.setValue(appName + ".stdout");
                             }
-                            createOutputDataSatagingTasks(
-                                    registryClient, processModel, gatewayId, 
dataStagingTaskIds, processOutput);
+                            createOutputDataSatagingTasks(registryClient, 
processModel, gatewayId, dataStagingTaskIds, processOutput, resourceType);
                             break;
                         case STDERR:
-                            if (null == processOutput.getValue()
-                                    || 
processOutput.getValue().trim().isEmpty()) {
+                            if (null == processOutput.getValue() || 
processOutput.getValue().trim().isEmpty()) {
                                 processOutput.setValue(appName + ".stderr");
                             }
-                            createOutputDataSatagingTasks(
-                                    registryClient, processModel, gatewayId, 
dataStagingTaskIds, processOutput);
+                            createOutputDataSatagingTasks(registryClient, 
processModel, gatewayId, dataStagingTaskIds, processOutput, resourceType);
                             break;
                         case URI:
                         case URI_COLLECTION:
-                            createOutputDataSatagingTasks(
-                                    registryClient, processModel, gatewayId, 
dataStagingTaskIds, processOutput);
+                            createOutputDataSatagingTasks(registryClient, 
processModel, gatewayId, dataStagingTaskIds, processOutput, resourceType);
                             break;
                         default:
                             // nothing to do
@@ -518,8 +502,8 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator {
             }
 
             try {
-                if (isArchive(registryClient, processModel, 
orchestratorContext)) {
-                    createArchiveDataStatgingTask(registryClient, 
processModel, gatewayId, dataStagingTaskIds);
+                if (isArchive(registryClient, processModel)) {
+                    createArchiveDataStatgingTask(registryClient, 
processModel, gatewayId, dataStagingTaskIds, resourceType);
                 }
             } catch (Exception e) {
                 throw new AiravataException("Error! Application interface 
retrieval failed", e);
@@ -532,9 +516,8 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator {
         return dataStagingTaskIds;
     }
 
-    public List<String> createAndSaveIntermediateOutputDataStagingTasks(
-            ProcessModel processModel, String gatewayId, ProcessModel 
parentProcess)
-            throws AiravataException, TException, OrchestratorException {
+    public List<String> 
createAndSaveIntermediateOutputDataStagingTasks(ProcessModel processModel, 
String gatewayId, ProcessModel parentProcess,
+                                                                        
ResourceType resourceType) throws AiravataException, TException, 
OrchestratorException {
 
         final RegistryService.Client registryClient = 
getRegistryServiceClient();
         List<String> dataStagingTaskIds = new ArrayList<>();
@@ -546,40 +529,20 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator {
                     DataType type = processOutput.getType();
                     switch (type) {
                         case STDOUT:
-                            if (null == processOutput.getValue()
-                                    || 
processOutput.getValue().trim().isEmpty()) {
+                            if (null == processOutput.getValue() || 
processOutput.getValue().trim().isEmpty()) {
                                 processOutput.setValue(appName + ".stdout");
                             }
-                            createIntermediateOutputDataStagingTasks(
-                                    registryClient,
-                                    processModel,
-                                    gatewayId,
-                                    parentProcess,
-                                    dataStagingTaskIds,
-                                    processOutput);
+                            
createIntermediateOutputDataStagingTasks(registryClient, processModel, 
gatewayId, parentProcess, dataStagingTaskIds, processOutput, resourceType);
                             break;
                         case STDERR:
-                            if (null == processOutput.getValue()
-                                    || 
processOutput.getValue().trim().isEmpty()) {
+                            if (null == processOutput.getValue() || 
processOutput.getValue().trim().isEmpty()) {
                                 processOutput.setValue(appName + ".stderr");
                             }
-                            createIntermediateOutputDataStagingTasks(
-                                    registryClient,
-                                    processModel,
-                                    gatewayId,
-                                    parentProcess,
-                                    dataStagingTaskIds,
-                                    processOutput);
+                            
createIntermediateOutputDataStagingTasks(registryClient, processModel, 
gatewayId, parentProcess, dataStagingTaskIds, processOutput, resourceType);
                             break;
                         case URI:
                         case URI_COLLECTION:
-                            createIntermediateOutputDataStagingTasks(
-                                    registryClient,
-                                    processModel,
-                                    gatewayId,
-                                    parentProcess,
-                                    dataStagingTaskIds,
-                                    processOutput);
+                            
createIntermediateOutputDataStagingTasks(registryClient, processModel, 
gatewayId, parentProcess, dataStagingTaskIds, processOutput, resourceType);
                             break;
                         default:
                             // nothing to do
@@ -596,23 +559,16 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator {
         return dataStagingTaskIds;
     }
 
-    private boolean isArchive(
-            RegistryService.Client registryClient, ProcessModel processModel, 
OrchestratorContext orchestratorContext)
-            throws TException {
-        ApplicationInterfaceDescription appInterface =
-                
registryClient.getApplicationInterface(processModel.getApplicationInterfaceId());
+    private boolean isArchive(RegistryService.Client registryClient, 
ProcessModel processModel) throws TException {
+        ApplicationInterfaceDescription appInterface = 
registryClient.getApplicationInterface(processModel.getApplicationInterfaceId());
         return appInterface.isArchiveWorkingDirectory();
     }
 
-    private void createArchiveDataStatgingTask(
-            RegistryService.Client registryClient,
-            ProcessModel processModel,
-            String gatewayId,
-            List<String> dataStagingTaskIds)
-            throws AiravataException, TException, OrchestratorException {
-        TaskModel archiveTask = null;
+    private void createArchiveDataStatgingTask(RegistryService.Client 
registryClient, ProcessModel processModel, String gatewayId,
+                                               List<String> 
dataStagingTaskIds, ResourceType resourceType) throws AiravataException, 
TException, OrchestratorException {
+        TaskModel archiveTask;
         try {
-            archiveTask = getOutputDataStagingTask(registryClient, 
processModel, null, gatewayId, null);
+            archiveTask = getOutputDataStagingTask(registryClient, 
processModel, null, gatewayId, null, resourceType);
         } catch (TException e) {
             throw new AiravataException("Error! DataStaging sub task 
serialization failed", e);
         }
@@ -621,16 +577,10 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator {
         dataStagingTaskIds.add(archiveTask.getTaskId());
     }
 
-    private void createOutputDataSatagingTasks(
-            RegistryService.Client registryClient,
-            ProcessModel processModel,
-            String gatewayId,
-            List<String> dataStagingTaskIds,
-            OutputDataObjectType processOutput)
-            throws AiravataException, OrchestratorException {
+    private void createOutputDataSatagingTasks(RegistryService.Client 
registryClient, ProcessModel processModel, String gatewayId,
+                                               List<String> 
dataStagingTaskIds, OutputDataObjectType processOutput, ResourceType 
resourceType) throws AiravataException, OrchestratorException {
         try {
-            TaskModel outputDataStagingTask =
-                    getOutputDataStagingTask(registryClient, processModel, 
processOutput, gatewayId, null);
+            TaskModel outputDataStagingTask = 
getOutputDataStagingTask(registryClient, processModel, processOutput, 
gatewayId, null, resourceType);
             String taskId = registryClient.addTask(outputDataStagingTask, 
processModel.getProcessId());
             outputDataStagingTask.setTaskId(taskId);
             dataStagingTaskIds.add(outputDataStagingTask.getTaskId());
@@ -639,17 +589,11 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator {
         }
     }
 
-    private void createIntermediateOutputDataStagingTasks(
-            RegistryService.Client registryClient,
-            ProcessModel processModel,
-            String gatewayId,
-            ProcessModel parentProcess,
-            List<String> dataStagingTaskIds,
-            OutputDataObjectType processOutput)
-            throws AiravataException, OrchestratorException {
+    private void 
createIntermediateOutputDataStagingTasks(RegistryService.Client registryClient, 
ProcessModel processModel, String gatewayId,
+                                                          ProcessModel 
parentProcess, List<String> dataStagingTaskIds,
+                                                          OutputDataObjectType 
processOutput, ResourceType resourceType) throws AiravataException, 
OrchestratorException {
         try {
-            TaskModel outputDataStagingTask =
-                    getOutputDataStagingTask(registryClient, processModel, 
processOutput, gatewayId, parentProcess);
+            TaskModel outputDataStagingTask = 
getOutputDataStagingTask(registryClient, processModel, processOutput, 
gatewayId, parentProcess, resourceType);
             outputDataStagingTask.setTaskType(TaskTypes.OUTPUT_FETCHING);
             String taskId = registryClient.addTask(outputDataStagingTask, 
processModel.getProcessId());
             outputDataStagingTask.setTaskId(taskId);
@@ -659,20 +603,14 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator {
         }
     }
 
-    private List<String> createAndSaveSubmissionTasks(
-            RegistryService.Client registryClient,
-            String gatewayId,
-            JobSubmissionInterface jobSubmissionInterface,
-            ProcessModel processModel,
-            int wallTime)
+    private List<String> createAndSaveSubmissionTasks(RegistryService.Client 
registryClient, JobSubmissionInterface jobSubmissionInterface, ProcessModel 
processModel, int wallTime)
             throws TException, OrchestratorException {
 
         JobSubmissionProtocol jobSubmissionProtocol = 
jobSubmissionInterface.getJobSubmissionProtocol();
-        MonitorMode monitorMode = null;
-        if (jobSubmissionProtocol == JobSubmissionProtocol.SSH
-                || jobSubmissionProtocol == JobSubmissionProtocol.SSH_FORK) {
-            SSHJobSubmission sshJobSubmission =
-                    
OrchestratorUtils.getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
+        MonitorMode monitorMode;
+
+        if (jobSubmissionProtocol == JobSubmissionProtocol.SSH || 
jobSubmissionProtocol == JobSubmissionProtocol.SSH_FORK) {
+            SSHJobSubmission sshJobSubmission = 
OrchestratorUtils.getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
             monitorMode = sshJobSubmission.getMonitorMode();
         } else if (jobSubmissionProtocol == JobSubmissionProtocol.UNICORE) {
             monitorMode = MonitorMode.FORK;
@@ -688,42 +626,50 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator {
                     jobSubmissionProtocol.name());
             throw new OrchestratorException("Unsupported Job Submission 
Protocol " + jobSubmissionProtocol.name());
         }
+
         List<String> submissionTaskIds = new ArrayList<>();
-        TaskModel taskModel = new TaskModel();
-        taskModel.setParentProcessId(processModel.getProcessId());
-        taskModel.setCreationTime(System.currentTimeMillis());
-        taskModel.setLastUpdateTime(taskModel.getCreationTime());
         TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
         
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-        taskModel.setTaskStatuses(Arrays.asList(taskStatus));
-        taskModel.setTaskType(TaskTypes.JOB_SUBMISSION);
+
         JobSubmissionTaskModel submissionSubTask = new 
JobSubmissionTaskModel();
         submissionSubTask.setMonitorMode(monitorMode);
         submissionSubTask.setJobSubmissionProtocol(jobSubmissionProtocol);
         submissionSubTask.setWallTime(wallTime);
+
         byte[] bytes = ThriftUtils.serializeThriftObject(submissionSubTask);
+
+        TaskModel taskModel = new TaskModel();
+        taskModel.setParentProcessId(processModel.getProcessId());
+        taskModel.setCreationTime(System.currentTimeMillis());
+        taskModel.setLastUpdateTime(taskModel.getCreationTime());
+        taskModel.setTaskStatuses(List.of(taskStatus));
+        taskModel.setTaskType(TaskTypes.JOB_SUBMISSION);
         taskModel.setSubTaskModel(bytes);
         taskModel.setMaxRetry(1);
         taskModel.setCurrentRetry(0);
+
         String taskId = registryClient.addTask(taskModel, 
processModel.getProcessId());
         taskModel.setTaskId(taskId);
         submissionTaskIds.add(taskModel.getTaskId());
 
         // create monitor task for this Email based monitor mode job
         if (monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR || 
monitorMode == MonitorMode.CLOUD_JOB_MONITOR) {
+
+            TaskStatus monitorTaskStatus = new TaskStatus(TaskState.CREATED);
+            
monitorTaskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+
             TaskModel monitorTaskModel = new TaskModel();
             monitorTaskModel.setParentProcessId(processModel.getProcessId());
             monitorTaskModel.setCreationTime(System.currentTimeMillis());
             
monitorTaskModel.setLastUpdateTime(monitorTaskModel.getCreationTime());
-            TaskStatus monitorTaskStatus = new TaskStatus(TaskState.CREATED);
-            monitorTaskStatus.setTimeOfStateChange(
-                    AiravataUtils.getCurrentTimestamp().getTime());
-            monitorTaskModel.setTaskStatuses(Arrays.asList(monitorTaskStatus));
+            monitorTaskModel.setTaskStatuses(List.of(monitorTaskStatus));
             monitorTaskModel.setTaskType(TaskTypes.MONITORING);
+
             MonitorTaskModel monitorSubTaskModel = new MonitorTaskModel();
             monitorSubTaskModel.setMonitorMode(monitorMode);
             
monitorTaskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(monitorSubTaskModel));
-            String mTaskId = (String) registryClient.addTask(monitorTaskModel, 
processModel.getProcessId());
+
+            String mTaskId = registryClient.addTask(monitorTaskModel, 
processModel.getProcessId());
             monitorTaskModel.setTaskId(mTaskId);
             submissionTaskIds.add(monitorTaskModel.getTaskId());
         }
@@ -740,11 +686,7 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator {
         });
     }
 
-    private TaskModel getInputDataStagingTask(
-            RegistryService.Client registryClient,
-            ProcessModel processModel,
-            InputDataObjectType processInput,
-            String gatewayId)
+    private TaskModel getInputDataStagingTask(RegistryService.Client 
registryClient, ProcessModel processModel, InputDataObjectType processInput, 
String gatewayId, ResourceType resourceType)
             throws TException, AiravataException, OrchestratorException {
         // create new task model for this task
         TaskModel taskModel = new TaskModel();
@@ -757,103 +699,130 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator {
         taskModel.setTaskType(TaskTypes.DATA_STAGING);
         // create data staging sub task model
         DataStagingTaskModel submodel = new DataStagingTaskModel();
-        ComputeResourceDescription computeResource =
-                
registryClient.getComputeResource(processModel.getComputeResourceId());
-        String scratchLocation = 
OrchestratorUtils.getScratchLocation(processModel, gatewayId);
-        String workingDir =
-                (scratchLocation.endsWith(File.separator) ? scratchLocation : 
scratchLocation + File.separator)
-                        + processModel.getProcessId()
-                        + File.separator;
-        URI destination = null;
-        try {
-            DataMovementProtocol dataMovementProtocol =
-                    
OrchestratorUtils.getPreferredDataMovementProtocol(processModel, gatewayId);
-            String loginUserName = 
OrchestratorUtils.getLoginUserName(processModel, gatewayId);
-            StringBuilder destinationPath = new StringBuilder(workingDir);
-            Optional.ofNullable(processInput.getOverrideFilename())
-                    .ifPresent(destinationPath::append); // If an override 
filename is provided
-
-            destination = new URI(
-                    dataMovementProtocol.name(),
-                    loginUserName,
-                    computeResource.getHostName(),
-                    OrchestratorUtils.getDataMovementPort(processModel, 
gatewayId),
-                    destinationPath.toString(),
-                    null,
-                    null);
-        } catch (URISyntaxException e) {
-            throw new OrchestratorException("Error while constructing 
destination file URI", e);
+        ComputeResourceDescription computeResource = 
registryClient.getComputeResource(processModel.getComputeResourceId());
+
+        String destinationUriString = "";
+
+        if (resourceType == ResourceType.SLURM) {
+            String scratchLocation = 
OrchestratorUtils.getScratchLocation(processModel, gatewayId);
+            String workingDir = (scratchLocation.endsWith(File.separator) ? 
scratchLocation : scratchLocation + File.separator) + 
processModel.getProcessId() + File.separator;
+
+            URI destination;
+            try {
+                DataMovementProtocol dataMovementProtocol = 
OrchestratorUtils.getPreferredDataMovementProtocol(processModel, gatewayId);
+                String loginUserName = 
OrchestratorUtils.getLoginUserName(processModel, gatewayId);
+                StringBuilder destinationPath = new StringBuilder(workingDir);
+                
Optional.ofNullable(processInput.getOverrideFilename()).ifPresent(destinationPath::append);
 // If an override filename is provided
+
+                destination = new URI(
+                        dataMovementProtocol.name(),
+                        loginUserName,
+                        computeResource.getHostName(),
+                        OrchestratorUtils.getDataMovementPort(processModel, 
gatewayId),
+                        destinationPath.toString(),
+                        null,
+                        null);
+                destinationUriString = destination.toString();
+
+            } catch (URISyntaxException e) {
+                throw new OrchestratorException("Error while constructing 
destination file URI", e);
+            }
+
+        } else if (resourceType == ResourceType.AWS) {
+            logger.info("Configuring AWS Input Data Staging task for process 
{}. Setting dummy S3 destination.", processModel.getProcessId());
+
+            try {
+                String fileName = processInput.getOverrideFilename() != null 
&& !processInput.getOverrideFilename().isEmpty()
+                        ? processInput.getOverrideFilename()
+                        : new File(new 
URI(processInput.getValue()).getPath()).getName();
+                destinationUriString = "s3://dummy-bucket/" + 
processModel.getProcessId() + "/" + fileName;
+
+            } catch (URISyntaxException e) {
+                throw new RuntimeException(e);
+            }
+
         }
+
+        submodel.setDestination(destinationUriString);
         submodel.setType(DataStageType.INPUT);
         submodel.setSource(processInput.getValue());
         submodel.setProcessInput(processInput);
-        submodel.setDestination(destination.toString());
         taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
         taskModel.setMaxRetry(3);
         taskModel.setCurrentRetry(0);
         return taskModel;
     }
 
-    private TaskModel getOutputDataStagingTask(
-            RegistryService.Client registryClient,
-            ProcessModel processModel,
-            OutputDataObjectType processOutput,
-            String gatewayId,
-            ProcessModel parentProcess)
-            throws TException, AiravataException, OrchestratorException {
+    private TaskModel getOutputDataStagingTask(RegistryService.Client 
registryClient, ProcessModel processModel, OutputDataObjectType processOutput,
+                                               String gatewayId, ProcessModel 
parentProcess, ResourceType resourceType) throws TException, AiravataException, 
OrchestratorException {
         try {
-
             // create new task model for this task
+            TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
+            
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+
             TaskModel taskModel = new TaskModel();
             taskModel.setParentProcessId(processModel.getProcessId());
             
taskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
             taskModel.setLastUpdateTime(taskModel.getCreationTime());
-            TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
-            
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-            taskModel.setTaskStatuses(Arrays.asList(taskStatus));
+            taskModel.setTaskStatuses(List.of(taskStatus));
             taskModel.setTaskType(TaskTypes.DATA_STAGING);
-            ComputeResourceDescription computeResource =
-                    
registryClient.getComputeResource(processModel.getComputeResourceId());
 
-            String workingDir = 
OrchestratorUtils.getScratchLocation(processModel, gatewayId)
-                    + File.separator
-                    + (parentProcess == null ? processModel.getProcessId() : 
parentProcess.getProcessId())
-                    + File.separator;
+            ComputeResourceDescription computeResource = 
registryClient.getComputeResource(processModel.getComputeResourceId());
             DataStagingTaskModel submodel = new DataStagingTaskModel();
-            DataMovementProtocol dataMovementProtocol =
-                    
OrchestratorUtils.getPreferredDataMovementProtocol(processModel, gatewayId);
-            URI source = null;
-            try {
-                String loginUserName = 
OrchestratorUtils.getLoginUserName(processModel, gatewayId);
+            String sourceUriString = "";
+
+            if (resourceType == ResourceType.SLURM) {
+                String workingDir = 
OrchestratorUtils.getScratchLocation(processModel, gatewayId)
+                        + File.separator
+                        + (parentProcess == null ? processModel.getProcessId() 
: parentProcess.getProcessId())
+                        + File.separator;
+                DataMovementProtocol dataMovementProtocol = 
OrchestratorUtils.getPreferredDataMovementProtocol(processModel, gatewayId);
+                URI source;
+                try {
+                    String loginUserName = 
OrchestratorUtils.getLoginUserName(processModel, gatewayId);
+                    if (processOutput != null) {
+                        submodel.setType(DataStageType.OUPUT);
+                        submodel.setProcessOutput(processOutput);
+                        source = new URI(
+                                dataMovementProtocol.name(),
+                                loginUserName,
+                                computeResource.getHostName(),
+                                
OrchestratorUtils.getDataMovementPort(processModel, gatewayId),
+                                workingDir + processOutput.getValue(),
+                                null,
+                                null);
+                    } else {
+                        // archive
+                        submodel.setType(DataStageType.ARCHIVE_OUTPUT);
+                        source = new URI(
+                                dataMovementProtocol.name(),
+                                loginUserName,
+                                computeResource.getHostName(),
+                                
OrchestratorUtils.getDataMovementPort(processModel, gatewayId),
+                                workingDir,
+                                null,
+                                null);
+                    }
+
+                } catch (URISyntaxException e) {
+                    throw new OrchestratorException("Error while constructing 
source file URI", e);
+                }
+                sourceUriString = source.toString();
+
+            } else if (resourceType == ResourceType.AWS) {
+                logger.info("Configuring AWS Output Data Staging task for 
process {}. Setting dummy S3 source.", processModel.getProcessId());
                 if (processOutput != null) {
                     submodel.setType(DataStageType.OUPUT);
                     submodel.setProcessOutput(processOutput);
-                    source = new URI(
-                            dataMovementProtocol.name(),
-                            loginUserName,
-                            computeResource.getHostName(),
-                            
OrchestratorUtils.getDataMovementPort(processModel, gatewayId),
-                            workingDir + processOutput.getValue(),
-                            null,
-                            null);
+                    sourceUriString = "s3://dummy-bucket/" + 
processModel.getProcessId() + "/" + processOutput.getValue();
+
                 } else {
-                    // archive
                     submodel.setType(DataStageType.ARCHIVE_OUTPUT);
-                    source = new URI(
-                            dataMovementProtocol.name(),
-                            loginUserName,
-                            computeResource.getHostName(),
-                            
OrchestratorUtils.getDataMovementPort(processModel, gatewayId),
-                            workingDir,
-                            null,
-                            null);
+                    sourceUriString = "s3://dummy-bucket/" + 
processModel.getProcessId() + "/";
                 }
-            } catch (URISyntaxException e) {
-                throw new OrchestratorException("Error while constructing 
source file URI", e);
             }
-            // We don't know destination location at this time, data staging 
task will set this.
-            // because destination is required field we set dummy destination
-            submodel.setSource(source.toString());
+
+            submodel.setSource(sourceUriString);
             // We don't know destination location at this time, data staging 
task will set this.
             // because destination is required field we set dummy destination
             submodel.setDestination("dummy://temp/file/location");
@@ -861,6 +830,7 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator {
             taskModel.setMaxRetry(3);
             taskModel.setCurrentRetry(0);
             return taskModel;
+
         } catch (OrchestratorException e) {
             throw new OrchestratorException("Error occurred while retrieving 
data movement from app catalog", e);
         }
diff --git 
a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/entities/appcatalog/AWSGroupComputeResourcePrefEntity.java
 
b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/entities/appcatalog/AWSGroupComputeResourcePrefEntity.java
index b7675f7518..113f7a4a66 100644
--- 
a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/entities/appcatalog/AWSGroupComputeResourcePrefEntity.java
+++ 
b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/entities/appcatalog/AWSGroupComputeResourcePrefEntity.java
@@ -19,6 +19,7 @@
 package org.apache.airavata.registry.core.entities.appcatalog;
 
 import jakarta.persistence.Column;
+import jakarta.persistence.DiscriminatorValue;
 import jakarta.persistence.Entity;
 import jakarta.persistence.PrimaryKeyJoinColumn;
 import jakarta.persistence.PrimaryKeyJoinColumns;
@@ -28,6 +29,7 @@ import jakarta.persistence.Table;
  * The persistent class for the aws_group_compute_resource_preference database 
table.
  */
 @Entity
+@DiscriminatorValue("AWS")
 @Table(name = "AWS_GROUP_COMPUTE_RESOURCE_PREFERENCE")
 @PrimaryKeyJoinColumns({
         @PrimaryKeyJoinColumn(name = "RESOURCE_ID", referencedColumnName = 
"RESOURCE_ID"),
diff --git 
a/thrift-interface-descriptions/data-models/group_resource_profile_model.thrift 
b/thrift-interface-descriptions/data-models/group_resource_profile_model.thrift
index 357d3fbe61..f82983631c 100644
--- 
a/thrift-interface-descriptions/data-models/group_resource_profile_model.thrift
+++ 
b/thrift-interface-descriptions/data-models/group_resource_profile_model.thrift
@@ -60,12 +60,9 @@ struct SlurmComputeResourcePreference {
 }
 
 struct AwsComputeResourcePreference {
-    1: optional string preferredAmiId,
-    2: optional string preferredInstanceType,
-    3: optional string region,
-    4: optional string securityGroupId,
-    5: optional string keyPairName,
-    6: optional i64    maxStartupTime,
+    1: optional string region,
+    2: optional string preferredAmiId,
+    3: optional string preferredInstanceType,
 }
 
 union EnvironmentSpecificPreferences {

Reply via email to