This is an automated email from the ASF dual-hosted git repository.

isjarana pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6560baa22c enhance MultiComputeResourcePolicy and orchestrator to select best CR
     new a2d7b2d0b1 Merge pull request #394 from isururanawaka/metaschedular
6560baa22c is described below

commit 6560baa22cd34cde2e5c38ef59fb0a2ee4c38752
Author: Isuru Ranawaka <[email protected]>
AuthorDate: Thu Mar 9 08:54:50 2023 -0500

    enhance MultiComputeResourcePolicy and orchestrator to select best CR
---
 .../scheduling/api/ProcessSchedulerImpl.java       | 21 ++++++++++++++++++++-
 .../selection/MultipleComputeResourcePolicy.java   | 21 ---------------------
 .../cpi/impl/SimpleOrchestratorImpl.java           |  2 +-
 .../server/OrchestratorServerHandler.java          | 22 ++++++++++++++++++----
 4 files changed, 39 insertions(+), 27 deletions(-)

diff --git a/modules/airavata-metascheduler/process-scheduler/src/main/java/org/apache/airavata/metascheduler/process/scheduling/api/ProcessSchedulerImpl.java b/modules/airavata-metascheduler/process-scheduler/src/main/java/org/apache/airavata/metascheduler/process/scheduling/api/ProcessSchedulerImpl.java
index 409998d4ea..4ac44ed705 100644
--- a/modules/airavata-metascheduler/process-scheduler/src/main/java/org/apache/airavata/metascheduler/process/scheduling/api/ProcessSchedulerImpl.java
+++ b/modules/airavata-metascheduler/process-scheduler/src/main/java/org/apache/airavata/metascheduler/process/scheduling/api/ProcessSchedulerImpl.java
@@ -5,6 +5,8 @@ import org.apache.airavata.common.utils.ThriftClientPool;
 import org.apache.airavata.metascheduler.core.api.ProcessScheduler;
 import 
org.apache.airavata.metascheduler.core.engine.ComputeResourceSelectionPolicy;
 import org.apache.airavata.metascheduler.core.utils.Utils;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.experiment.ExperimentModel;
 import org.apache.airavata.model.process.ProcessModel;
 import 
org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
 import org.apache.airavata.model.status.ProcessState;
