This is an automated email from the ASF dual-hosted git repository.
isjarana pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/develop by this push:
new 6560baa22c enhance MultiComputeResourcePolicy and orchestrator to select best CR
new a2d7b2d0b1 Merge pull request #394 from isururanawaka/metaschedular
6560baa22c is described below
commit 6560baa22cd34cde2e5c38ef59fb0a2ee4c38752
Author: Isuru Ranawaka <[email protected]>
AuthorDate: Thu Mar 9 08:54:50 2023 -0500
enhance MultiComputeResourcePolicy and orchestrator to select best CR
---
.../scheduling/api/ProcessSchedulerImpl.java | 21 ++++++++++++++++++++-
.../selection/MultipleComputeResourcePolicy.java | 21 ---------------------
.../cpi/impl/SimpleOrchestratorImpl.java | 2 +-
.../server/OrchestratorServerHandler.java | 22 ++++++++++++++++++----
4 files changed, 39 insertions(+), 27 deletions(-)
diff --git
a/modules/airavata-metascheduler/process-scheduler/src/main/java/org/apache/airavata/metascheduler/process/scheduling/api/ProcessSchedulerImpl.java
b/modules/airavata-metascheduler/process-scheduler/src/main/java/org/apache/airavata/metascheduler/process/scheduling/api/ProcessSchedulerImpl.java
index 409998d4ea..4ac44ed705 100644
---
a/modules/airavata-metascheduler/process-scheduler/src/main/java/org/apache/airavata/metascheduler/process/scheduling/api/ProcessSchedulerImpl.java
+++
b/modules/airavata-metascheduler/process-scheduler/src/main/java/org/apache/airavata/metascheduler/process/scheduling/api/ProcessSchedulerImpl.java
@@ -5,6 +5,8 @@ import org.apache.airavata.common.utils.ThriftClientPool;
import org.apache.airavata.metascheduler.core.api.ProcessScheduler;
import
org.apache.airavata.metascheduler.core.engine.ComputeResourceSelectionPolicy;
import org.apache.airavata.metascheduler.core.utils.Utils;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.experiment.ExperimentModel;
import org.apache.airavata.model.process.ProcessModel;
import
org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
import org.apache.airavata.model.status.ProcessState;
@@ -39,6 +41,8 @@ public class ProcessSchedulerImpl implements ProcessScheduler
{
final RegistryService.Client registryClient =
this.registryClientPool.getResource();
try {
List<ProcessModel> processModels =
registryClient.getProcessList(experimentId);
+
+ ExperimentModel experiment =
registryClient.getExperiment(experimentId);
boolean allProcessesScheduled = true;
String selectionPolicyClass =
ServerSettings.getComputeResourceSelectionPolicyClass();
@@ -53,8 +57,23 @@ public class ProcessSchedulerImpl implements
ProcessScheduler {
selectComputeResource(processModel.getProcessId());
if (computationalResourceSchedulingModel.isPresent()) {
-
processModel.setProcessResourceSchedule(computationalResourceSchedulingModel.get());
+ ComputationalResourceSchedulingModel
resourceSchedulingModel = computationalResourceSchedulingModel.get();
+ List<InputDataObjectType> inputDataObjectTypeList =
experiment.getExperimentInputs();
+ inputDataObjectTypeList.forEach(obj->{
+ if (obj.getName().equals("Wall_Time")){
+
obj.setValue("-walltime="+resourceSchedulingModel.getWallTimeLimit());
+ }
+ if (obj.getName().equals("Parallel_Group_Count")){
+
obj.setValue("-mgroupcount="+resourceSchedulingModel.getMGroupCount());
+ }
+ });
+
+
experiment.setExperimentInputs(inputDataObjectTypeList);
+
processModel.setProcessResourceSchedule(resourceSchedulingModel);
+
processModel.setComputeResourceId(resourceSchedulingModel.getResourceHostId());
+
registryClient.updateProcess(processModel,
processModel.getProcessId());
+
registryClient.updateExperiment(processModel.getExperimentId(),experiment);
} else {
ProcessStatus newProcessStatus = new ProcessStatus();
newProcessStatus.setState(ProcessState.QUEUED);
diff --git
a/modules/airavata-metascheduler/process-scheduler/src/main/java/org/apache/airavata/metascheduler/process/scheduling/engine/cr/selection/MultipleComputeResourcePolicy.java
b/modules/airavata-metascheduler/process-scheduler/src/main/java/org/apache/airavata/metascheduler/process/scheduling/engine/cr/selection/MultipleComputeResourcePolicy.java
index 8e6c4e8fb0..449613a68f 100644
---
a/modules/airavata-metascheduler/process-scheduler/src/main/java/org/apache/airavata/metascheduler/process/scheduling/engine/cr/selection/MultipleComputeResourcePolicy.java
+++
b/modules/airavata-metascheduler/process-scheduler/src/main/java/org/apache/airavata/metascheduler/process/scheduling/engine/cr/selection/MultipleComputeResourcePolicy.java
@@ -44,22 +44,14 @@ public class MultipleComputeResourcePolicy extends
DefaultComputeResourceSelecti
ExperimentModel experiment =
registryClient.getExperiment(processModel.getExperimentId());
-
-
UserConfigurationDataModel userConfigurationDataModel =
experiment.getUserConfigurationData();
- // Assume scheduling data is populated in
USER_CONFIGURATION_DATA_MODEL
- ComputationalResourceSchedulingModel
computationalResourceSchedulingModel = userConfigurationDataModel
- .getComputationalResourceScheduling();
-
List<ComputationalResourceSchedulingModel>
resourceSchedulingModels =
userConfigurationDataModel.getAutoScheduledCompResourceSchedulingList();
List<String> retries = new ArrayList<>();
-
-
while (retries.size()<resourceSchedulingModels.size()) {
Random rand = new Random();
int upperbound = resourceSchedulingModels.size();
@@ -71,19 +63,6 @@ public class MultipleComputeResourcePolicy extends
DefaultComputeResourceSelecti
QueueStatusModel queueStatusModel =
registryClient.getQueueStatus(comResourceDes.getHostName(),
resourceSchedulingModel.getQueueName());
if (queueStatusModel.isQueueUp()) {
-
- List<InputDataObjectType> inputDataObjectTypeList =
experiment.getExperimentInputs();
- inputDataObjectTypeList.forEach(obj->{
- if (obj.getName().equals("Wall_Time")){
-
obj.setValue("-walltime="+resourceSchedulingModel.getWallTimeLimit());
- }
- if
(obj.getName().equals("Parallel_Group_Count")){
-
obj.setValue("-mgroupcount="+resourceSchedulingModel.getMGroupCount());
- }
- });
-
-
-
return Optional.of(resourceSchedulingModel);
}else{
retries.add(key);
diff --git
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index 35dc11b829..7c086f0b90 100644
---
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -300,7 +300,7 @@ public class SimpleOrchestratorImpl extends
AbstractOrchestrator{
}
}
- public String createAndSaveTasks(String gatewayId, ProcessModel
processModel, boolean autoSchedule) throws OrchestratorException {
+ public String createAndSaveTasks(String gatewayId, ProcessModel
processModel) throws OrchestratorException {
final RegistryService.Client registryClient =
getRegistryServiceClient();
try {
ComputationalResourceSchedulingModel resourceSchedule =
processModel.getProcessResourceSchedule();
diff --git
a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 7acb6a6bd1..4d2f0fda7e 100644
---
a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++
b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -238,17 +238,31 @@ public class OrchestratorServerHandler implements
OrchestratorService.Iface {
}
});
- String taskDag =
orchestrator.createAndSaveTasks(gatewayId, processModel,
experiment.getUserConfigurationData().isAiravataAutoSchedule());
- processModel.setTaskDag(taskDag);
- registryClient.updateProcess(processModel,
processModel.getProcessId());
+
+
+ if
(!experiment.getUserConfigurationData().isAiravataAutoSchedule()) {
+ String taskDag =
orchestrator.createAndSaveTasks(gatewayId, processModel);
+ processModel.setTaskDag(taskDag);
+ registryClient.updateProcess(processModel,
processModel.getProcessId());
+ }
}
- if (!validateProcess(experimentId, processes)) {
+ if
(!experiment.getUserConfigurationData().isAiravataAutoSchedule() &&
!validateProcess(experimentId, processes)) {
throw new Exception("Validating process fails for given
experiment Id : " + experimentId);
}
ProcessScheduler scheduler = new ProcessSchedulerImpl();
if
(!experiment.getUserConfigurationData().isAiravataAutoSchedule() ||
scheduler.canLaunch(experimentId)) {
+ if
(experiment.getUserConfigurationData().isAiravataAutoSchedule()){
+ for (ProcessModel processModel : processes) {
+ String taskDag =
orchestrator.createAndSaveTasks(gatewayId, processModel);
+ processModel.setTaskDag(taskDag);
+ registryClient.updateProcess(processModel,
processModel.getProcessId());
+ }
+ if (!validateProcess(experimentId, processes)) {
+ throw new Exception("Validating process fails for
given experiment Id : " + experimentId);
+ }
+ }
runExperimentLauncher(experimentId, token, gatewayId);
} else {
log.debug(experimentId, "Queuing single application
experiment {}.", experimentId);