This is an automated email from the ASF dual-hosted git repository.
lahirujayathilake pushed a commit to branch airavata-aws
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/airavata-aws by this push:
new 76004dc829 updated the simple orchestrator to support AWS task
creation, included the libraries in distributions, and updated the participant
with tasks
76004dc829 is described below
commit 76004dc8296e38a8bb70e6e9af3cc4b8c9bf9295
Author: lahiruj <[email protected]>
AuthorDate: Fri Jun 27 03:43:00 2025 -0400
updated the simple orchestrator to support AWS task creation, included the
libraries in distributions, and updated the participant with tasks
---
modules/airavata-helix/helix-spectator/pom.xml | 10 +
.../helix/impl/participant/GlobalParticipant.java | 4 +-
.../airavata/helix/impl/task/AWSTaskFactory.java | 15 +-
.../helix/impl/task/aws/CreateEC2InstanceTask.java | 2 +
.../helix/impl/task/aws/NoOperationTask.java | 21 +
.../src/main/assembly/participant-bin-assembly.xml | 13 +
.../src/main/assembly/post-wm-bin-assembly.xml | 9 +
.../src/main/assembly/pre-wm-bin-assembly.xml | 8 +
.../cpi/impl/SimpleOrchestratorImpl.java | 508 ++++++++++-----------
.../AWSGroupComputeResourcePrefEntity.java | 2 +
.../group_resource_profile_model.thrift | 9 +-
11 files changed, 318 insertions(+), 283 deletions(-)
diff --git a/modules/airavata-helix/helix-spectator/pom.xml
b/modules/airavata-helix/helix-spectator/pom.xml
index d6e4b060aa..e00e61cfb1 100644
--- a/modules/airavata-helix/helix-spectator/pom.xml
+++ b/modules/airavata-helix/helix-spectator/pom.xml
@@ -90,6 +90,16 @@ under the License.
<artifactId>ec2</artifactId>
<version>2.31.70</version>
</dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>auth</artifactId>
+ <version>2.31.70</version>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>retries</artifactId>
+ <version>2.31.70</version>
+ </dependency>
<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java</artifactId>
diff --git
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
index eb1cdedc87..7e4df17034 100644
---
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
+++
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
@@ -48,7 +48,9 @@ public class GlobalParticipant extends
HelixParticipant<AbstractTask> {
"org.apache.airavata.helix.impl.task.cancel.CancelCompletingTask",
"org.apache.airavata.helix.impl.task.parsing.DataParsingTask",
"org.apache.airavata.helix.impl.task.parsing.ParsingTriggeringTask",
- "org.apache.airavata.helix.impl.task.mock.MockTask"
+ "org.apache.airavata.helix.impl.task.mock.MockTask",
+ "org.apache.airavata.helix.impl.task.aws.CreateEC2InstanceTask",
+ "org.apache.airavata.helix.impl.task.aws.NoOperationTask"
};
@SuppressWarnings("WeakerAccess")
diff --git
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AWSTaskFactory.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AWSTaskFactory.java
index a121d50e5a..75a466b03b 100644
---
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AWSTaskFactory.java
+++
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AWSTaskFactory.java
@@ -19,6 +19,7 @@
package org.apache.airavata.helix.impl.task;
import org.apache.airavata.helix.impl.task.aws.CreateEC2InstanceTask;
+import org.apache.airavata.helix.impl.task.aws.NoOperationTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,36 +35,36 @@ public class AWSTaskFactory implements HelixTaskFactory {
@Override
public AiravataTask createInputDataStagingTask(String processId) {
- return null;
+ return new NoOperationTask();
}
@Override
public AiravataTask createJobSubmissionTask(String processId) {
- return null;
+ return new NoOperationTask();
}
@Override
public AiravataTask createOutputDataStagingTask(String processId) {
- return null;
+ return new NoOperationTask();
}
@Override
public AiravataTask createArchiveTask(String processId) {
- return null;
+ return new NoOperationTask();
}
@Override
public AiravataTask createJobVerificationTask(String processId) {
- return null;
+ return new NoOperationTask();
}
@Override
public AiravataTask createCompletingTask(String processId) {
- return null;
+ return new NoOperationTask();
}
@Override
public AiravataTask createParsingTriggeringTask(String processId) {
- return null;
+ return new NoOperationTask();
}
}
diff --git
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/aws/CreateEC2InstanceTask.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/aws/CreateEC2InstanceTask.java
index 303e7051e5..fd9fe63f16 100644
---
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/aws/CreateEC2InstanceTask.java
+++
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/aws/CreateEC2InstanceTask.java
@@ -22,6 +22,7 @@ import org.apache.airavata.agents.api.AgentUtils;
import org.apache.airavata.helix.impl.task.AiravataTask;
import org.apache.airavata.helix.impl.task.TaskContext;
import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
import
org.apache.airavata.model.appcatalog.groupresourceprofile.AwsComputeResourcePreference;
import org.apache.airavata.model.credential.store.PasswordCredential;
import org.apache.airavata.model.credential.store.SSHCredential;
@@ -43,6 +44,7 @@ import java.util.UUID;
/**
* Create all required AWS resources (SecurityGroup, KeyPair) and launches an
EC2 instance
*/
+@TaskDef(name = "Create EC2 Instance Task")
public class CreateEC2InstanceTask extends AiravataTask {
private static final Logger LOGGER =
LoggerFactory.getLogger(CreateEC2InstanceTask.class);
diff --git
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/aws/NoOperationTask.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/aws/NoOperationTask.java
new file mode 100644
index 0000000000..2c64bcd256
--- /dev/null
+++
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/aws/NoOperationTask.java
@@ -0,0 +1,21 @@
+package org.apache.airavata.helix.impl.task.aws;
+
+import org.apache.airavata.helix.impl.task.AiravataTask;
+import org.apache.airavata.helix.impl.task.TaskContext;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.helix.task.TaskResult;
+
+@TaskDef(name = "No Operation Task")
+public class NoOperationTask extends AiravataTask {
+
+ @Override
+ public TaskResult onRun(TaskHelper helper, TaskContext taskContext) {
+ return new TaskResult(TaskResult.Status.COMPLETED, "OK");
+ }
+
+ @Override
+ public void onCancel(TaskContext taskContext) {
+
+ }
+}
diff --git
a/modules/distribution/src/main/assembly/participant-bin-assembly.xml
b/modules/distribution/src/main/assembly/participant-bin-assembly.xml
index e350af809d..35f624dc0f 100644
--- a/modules/distribution/src/main/assembly/participant-bin-assembly.xml
+++ b/modules/distribution/src/main/assembly/participant-bin-assembly.xml
@@ -220,6 +220,19 @@
<include>io.prometheus:simpleclient_httpserver:jar</include>
<include>io.prometheus:simpleclient_common:jar</include>
<include>org.apache.commons:commons-lang3</include>
+
+ <include>software.amazon.awssdk:ec2:jar</include>
+ <include>software.amazon.awssdk:auth:jar</include>
+ <include>software.amazon.awssdk:identity-spi:jar</include>
+ <include>software.amazon.awssdk:sdk-core:jar</include>
+ <include>software.amazon.awssdk:utils:jar</include>
+ <include>software.amazon.awssdk:regions:jar</include>
+ <include>software.amazon.awssdk:http-client-spi:jar</include>
+
<include>software.amazon.awssdk:url-connection-client:jar</include>
+ <include>software.amazon.awssdk:aws-core:jar</include>
+ <include>software.amazon.awssdk:retries:jar</include>
+ <include>software.amazon.awssdk:retries-spi:jar</include>
+ <include>software.amazon.awssdk:profiles:jar</include>
</includes>
<excludes>
<exclude>mysql:mysql-connector-java:jar</exclude>
diff --git a/modules/distribution/src/main/assembly/post-wm-bin-assembly.xml
b/modules/distribution/src/main/assembly/post-wm-bin-assembly.xml
index 9e3343732e..854ef8c707 100644
--- a/modules/distribution/src/main/assembly/post-wm-bin-assembly.xml
+++ b/modules/distribution/src/main/assembly/post-wm-bin-assembly.xml
@@ -175,6 +175,15 @@
<include>io.prometheus:simpleclient_httpserver:jar</include>
<include>io.prometheus:simpleclient_common:jar</include>
<include>org.apache.commons:commons-lang3</include>
+
+ <include>software.amazon.awssdk:ec2:jar</include>
+ <include>software.amazon.awssdk:auth:jar</include>
+ <include>software.amazon.awssdk:identity-spi:jar</include>
+ <include>software.amazon.awssdk:sdk-core:jar</include>
+ <include>software.amazon.awssdk:utils:jar</include>
+ <include>software.amazon.awssdk:regions:jar</include>
+ <include>software.amazon.awssdk:http-client-spi:jar</include>
+
<include>software.amazon.awssdk:url-connection-client:jar</include>
</includes>
<excludes>
<exclude>mysql:mysql-connector-java:jar</exclude>
diff --git a/modules/distribution/src/main/assembly/pre-wm-bin-assembly.xml
b/modules/distribution/src/main/assembly/pre-wm-bin-assembly.xml
index a09dba9658..6de6e74999 100644
--- a/modules/distribution/src/main/assembly/pre-wm-bin-assembly.xml
+++ b/modules/distribution/src/main/assembly/pre-wm-bin-assembly.xml
@@ -174,6 +174,14 @@
<include>io.prometheus:simpleclient_httpserver:jar</include>
<include>io.prometheus:simpleclient_common:jar</include>
<include>org.apache.commons:commons-lang3</include>
+ <include>software.amazon.awssdk:ec2:jar</include>
+ <include>software.amazon.awssdk:auth:jar</include>
+ <include>software.amazon.awssdk:identity-spi:jar</include>
+ <include>software.amazon.awssdk:sdk-core:jar</include>
+ <include>software.amazon.awssdk:utils:jar</include>
+ <include>software.amazon.awssdk:regions:jar</include>
+ <include>software.amazon.awssdk:http-client-spi:jar</include>
+
<include>software.amazon.awssdk:url-connection-client:jar</include>
</includes>
<excludes>
<exclude>mysql:mysql-connector-java:jar</exclude>
diff --git
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index f8ac69815a..1aae29d748 100644
---
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -1,36 +1,36 @@
/**
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.airavata.orchestrator.cpi.impl;
-import java.io.File;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
import
org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
-import org.apache.airavata.model.appcatalog.computeresource.*;
+import
org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import
org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import
org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
+import
org.apache.airavata.model.appcatalog.groupresourceprofile.GroupComputeResourcePreference;
+import org.apache.airavata.model.appcatalog.groupresourceprofile.ResourceType;
import org.apache.airavata.model.application.io.DataType;
import org.apache.airavata.model.application.io.InputDataObjectType;
import org.apache.airavata.model.application.io.OutputDataObjectType;
@@ -44,9 +44,14 @@ import org.apache.airavata.model.process.ProcessModel;
import
org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
import org.apache.airavata.model.status.TaskState;
import org.apache.airavata.model.status.TaskStatus;
-import org.apache.airavata.model.task.*;
+import org.apache.airavata.model.task.DataStageType;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.apache.airavata.model.task.EnvironmentSetupTaskModel;
+import org.apache.airavata.model.task.JobSubmissionTaskModel;
+import org.apache.airavata.model.task.MonitorTaskModel;
+import org.apache.airavata.model.task.TaskModel;
+import org.apache.airavata.model.task.TaskTypes;
import org.apache.airavata.model.util.ExperimentModelUtil;
-import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
import org.apache.airavata.orchestrator.core.impl.GFACPassiveJobSubmitter;
import org.apache.airavata.orchestrator.core.job.JobSubmitter;
@@ -60,6 +65,18 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
public class SimpleOrchestratorImpl extends AbstractOrchestrator {
private static final Logger logger =
LoggerFactory.getLogger(SimpleOrchestratorImpl.class);
private ExecutorService executor;
@@ -95,8 +112,8 @@ public class SimpleOrchestratorImpl extends
AbstractOrchestrator {
public ValidationResults validateExperiment(ExperimentModel experiment)
throws OrchestratorException, LaunchValidationException {
- org.apache.airavata.model.error.ValidationResults validationResults =
- new org.apache.airavata.model.error.ValidationResults();
+ ValidationResults validationResults =
+ new ValidationResults();
validationResults.setValidationState(
true); // initially making it to success, if atleast one
failed them simply mark it failed.
String errorMsg = "Validation Errors : ";
@@ -172,8 +189,8 @@ public class SimpleOrchestratorImpl extends
AbstractOrchestrator {
public ValidationResults validateProcess(ExperimentModel experiment,
ProcessModel processModel)
throws OrchestratorException, LaunchValidationException {
- org.apache.airavata.model.error.ValidationResults validationResults =
- new org.apache.airavata.model.error.ValidationResults();
+ ValidationResults validationResults =
+ new ValidationResults();
validationResults.setValidationState(
true); // initially making it to success, if atleast one
failed them simply mark it failed.
String errorMsg = "Validation Errors : ";
@@ -277,7 +294,8 @@ public class SimpleOrchestratorImpl extends
AbstractOrchestrator {
this.jobSubmitter = jobSubmitter;
}
- public void initialize() throws OrchestratorException {}
+ public void initialize() throws OrchestratorException {
+ }
public List<ProcessModel> createProcesses(String experimentId, String
gatewayId) throws OrchestratorException {
final RegistryService.Client registryClient =
getRegistryServiceClient();
@@ -304,61 +322,31 @@ public class SimpleOrchestratorImpl extends
AbstractOrchestrator {
public String createAndSaveTasks(String gatewayId, ProcessModel
processModel) throws OrchestratorException {
final RegistryService.Client registryClient =
getRegistryServiceClient();
try {
+ GroupComputeResourcePreference preference =
OrchestratorUtils.getGroupComputeResourcePreference(processModel);
+ ResourceType resourceType = preference.getResourceType();
+ logger.info("Determined resource type as {} for process {}",
resourceType, processModel.getProcessId());
+
ComputationalResourceSchedulingModel resourceSchedule =
processModel.getProcessResourceSchedule();
- String userGivenQueueName = resourceSchedule.getQueueName();
int userGivenWallTime = resourceSchedule.getWallTimeLimit();
String resourceHostId = resourceSchedule.getResourceHostId();
if (resourceHostId == null) {
throw new OrchestratorException("Compute Resource Id cannot be
null at this point");
}
- ComputeResourceDescription computeResource =
registryClient.getComputeResource(resourceHostId);
- JobSubmissionInterface preferredJobSubmissionInterface =
-
OrchestratorUtils.getPreferredJobSubmissionInterface(processModel, gatewayId);
- JobSubmissionProtocol preferredJobSubmissionProtocol =
-
OrchestratorUtils.getPreferredJobSubmissionProtocol(processModel, gatewayId);
+
+ // TODO - handle for different resource types
+ JobSubmissionInterface preferredJobSubmissionInterface =
OrchestratorUtils.getPreferredJobSubmissionInterface(processModel, gatewayId);
+ JobSubmissionProtocol preferredJobSubmissionProtocol =
OrchestratorUtils.getPreferredJobSubmissionProtocol(processModel, gatewayId);
List<String> taskIdList = new ArrayList<>();
if (preferredJobSubmissionProtocol ==
JobSubmissionProtocol.UNICORE) {
// TODO - breakdown unicore all in one task to multiple tasks,
then we don't need to handle UNICORE
// here.
- taskIdList.addAll(createAndSaveSubmissionTasks(
- registryClient, gatewayId,
preferredJobSubmissionInterface, processModel, userGivenWallTime));
+ taskIdList.addAll(createAndSaveSubmissionTasks(registryClient,
preferredJobSubmissionInterface, processModel, userGivenWallTime));
} else {
- taskIdList.addAll(createAndSaveEnvSetupTask(registryClient,
gatewayId, processModel));
-
taskIdList.addAll(createAndSaveInputDataStagingTasks(processModel, gatewayId));
- // if (autoSchedule) {
- // List<BatchQueue> definedBatchQueues =
computeResource.getBatchQueues();
- // for (BatchQueue batchQueue :
definedBatchQueues) {
- // if
(batchQueue.getQueueName().equals(userGivenQueueName)) {
- // int maxRunTime =
batchQueue.getMaxRunTime();
- // if (maxRunTime <
userGivenWallTime) {
- //
resourceSchedule.setWallTimeLimit(maxRunTime);
- // // need to create more job
submissions
- // int numOfMaxWallTimeJobs =
((int) Math.floor(userGivenWallTime /
- // maxRunTime));
- // for (int i = 1; i <=
numOfMaxWallTimeJobs; i++) {
- // taskIdList.addAll(
- //
createAndSaveSubmissionTasks(registryClient, gatewayId,
- // preferredJobSubmissionInterface, processModel, maxRunTime));
- // }
- // int leftWallTime =
userGivenWallTime % maxRunTime;
- // if (leftWallTime != 0) {
- // taskIdList.addAll(
- //
createAndSaveSubmissionTasks(registryClient, gatewayId,
- // preferredJobSubmissionInterface, processModel,
leftWallTime));
- // }
- // } else {
- // taskIdList.addAll(
- //
createAndSaveSubmissionTasks(registryClient, gatewayId,
- // preferredJobSubmissionInterface, processModel,
userGivenWallTime));
- // }
- // }
- // }
- // } else {
- taskIdList.addAll(createAndSaveSubmissionTasks(
- registryClient, gatewayId,
preferredJobSubmissionInterface, processModel, userGivenWallTime));
- // }
-
taskIdList.addAll(createAndSaveOutputDataStagingTasks(processModel, gatewayId));
+ taskIdList.addAll(createAndSaveEnvSetupTask(registryClient,
gatewayId, processModel, resourceType));
+
taskIdList.addAll(createAndSaveInputDataStagingTasks(processModel, gatewayId,
resourceType));
+ taskIdList.addAll(createAndSaveSubmissionTasks(registryClient,
preferredJobSubmissionInterface, processModel, userGivenWallTime));
+
taskIdList.addAll(createAndSaveOutputDataStagingTasks(processModel, gatewayId,
resourceType));
}
// update process scheduling
registryClient.updateProcess(processModel,
processModel.getProcessId());
@@ -372,13 +360,12 @@ public class SimpleOrchestratorImpl extends
AbstractOrchestrator {
}
}
- public String createAndSaveIntermediateOutputFetchingTasks(
- String gatewayId, ProcessModel processModel, ProcessModel
parentProcess) throws OrchestratorException {
+ public String createAndSaveIntermediateOutputFetchingTasks(String
gatewayId, ProcessModel processModel, ProcessModel parentProcess) throws
OrchestratorException {
final RegistryService.Client registryClient =
getRegistryServiceClient();
try {
- List<String> taskIdList = new ArrayList<>();
-
-
taskIdList.addAll(createAndSaveIntermediateOutputDataStagingTasks(processModel,
gatewayId, parentProcess));
+ GroupComputeResourcePreference preference =
OrchestratorUtils.getGroupComputeResourcePreference(processModel);
+ ResourceType resourceType = preference.getResourceType();
+ List<String> taskIdList = new
ArrayList<>(createAndSaveIntermediateOutputDataStagingTasks(processModel,
gatewayId, parentProcess, resourceType));
// update process scheduling
registryClient.updateProcess(processModel,
processModel.getProcessId());
return getTaskDag(taskIdList);
@@ -403,33 +390,40 @@ public class SimpleOrchestratorImpl extends
AbstractOrchestrator {
return dag.substring(0, dag.length() - 1); // remove last comma
}
- private List<String> createAndSaveEnvSetupTask(
- RegistryService.Client registryClient, String gatewayId,
ProcessModel processModel)
+ private List<String> createAndSaveEnvSetupTask(RegistryService.Client
registryClient, String gatewayId, ProcessModel processModel, ResourceType
resourceType)
throws TException, AiravataException, OrchestratorException {
List<String> envTaskIds = new ArrayList<>();
+
TaskModel envSetupTask = new TaskModel();
envSetupTask.setTaskType(TaskTypes.ENV_SETUP);
- envSetupTask.setTaskStatuses(Arrays.asList(new
TaskStatus(TaskState.CREATED)));
+ envSetupTask.setTaskStatuses(List.of(new
TaskStatus(TaskState.CREATED)));
envSetupTask.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
envSetupTask.setLastUpdateTime(AiravataUtils.getCurrentTimestamp().getTime());
envSetupTask.setParentProcessId(processModel.getProcessId());
+
EnvironmentSetupTaskModel envSetupSubModel = new
EnvironmentSetupTaskModel();
-
envSetupSubModel.setProtocol(OrchestratorUtils.getSecurityProtocol(processModel,
gatewayId));
- String scratchLocation =
OrchestratorUtils.getScratchLocation(processModel, gatewayId);
- String workingDir = scratchLocation + File.separator +
processModel.getProcessId();
- envSetupSubModel.setLocation(workingDir);
+
envSetupSubModel.setProtocol(OrchestratorUtils.getSecurityProtocol(processModel,
gatewayId)); // TODO support for CLOUD (AWS)
+
+ if (resourceType == ResourceType.SLURM) {
+ String scratchLocation =
OrchestratorUtils.getScratchLocation(processModel, gatewayId);
+ String workingDir = scratchLocation + File.separator +
processModel.getProcessId();
+ envSetupSubModel.setLocation(workingDir);
+ } else if (resourceType == ResourceType.AWS) {
+ envSetupSubModel.setLocation(File.separator + "tmp" +
File.separator + processModel.getProcessId());
+ }
+
byte[] envSetupSub =
ThriftUtils.serializeThriftObject(envSetupSubModel);
envSetupTask.setSubTaskModel(envSetupSub);
envSetupTask.setMaxRetry(3);
envSetupTask.setCurrentRetry(0);
- String envSetupTaskId = (String) registryClient.addTask(envSetupTask,
processModel.getProcessId());
+ String envSetupTaskId = registryClient.addTask(envSetupTask,
processModel.getProcessId());
envSetupTask.setTaskId(envSetupTaskId);
envTaskIds.add(envSetupTaskId);
+
return envTaskIds;
}
- public List<String> createAndSaveInputDataStagingTasks(ProcessModel
processModel, String gatewayId)
- throws AiravataException, OrchestratorException {
+ public List<String> createAndSaveInputDataStagingTasks(ProcessModel
processModel, String gatewayId, ResourceType resourceType) throws
AiravataException {
List<String> dataStagingTaskIds = new ArrayList<>();
List<InputDataObjectType> processInputs =
processModel.getProcessInputs();
@@ -445,18 +439,13 @@ public class SimpleOrchestratorImpl extends
AbstractOrchestrator {
break;
case URI:
case URI_COLLECTION:
- if ((processInput.getValue() == null
- || processInput.getValue().equals(""))
- && !processInput.isIsRequired()) {
- logger.debug(
- "Skipping input data staging task for {}
since value is empty and not required",
- processInput.getName());
+ if ((processInput.getValue() == null ||
processInput.getValue().isEmpty()) && !processInput.isIsRequired()) {
+ logger.debug("Skipping input data staging task for
{} since value is empty and not required", processInput.getName());
break;
}
final RegistryService.Client registryClient =
getRegistryServiceClient();
try {
- TaskModel inputDataStagingTask =
- getInputDataStagingTask(registryClient,
processModel, processInput, gatewayId);
+ TaskModel inputDataStagingTask =
getInputDataStagingTask(registryClient, processModel, processInput, gatewayId,
resourceType);
String taskId =
registryClient.addTask(inputDataStagingTask, processModel.getProcessId());
inputDataStagingTask.setTaskId(taskId);
dataStagingTaskIds.add(inputDataStagingTask.getTaskId());
@@ -477,8 +466,8 @@ public class SimpleOrchestratorImpl extends
AbstractOrchestrator {
return dataStagingTaskIds;
}
- public List<String> createAndSaveOutputDataStagingTasks(ProcessModel
processModel, String gatewayId)
- throws AiravataException, TException, OrchestratorException {
+ public List<String> createAndSaveOutputDataStagingTasks(ProcessModel
processModel, String gatewayId,
+ ResourceType
resourceType) throws AiravataException, TException, OrchestratorException {
final RegistryService.Client registryClient =
getRegistryServiceClient();
List<String> dataStagingTaskIds = new ArrayList<>();
@@ -490,25 +479,20 @@ public class SimpleOrchestratorImpl extends
AbstractOrchestrator {
DataType type = processOutput.getType();
switch (type) {
case STDOUT:
- if (null == processOutput.getValue()
- ||
processOutput.getValue().trim().isEmpty()) {
+ if (null == processOutput.getValue() ||
processOutput.getValue().trim().isEmpty()) {
processOutput.setValue(appName + ".stdout");
}
- createOutputDataSatagingTasks(
- registryClient, processModel, gatewayId,
dataStagingTaskIds, processOutput);
+ createOutputDataSatagingTasks(registryClient,
processModel, gatewayId, dataStagingTaskIds, processOutput, resourceType);
break;
case STDERR:
- if (null == processOutput.getValue()
- ||
processOutput.getValue().trim().isEmpty()) {
+ if (null == processOutput.getValue() ||
processOutput.getValue().trim().isEmpty()) {
processOutput.setValue(appName + ".stderr");
}
- createOutputDataSatagingTasks(
- registryClient, processModel, gatewayId,
dataStagingTaskIds, processOutput);
+ createOutputDataSatagingTasks(registryClient,
processModel, gatewayId, dataStagingTaskIds, processOutput, resourceType);
break;
case URI:
case URI_COLLECTION:
- createOutputDataSatagingTasks(
- registryClient, processModel, gatewayId,
dataStagingTaskIds, processOutput);
+ createOutputDataSatagingTasks(registryClient,
processModel, gatewayId, dataStagingTaskIds, processOutput, resourceType);
break;
default:
// nothing to do
@@ -518,8 +502,8 @@ public class SimpleOrchestratorImpl extends
AbstractOrchestrator {
}
try {
- if (isArchive(registryClient, processModel,
orchestratorContext)) {
- createArchiveDataStatgingTask(registryClient,
processModel, gatewayId, dataStagingTaskIds);
+ if (isArchive(registryClient, processModel)) {
+ createArchiveDataStatgingTask(registryClient,
processModel, gatewayId, dataStagingTaskIds, resourceType);
}
} catch (Exception e) {
throw new AiravataException("Error! Application interface
retrieval failed", e);
@@ -532,9 +516,8 @@ public class SimpleOrchestratorImpl extends
AbstractOrchestrator {
return dataStagingTaskIds;
}
- public List<String> createAndSaveIntermediateOutputDataStagingTasks(
- ProcessModel processModel, String gatewayId, ProcessModel
parentProcess)
- throws AiravataException, TException, OrchestratorException {
+ public List<String>
createAndSaveIntermediateOutputDataStagingTasks(ProcessModel processModel,
String gatewayId, ProcessModel parentProcess,
+
ResourceType resourceType) throws AiravataException, TException,
OrchestratorException {
final RegistryService.Client registryClient =
getRegistryServiceClient();
List<String> dataStagingTaskIds = new ArrayList<>();
@@ -546,40 +529,20 @@ public class SimpleOrchestratorImpl extends
AbstractOrchestrator {
DataType type = processOutput.getType();
switch (type) {
case STDOUT:
- if (null == processOutput.getValue()
- ||
processOutput.getValue().trim().isEmpty()) {
+ if (null == processOutput.getValue() ||
processOutput.getValue().trim().isEmpty()) {
processOutput.setValue(appName + ".stdout");
}
- createIntermediateOutputDataStagingTasks(
- registryClient,
- processModel,
- gatewayId,
- parentProcess,
- dataStagingTaskIds,
- processOutput);
+
createIntermediateOutputDataStagingTasks(registryClient, processModel,
gatewayId, parentProcess, dataStagingTaskIds, processOutput, resourceType);
break;
case STDERR:
- if (null == processOutput.getValue()
- ||
processOutput.getValue().trim().isEmpty()) {
+ if (null == processOutput.getValue() ||
processOutput.getValue().trim().isEmpty()) {
processOutput.setValue(appName + ".stderr");
}
- createIntermediateOutputDataStagingTasks(
- registryClient,
- processModel,
- gatewayId,
- parentProcess,
- dataStagingTaskIds,
- processOutput);
+
createIntermediateOutputDataStagingTasks(registryClient, processModel,
gatewayId, parentProcess, dataStagingTaskIds, processOutput, resourceType);
break;
case URI:
case URI_COLLECTION:
- createIntermediateOutputDataStagingTasks(
- registryClient,
- processModel,
- gatewayId,
- parentProcess,
- dataStagingTaskIds,
- processOutput);
+
createIntermediateOutputDataStagingTasks(registryClient, processModel,
gatewayId, parentProcess, dataStagingTaskIds, processOutput, resourceType);
break;
default:
// nothing to do
@@ -596,23 +559,16 @@ public class SimpleOrchestratorImpl extends
AbstractOrchestrator {
return dataStagingTaskIds;
}
- private boolean isArchive(
- RegistryService.Client registryClient, ProcessModel processModel,
OrchestratorContext orchestratorContext)
- throws TException {
- ApplicationInterfaceDescription appInterface =
-
registryClient.getApplicationInterface(processModel.getApplicationInterfaceId());
+ private boolean isArchive(RegistryService.Client registryClient,
ProcessModel processModel) throws TException {
+ ApplicationInterfaceDescription appInterface =
registryClient.getApplicationInterface(processModel.getApplicationInterfaceId());
return appInterface.isArchiveWorkingDirectory();
}
- private void createArchiveDataStatgingTask(
- RegistryService.Client registryClient,
- ProcessModel processModel,
- String gatewayId,
- List<String> dataStagingTaskIds)
- throws AiravataException, TException, OrchestratorException {
- TaskModel archiveTask = null;
+ private void createArchiveDataStatgingTask(RegistryService.Client
registryClient, ProcessModel processModel, String gatewayId,
+ List<String>
dataStagingTaskIds, ResourceType resourceType) throws AiravataException,
TException, OrchestratorException {
+ TaskModel archiveTask;
try {
- archiveTask = getOutputDataStagingTask(registryClient,
processModel, null, gatewayId, null);
+ archiveTask = getOutputDataStagingTask(registryClient,
processModel, null, gatewayId, null, resourceType);
} catch (TException e) {
throw new AiravataException("Error! DataStaging sub task
serialization failed", e);
}
@@ -621,16 +577,10 @@ public class SimpleOrchestratorImpl extends
AbstractOrchestrator {
dataStagingTaskIds.add(archiveTask.getTaskId());
}
- private void createOutputDataSatagingTasks(
- RegistryService.Client registryClient,
- ProcessModel processModel,
- String gatewayId,
- List<String> dataStagingTaskIds,
- OutputDataObjectType processOutput)
- throws AiravataException, OrchestratorException {
+ private void createOutputDataSatagingTasks(RegistryService.Client
registryClient, ProcessModel processModel, String gatewayId,
+ List<String>
dataStagingTaskIds, OutputDataObjectType processOutput, ResourceType
resourceType) throws AiravataException, OrchestratorException {
try {
- TaskModel outputDataStagingTask =
- getOutputDataStagingTask(registryClient, processModel,
processOutput, gatewayId, null);
+ TaskModel outputDataStagingTask =
getOutputDataStagingTask(registryClient, processModel, processOutput,
gatewayId, null, resourceType);
String taskId = registryClient.addTask(outputDataStagingTask,
processModel.getProcessId());
outputDataStagingTask.setTaskId(taskId);
dataStagingTaskIds.add(outputDataStagingTask.getTaskId());
@@ -639,17 +589,11 @@ public class SimpleOrchestratorImpl extends
AbstractOrchestrator {
}
}
- private void createIntermediateOutputDataStagingTasks(
- RegistryService.Client registryClient,
- ProcessModel processModel,
- String gatewayId,
- ProcessModel parentProcess,
- List<String> dataStagingTaskIds,
- OutputDataObjectType processOutput)
- throws AiravataException, OrchestratorException {
+ private void
createIntermediateOutputDataStagingTasks(RegistryService.Client registryClient,
ProcessModel processModel, String gatewayId,
+ ProcessModel
parentProcess, List<String> dataStagingTaskIds,
+ OutputDataObjectType
processOutput, ResourceType resourceType) throws AiravataException,
OrchestratorException {
try {
- TaskModel outputDataStagingTask =
- getOutputDataStagingTask(registryClient, processModel,
processOutput, gatewayId, parentProcess);
+ TaskModel outputDataStagingTask =
getOutputDataStagingTask(registryClient, processModel, processOutput,
gatewayId, parentProcess, resourceType);
outputDataStagingTask.setTaskType(TaskTypes.OUTPUT_FETCHING);
String taskId = registryClient.addTask(outputDataStagingTask,
processModel.getProcessId());
outputDataStagingTask.setTaskId(taskId);
@@ -659,20 +603,14 @@ public class SimpleOrchestratorImpl extends
AbstractOrchestrator {
}
}
- private List<String> createAndSaveSubmissionTasks(
- RegistryService.Client registryClient,
- String gatewayId,
- JobSubmissionInterface jobSubmissionInterface,
- ProcessModel processModel,
- int wallTime)
+ private List<String> createAndSaveSubmissionTasks(RegistryService.Client
registryClient, JobSubmissionInterface jobSubmissionInterface, ProcessModel
processModel, int wallTime)
throws TException, OrchestratorException {
JobSubmissionProtocol jobSubmissionProtocol =
jobSubmissionInterface.getJobSubmissionProtocol();
- MonitorMode monitorMode = null;
- if (jobSubmissionProtocol == JobSubmissionProtocol.SSH
- || jobSubmissionProtocol == JobSubmissionProtocol.SSH_FORK) {
- SSHJobSubmission sshJobSubmission =
-
OrchestratorUtils.getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
+ MonitorMode monitorMode;
+
+ if (jobSubmissionProtocol == JobSubmissionProtocol.SSH ||
jobSubmissionProtocol == JobSubmissionProtocol.SSH_FORK) {
+ SSHJobSubmission sshJobSubmission =
OrchestratorUtils.getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
monitorMode = sshJobSubmission.getMonitorMode();
} else if (jobSubmissionProtocol == JobSubmissionProtocol.UNICORE) {
monitorMode = MonitorMode.FORK;
@@ -688,42 +626,50 @@ public class SimpleOrchestratorImpl extends
AbstractOrchestrator {
jobSubmissionProtocol.name());
throw new OrchestratorException("Unsupported Job Submission
Protocol " + jobSubmissionProtocol.name());
}
+
List<String> submissionTaskIds = new ArrayList<>();
- TaskModel taskModel = new TaskModel();
- taskModel.setParentProcessId(processModel.getProcessId());
- taskModel.setCreationTime(System.currentTimeMillis());
- taskModel.setLastUpdateTime(taskModel.getCreationTime());
TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- taskModel.setTaskStatuses(Arrays.asList(taskStatus));
- taskModel.setTaskType(TaskTypes.JOB_SUBMISSION);
+
JobSubmissionTaskModel submissionSubTask = new
JobSubmissionTaskModel();
submissionSubTask.setMonitorMode(monitorMode);
submissionSubTask.setJobSubmissionProtocol(jobSubmissionProtocol);
submissionSubTask.setWallTime(wallTime);
+
byte[] bytes = ThriftUtils.serializeThriftObject(submissionSubTask);
+
+ TaskModel taskModel = new TaskModel();
+ taskModel.setParentProcessId(processModel.getProcessId());
+ taskModel.setCreationTime(System.currentTimeMillis());
+ taskModel.setLastUpdateTime(taskModel.getCreationTime());
+ taskModel.setTaskStatuses(List.of(taskStatus));
+ taskModel.setTaskType(TaskTypes.JOB_SUBMISSION);
taskModel.setSubTaskModel(bytes);
taskModel.setMaxRetry(1);
taskModel.setCurrentRetry(0);
+
String taskId = registryClient.addTask(taskModel,
processModel.getProcessId());
taskModel.setTaskId(taskId);
submissionTaskIds.add(taskModel.getTaskId());
// create monitor task for this Email based monitor mode job
if (monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR ||
monitorMode == MonitorMode.CLOUD_JOB_MONITOR) {
+
+ TaskStatus monitorTaskStatus = new TaskStatus(TaskState.CREATED);
+
monitorTaskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+
TaskModel monitorTaskModel = new TaskModel();
monitorTaskModel.setParentProcessId(processModel.getProcessId());
monitorTaskModel.setCreationTime(System.currentTimeMillis());
monitorTaskModel.setLastUpdateTime(monitorTaskModel.getCreationTime());
- TaskStatus monitorTaskStatus = new TaskStatus(TaskState.CREATED);
- monitorTaskStatus.setTimeOfStateChange(
- AiravataUtils.getCurrentTimestamp().getTime());
- monitorTaskModel.setTaskStatuses(Arrays.asList(monitorTaskStatus));
+ monitorTaskModel.setTaskStatuses(List.of(monitorTaskStatus));
monitorTaskModel.setTaskType(TaskTypes.MONITORING);
+
MonitorTaskModel monitorSubTaskModel = new MonitorTaskModel();
monitorSubTaskModel.setMonitorMode(monitorMode);
monitorTaskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(monitorSubTaskModel));
- String mTaskId = (String) registryClient.addTask(monitorTaskModel,
processModel.getProcessId());
+
+ String mTaskId = registryClient.addTask(monitorTaskModel,
processModel.getProcessId());
monitorTaskModel.setTaskId(mTaskId);
submissionTaskIds.add(monitorTaskModel.getTaskId());
}
@@ -740,11 +686,7 @@ public class SimpleOrchestratorImpl extends
AbstractOrchestrator {
});
}
- private TaskModel getInputDataStagingTask(
- RegistryService.Client registryClient,
- ProcessModel processModel,
- InputDataObjectType processInput,
- String gatewayId)
+ private TaskModel getInputDataStagingTask(RegistryService.Client
registryClient, ProcessModel processModel, InputDataObjectType processInput,
String gatewayId, ResourceType resourceType)
throws TException, AiravataException, OrchestratorException {
// create new task model for this task
TaskModel taskModel = new TaskModel();
@@ -757,103 +699,130 @@ public class SimpleOrchestratorImpl extends
AbstractOrchestrator {
taskModel.setTaskType(TaskTypes.DATA_STAGING);
// create data staging sub task model
DataStagingTaskModel submodel = new DataStagingTaskModel();
- ComputeResourceDescription computeResource =
-
registryClient.getComputeResource(processModel.getComputeResourceId());
- String scratchLocation =
OrchestratorUtils.getScratchLocation(processModel, gatewayId);
- String workingDir =
- (scratchLocation.endsWith(File.separator) ? scratchLocation :
scratchLocation + File.separator)
- + processModel.getProcessId()
- + File.separator;
- URI destination = null;
- try {
- DataMovementProtocol dataMovementProtocol =
-
OrchestratorUtils.getPreferredDataMovementProtocol(processModel, gatewayId);
- String loginUserName =
OrchestratorUtils.getLoginUserName(processModel, gatewayId);
- StringBuilder destinationPath = new StringBuilder(workingDir);
- Optional.ofNullable(processInput.getOverrideFilename())
- .ifPresent(destinationPath::append); // If an override
filename is provided
-
- destination = new URI(
- dataMovementProtocol.name(),
- loginUserName,
- computeResource.getHostName(),
- OrchestratorUtils.getDataMovementPort(processModel,
gatewayId),
- destinationPath.toString(),
- null,
- null);
- } catch (URISyntaxException e) {
- throw new OrchestratorException("Error while constructing
destination file URI", e);
+ ComputeResourceDescription computeResource =
registryClient.getComputeResource(processModel.getComputeResourceId());
+
+ String destinationUriString = "";
+
+ if (resourceType == ResourceType.SLURM) {
+ String scratchLocation =
OrchestratorUtils.getScratchLocation(processModel, gatewayId);
+ String workingDir = (scratchLocation.endsWith(File.separator) ?
scratchLocation : scratchLocation + File.separator) +
processModel.getProcessId() + File.separator;
+
+ URI destination;
+ try {
+ DataMovementProtocol dataMovementProtocol =
OrchestratorUtils.getPreferredDataMovementProtocol(processModel, gatewayId);
+ String loginUserName =
OrchestratorUtils.getLoginUserName(processModel, gatewayId);
+ StringBuilder destinationPath = new StringBuilder(workingDir);
+
Optional.ofNullable(processInput.getOverrideFilename()).ifPresent(destinationPath::append);
// If an override filename is provided
+
+ destination = new URI(
+ dataMovementProtocol.name(),
+ loginUserName,
+ computeResource.getHostName(),
+ OrchestratorUtils.getDataMovementPort(processModel,
gatewayId),
+ destinationPath.toString(),
+ null,
+ null);
+ destinationUriString = destination.toString();
+
+ } catch (URISyntaxException e) {
+ throw new OrchestratorException("Error while constructing
destination file URI", e);
+ }
+
+ } else if (resourceType == ResourceType.AWS) {
+ logger.info("Configuring AWS Input Data Staging task for process
{}. Setting dummy S3 destination.", processModel.getProcessId());
+
+ try {
+ String fileName = processInput.getOverrideFilename() != null
&& !processInput.getOverrideFilename().isEmpty()
+ ? processInput.getOverrideFilename()
+ : new File(new
URI(processInput.getValue()).getPath()).getName();
+ destinationUriString = "s3://dummy-bucket/" +
processModel.getProcessId() + "/" + fileName;
+
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+
}
+
+ submodel.setDestination(destinationUriString);
submodel.setType(DataStageType.INPUT);
submodel.setSource(processInput.getValue());
submodel.setProcessInput(processInput);
- submodel.setDestination(destination.toString());
taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
taskModel.setMaxRetry(3);
taskModel.setCurrentRetry(0);
return taskModel;
}
- private TaskModel getOutputDataStagingTask(
- RegistryService.Client registryClient,
- ProcessModel processModel,
- OutputDataObjectType processOutput,
- String gatewayId,
- ProcessModel parentProcess)
- throws TException, AiravataException, OrchestratorException {
+ private TaskModel getOutputDataStagingTask(RegistryService.Client
registryClient, ProcessModel processModel, OutputDataObjectType processOutput,
+ String gatewayId, ProcessModel
parentProcess, ResourceType resourceType) throws TException, AiravataException,
OrchestratorException {
try {
-
// create new task model for this task
+ TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
+
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+
TaskModel taskModel = new TaskModel();
taskModel.setParentProcessId(processModel.getProcessId());
taskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
taskModel.setLastUpdateTime(taskModel.getCreationTime());
- TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
-
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- taskModel.setTaskStatuses(Arrays.asList(taskStatus));
+ taskModel.setTaskStatuses(List.of(taskStatus));
taskModel.setTaskType(TaskTypes.DATA_STAGING);
- ComputeResourceDescription computeResource =
-
registryClient.getComputeResource(processModel.getComputeResourceId());
- String workingDir =
OrchestratorUtils.getScratchLocation(processModel, gatewayId)
- + File.separator
- + (parentProcess == null ? processModel.getProcessId() :
parentProcess.getProcessId())
- + File.separator;
+ ComputeResourceDescription computeResource =
registryClient.getComputeResource(processModel.getComputeResourceId());
DataStagingTaskModel submodel = new DataStagingTaskModel();
- DataMovementProtocol dataMovementProtocol =
-
OrchestratorUtils.getPreferredDataMovementProtocol(processModel, gatewayId);
- URI source = null;
- try {
- String loginUserName =
OrchestratorUtils.getLoginUserName(processModel, gatewayId);
+ String sourceUriString = "";
+
+ if (resourceType == ResourceType.SLURM) {
+ String workingDir =
OrchestratorUtils.getScratchLocation(processModel, gatewayId)
+ + File.separator
+ + (parentProcess == null ? processModel.getProcessId()
: parentProcess.getProcessId())
+ + File.separator;
+ DataMovementProtocol dataMovementProtocol =
OrchestratorUtils.getPreferredDataMovementProtocol(processModel, gatewayId);
+ URI source;
+ try {
+ String loginUserName =
OrchestratorUtils.getLoginUserName(processModel, gatewayId);
+ if (processOutput != null) {
+ submodel.setType(DataStageType.OUPUT);
+ submodel.setProcessOutput(processOutput);
+ source = new URI(
+ dataMovementProtocol.name(),
+ loginUserName,
+ computeResource.getHostName(),
+
OrchestratorUtils.getDataMovementPort(processModel, gatewayId),
+ workingDir + processOutput.getValue(),
+ null,
+ null);
+ } else {
+ // archive
+ submodel.setType(DataStageType.ARCHIVE_OUTPUT);
+ source = new URI(
+ dataMovementProtocol.name(),
+ loginUserName,
+ computeResource.getHostName(),
+
OrchestratorUtils.getDataMovementPort(processModel, gatewayId),
+ workingDir,
+ null,
+ null);
+ }
+
+ } catch (URISyntaxException e) {
+ throw new OrchestratorException("Error while constructing
source file URI", e);
+ }
+ sourceUriString = source.toString();
+
+ } else if (resourceType == ResourceType.AWS) {
+ logger.info("Configuring AWS Output Data Staging task for
process {}. Setting dummy S3 source.", processModel.getProcessId());
if (processOutput != null) {
submodel.setType(DataStageType.OUPUT);
submodel.setProcessOutput(processOutput);
- source = new URI(
- dataMovementProtocol.name(),
- loginUserName,
- computeResource.getHostName(),
-
OrchestratorUtils.getDataMovementPort(processModel, gatewayId),
- workingDir + processOutput.getValue(),
- null,
- null);
+ sourceUriString = "s3://dummy-bucket/" +
processModel.getProcessId() + "/" + processOutput.getValue();
+
} else {
- // archive
submodel.setType(DataStageType.ARCHIVE_OUTPUT);
- source = new URI(
- dataMovementProtocol.name(),
- loginUserName,
- computeResource.getHostName(),
-
OrchestratorUtils.getDataMovementPort(processModel, gatewayId),
- workingDir,
- null,
- null);
+ sourceUriString = "s3://dummy-bucket/" +
processModel.getProcessId() + "/";
}
- } catch (URISyntaxException e) {
- throw new OrchestratorException("Error while constructing
source file URI", e);
}
- // We don't know destination location at this time, data staging
task will set this.
- // because destination is required field we set dummy destination
- submodel.setSource(source.toString());
+
+ submodel.setSource(sourceUriString);
// We don't know destination location at this time, data staging
task will set this.
// because destination is required field we set dummy destination
submodel.setDestination("dummy://temp/file/location");
@@ -861,6 +830,7 @@ public class SimpleOrchestratorImpl extends
AbstractOrchestrator {
taskModel.setMaxRetry(3);
taskModel.setCurrentRetry(0);
return taskModel;
+
} catch (OrchestratorException e) {
throw new OrchestratorException("Error occurred while retrieving
data movement from app catalog", e);
}
diff --git
a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/entities/appcatalog/AWSGroupComputeResourcePrefEntity.java
b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/entities/appcatalog/AWSGroupComputeResourcePrefEntity.java
index b7675f7518..113f7a4a66 100644
---
a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/entities/appcatalog/AWSGroupComputeResourcePrefEntity.java
+++
b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/entities/appcatalog/AWSGroupComputeResourcePrefEntity.java
@@ -19,6 +19,7 @@
package org.apache.airavata.registry.core.entities.appcatalog;
import jakarta.persistence.Column;
+import jakarta.persistence.DiscriminatorValue;
import jakarta.persistence.Entity;
import jakarta.persistence.PrimaryKeyJoinColumn;
import jakarta.persistence.PrimaryKeyJoinColumns;
@@ -28,6 +29,7 @@ import jakarta.persistence.Table;
* The persistent class for the aws_group_compute_resource_preference database
table.
*/
@Entity
+@DiscriminatorValue("AWS")
@Table(name = "AWS_GROUP_COMPUTE_RESOURCE_PREFERENCE")
@PrimaryKeyJoinColumns({
@PrimaryKeyJoinColumn(name = "RESOURCE_ID", referencedColumnName =
"RESOURCE_ID"),
diff --git
a/thrift-interface-descriptions/data-models/group_resource_profile_model.thrift
b/thrift-interface-descriptions/data-models/group_resource_profile_model.thrift
index 357d3fbe61..f82983631c 100644
---
a/thrift-interface-descriptions/data-models/group_resource_profile_model.thrift
+++
b/thrift-interface-descriptions/data-models/group_resource_profile_model.thrift
@@ -60,12 +60,9 @@ struct SlurmComputeResourcePreference {
}
struct AwsComputeResourcePreference {
- 1: optional string preferredAmiId,
- 2: optional string preferredInstanceType,
- 3: optional string region,
- 4: optional string securityGroupId,
- 5: optional string keyPairName,
- 6: optional i64 maxStartupTime,
+ 1: optional string region,
+ 2: optional string preferredAmiId,
+ 3: optional string preferredInstanceType,
}
union EnvironmentSpecificPreferences {