@@ -39,6 +41,8 @@ public class ProcessSchedulerImpl implements ProcessScheduler 
{
         final RegistryService.Client registryClient = 
this.registryClientPool.getResource();
         try {
             List<ProcessModel> processModels = 
registryClient.getProcessList(experimentId);
+
+            ExperimentModel experiment = 
registryClient.getExperiment(experimentId);
             boolean allProcessesScheduled = true;
 
             String selectionPolicyClass = 
ServerSettings.getComputeResourceSelectionPolicyClass();
@@ -53,8 +57,23 @@ public class ProcessSchedulerImpl implements 
ProcessScheduler {
                             selectComputeResource(processModel.getProcessId());
 
                     if (computationalResourceSchedulingModel.isPresent()) {
-                        
processModel.setProcessResourceSchedule(computationalResourceSchedulingModel.get());
+                        ComputationalResourceSchedulingModel 
resourceSchedulingModel = computationalResourceSchedulingModel.get();
+                        List<InputDataObjectType> inputDataObjectTypeList =  
experiment.getExperimentInputs();
+                        inputDataObjectTypeList.forEach(obj->{
+                            if (obj.getName().equals("Wall_Time")){
+                                
obj.setValue("-walltime="+resourceSchedulingModel.getWallTimeLimit());
+                            }
+                            if (obj.getName().equals("Parallel_Group_Count")){
+                                
obj.setValue("-mgroupcount="+resourceSchedulingModel.getMGroupCount());
+                            }
+                        });
+
+                        
experiment.setExperimentInputs(inputDataObjectTypeList);
+                        
processModel.setProcessResourceSchedule(resourceSchedulingModel);
+                        
processModel.setComputeResourceId(resourceSchedulingModel.getResourceHostId());
+
                         registryClient.updateProcess(processModel, 
processModel.getProcessId());
+                        
registryClient.updateExperiment(processModel.getExperimentId(),experiment);
                     } else {
                         ProcessStatus newProcessStatus = new ProcessStatus();
                         newProcessStatus.setState(ProcessState.QUEUED);
diff --git a/modules/airavata-metascheduler/process-scheduler/src/main/java/org/apache/airavata/metascheduler/process/scheduling/engine/cr/selection/MultipleComputeResourcePolicy.java b/modules/airavata-metascheduler/process-scheduler/src/main/java/org/apache/airavata/metascheduler/process/scheduling/engine/cr/selection/MultipleComputeResourcePolicy.java
index 8e6c4e8fb0..449613a68f 100644
--- a/modules/airavata-metascheduler/process-scheduler/src/main/java/org/apache/airavata/metascheduler/process/scheduling/engine/cr/selection/MultipleComputeResourcePolicy.java
+++ b/modules/airavata-metascheduler/process-scheduler/src/main/java/org/apache/airavata/metascheduler/process/scheduling/engine/cr/selection/MultipleComputeResourcePolicy.java
@@ -44,22 +44,14 @@ public class MultipleComputeResourcePolicy extends 
DefaultComputeResourceSelecti
                 ExperimentModel experiment = 
registryClient.getExperiment(processModel.getExperimentId());
 
 
-
-
                 UserConfigurationDataModel userConfigurationDataModel = 
experiment.getUserConfigurationData();
 
-                // Assume scheduling data is populated in 
USER_CONFIGURATION_DATA_MODEL
-                ComputationalResourceSchedulingModel 
computationalResourceSchedulingModel = userConfigurationDataModel
-                        .getComputationalResourceScheduling();
-
 
                 List<ComputationalResourceSchedulingModel> 
resourceSchedulingModels =
                         
userConfigurationDataModel.getAutoScheduledCompResourceSchedulingList();
 
                 List<String> retries = new ArrayList<>();
 
-
-
                 while (retries.size()<resourceSchedulingModels.size()) {
                     Random rand = new Random();
                     int upperbound = resourceSchedulingModels.size();
@@ -71,19 +63,6 @@ public class MultipleComputeResourcePolicy extends 
DefaultComputeResourceSelecti
                         QueueStatusModel queueStatusModel = 
registryClient.getQueueStatus(comResourceDes.getHostName(),
                                 resourceSchedulingModel.getQueueName());
                         if (queueStatusModel.isQueueUp()) {
-
-                           List<InputDataObjectType> inputDataObjectTypeList = 
 experiment.getExperimentInputs();
-                           inputDataObjectTypeList.forEach(obj->{
-                               if (obj.getName().equals("Wall_Time")){
-                                   
obj.setValue("-walltime="+resourceSchedulingModel.getWallTimeLimit());
-                               }
-                               if 
(obj.getName().equals("Parallel_Group_Count")){
-                                   
obj.setValue("-mgroupcount="+resourceSchedulingModel.getMGroupCount());
-                               }
-                           });
-
-
-
                             return Optional.of(resourceSchedulingModel);
                         }else{
                             retries.add(key);
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index 35dc11b829..7c086f0b90 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -300,7 +300,7 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator{
         }
     }
 
-    public String createAndSaveTasks(String gatewayId, ProcessModel 
processModel, boolean autoSchedule) throws OrchestratorException {
+    public String createAndSaveTasks(String gatewayId, ProcessModel 
processModel) throws OrchestratorException {
         final RegistryService.Client registryClient = 
getRegistryServiceClient();
         try {
             ComputationalResourceSchedulingModel resourceSchedule = 
processModel.getProcessResourceSchedule();
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 7acb6a6bd1..4d2f0fda7e 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -238,17 +238,31 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
                         }
                     });
 
-                    String taskDag = 
orchestrator.createAndSaveTasks(gatewayId, processModel, 
experiment.getUserConfigurationData().isAiravataAutoSchedule());
-                    processModel.setTaskDag(taskDag);
-                    registryClient.updateProcess(processModel, 
processModel.getProcessId());
+
+
+                    if 
(!experiment.getUserConfigurationData().isAiravataAutoSchedule()) {
+                        String taskDag = 
orchestrator.createAndSaveTasks(gatewayId, processModel);
+                        processModel.setTaskDag(taskDag);
+                        registryClient.updateProcess(processModel, 
processModel.getProcessId());
+                    }
                 }
 
-                if (!validateProcess(experimentId, processes)) {
+                if 
(!experiment.getUserConfigurationData().isAiravataAutoSchedule() && 
!validateProcess(experimentId, processes)) {
                     throw new Exception("Validating process fails for given 
experiment Id : " + experimentId);
                 }
 
                 ProcessScheduler scheduler = new ProcessSchedulerImpl();
                 if 
(!experiment.getUserConfigurationData().isAiravataAutoSchedule() || 
scheduler.canLaunch(experimentId)) {
+                    if 
(experiment.getUserConfigurationData().isAiravataAutoSchedule()){
+                        for (ProcessModel processModel : processes) {
+                            String taskDag = 
orchestrator.createAndSaveTasks(gatewayId, processModel);
+                            processModel.setTaskDag(taskDag);
+                            registryClient.updateProcess(processModel, 
processModel.getProcessId());
+                        }
+                        if (!validateProcess(experimentId, processes)) {
+                            throw new Exception("Validating process fails for 
given experiment Id : " + experimentId);
+                        }
+                    }
                     runExperimentLauncher(experimentId, token, gatewayId);
                 } else {
                     log.debug(experimentId, "Queuing single application 
experiment {}.", experimentId);

Reply via email to