This is an automated email from the ASF dual-hosted git repository.

yasith pushed a commit to branch resource-mgmt-rest-api
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit 6a66621b47d470efab6ab32b24f1d3b66133e0d8
Author: yasithdev <[email protected]>
AuthorDate: Mon Nov 10 18:40:37 2025 -0500

    update OrchestratorServerHandler
---
 .../server/OrchestratorServerHandler.java          | 802 ++-------------------
 .../airavata/service/OrchestratorService.java      | 737 +++++++++++++++++++
 2 files changed, 783 insertions(+), 756 deletions(-)

diff --git 
a/airavata-api/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
 
b/airavata-api/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index e00a9d1c3c..6390572085 100644
--- 
a/airavata-api/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ 
b/airavata-api/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -60,9 +60,7 @@ import 
org.apache.airavata.orchestrator.cpi.OrchestratorService;
 import org.apache.airavata.orchestrator.cpi.impl.SimpleOrchestratorImpl;
 import 
org.apache.airavata.orchestrator.util.OrchestratorServerThreadPoolExecutor;
 import org.apache.airavata.orchestrator.util.OrchestratorUtils;
-import org.apache.airavata.registry.api.RegistryService;
-import org.apache.airavata.registry.api.RegistryService.Client;
-import org.apache.airavata.registry.api.client.RegistryServiceClientFactory;
+import org.apache.airavata.service.OrchestratorRegistryService;
 import org.apache.airavata.registry.api.exception.RegistryServiceException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.RetryPolicy;
@@ -87,6 +85,8 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
     private final Subscriber experimentSubscriber;
 
     private CuratorFramework curatorClient;
+    private OrchestratorRegistryService orchestratorRegistryService = new 
OrchestratorRegistryService();
+    private org.apache.airavata.service.OrchestratorService 
orchestratorService;
 
     /**
      * Query orchestrator server to fetch the CPI version
@@ -106,10 +106,11 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
 
             publisher = MessagingFactory.getPublisher(Type.STATUS);
             orchestrator.initialize();
-            orchestrator.getOrchestratorContext().setPublisher(this.publisher);
+            orchestrator.getOrchestratorContext().setPublisher(publisher);
             statusSubscribe = getStatusSubscriber();
             experimentSubscriber = getExperimentSubscriber();
             startCurator();
+            orchestratorService = new 
org.apache.airavata.service.OrchestratorService(orchestratorRegistryService, 
orchestrator, curatorClient, publisher);
         } catch (OrchestratorException | AiravataException e) {
             log.error(e.getMessage(), e);
             throw new OrchestratorException("Error while initializing 
orchestrator service", e);
@@ -139,178 +140,14 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
      * @param experimentId
      */
     public boolean launchExperiment(String experimentId, String gatewayId) 
throws TException {
-        ExperimentModel experiment = null;
-        final RegistryService.Client registryClient = 
getRegistryServiceClient();
         try {
-            // TODO deprecate this approach as we are replacing gfac
-            String experimentNodePath = getExperimentNodePath(experimentId);
-            ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
experimentNodePath);
-            String experimentCancelNode =
-                    ZKPaths.makePath(experimentNodePath, 
ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
-            ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
experimentCancelNode);
-            experiment = registryClient.getExperiment(experimentId);
-            if (experiment == null) {
-                throw new Exception("Error retrieving the Experiment by the 
given experimentID: " + experimentId);
-            }
-
-            UserConfigurationDataModel userConfigurationData = 
experiment.getUserConfigurationData();
-            String token = null;
-            final String groupResourceProfileId = 
userConfigurationData.getGroupResourceProfileId();
-            if (groupResourceProfileId == null) {
-                throw new Exception("Experiment not configured with a Group 
Resource Profile: " + experimentId);
-            }
-
-            if (userConfigurationData.getComputationalResourceScheduling() != 
null
-                    && userConfigurationData
-                            .getComputationalResourceScheduling()
-                            
.isSet(ComputationalResourceSchedulingModel._Fields.RESOURCE_HOST_ID)) {
-                GroupComputeResourcePreference groupComputeResourcePreference =
-                        registryClient.getGroupComputeResourcePreference(
-                                userConfigurationData
-                                        .getComputationalResourceScheduling()
-                                        .getResourceHostId(),
-                                groupResourceProfileId);
-
-                if 
(groupComputeResourcePreference.getResourceSpecificCredentialStoreToken() != 
null) {
-                    token = 
groupComputeResourcePreference.getResourceSpecificCredentialStoreToken();
-                }
-            }
-            if (token == null || token.isEmpty()) {
-                // try with group resource profile level token
-                GroupResourceProfile groupResourceProfile =
-                        
registryClient.getGroupResourceProfile(groupResourceProfileId);
-                token = groupResourceProfile.getDefaultCredentialStoreToken();
-            }
-            // still the token is empty, then we fail the experiment
-            if (token == null || token.isEmpty()) {
-                throw new Exception(
-                        "You have not configured credential store token at 
group resource profile or compute resource preference."
-                                + " Please provide the correct token at group 
resource profile or compute resource preference.");
-            }
-            ExperimentType executionType = experiment.getExperimentType();
-            if (executionType == ExperimentType.SINGLE_APPLICATION) {
-                // its an single application execution experiment
-                List<ProcessModel> processes = 
orchestrator.createProcesses(experimentId, gatewayId);
-
-                for (ProcessModel processModel : processes) {
-                    // FIXME Resolving replica if available. This is a very 
crude way of resolving input replicas. A
-                    // full featured
-                    // FIXME replica resolving logic should come here
-                    processModel.getProcessInputs().stream().forEach(pi -> {
-                        if (pi.getType().equals(DataType.URI)
-                                && pi.getValue() != null
-                                && pi.getValue().startsWith("airavata-dp://")) 
{
-                            try {
-                                DataProductModel dataProductModel = 
registryClient.getDataProduct(pi.getValue());
-                                Optional<DataReplicaLocationModel> rpLocation =
-                                        
dataProductModel.getReplicaLocations().stream()
-                                                .filter(rpModel -> 
rpModel.getReplicaLocationCategory()
-                                                        
.equals(ReplicaLocationCategory.GATEWAY_DATA_STORE))
-                                                .findFirst();
-                                if (rpLocation.isPresent()) {
-                                    
pi.setValue(rpLocation.get().getFilePath());
-                                    
pi.setStorageResourceId(rpLocation.get().getStorageResourceId());
-                                } else {
-                                    log.error("Could not find a replica for 
the URI " + pi.getValue());
-                                }
-                            } catch (RegistryServiceException e) {
-                                throw new RuntimeException("Error while 
launching experiment", e);
-                            } catch (TException e) {
-                                throw new RuntimeException("Error while 
launching experiment", e);
-                            }
-                        } else if (pi.getType().equals(DataType.URI_COLLECTION)
-                                && pi.getValue() != null
-                                && pi.getValue().contains("airavata-dp://")) {
-                            try {
-                                String[] uriList = pi.getValue().split(",");
-                                final ArrayList<String> filePathList = new 
ArrayList<>();
-                                for (String uri : uriList) {
-                                    if (uri.startsWith("airavata-dp://")) {
-                                        DataProductModel dataProductModel = 
registryClient.getDataProduct(uri);
-                                        Optional<DataReplicaLocationModel> 
rpLocation =
-                                                
dataProductModel.getReplicaLocations().stream()
-                                                        .filter(rpModel -> 
rpModel.getReplicaLocationCategory()
-                                                                
.equals(ReplicaLocationCategory.GATEWAY_DATA_STORE))
-                                                        .findFirst();
-                                        if (rpLocation.isPresent()) {
-                                            
filePathList.add(rpLocation.get().getFilePath());
-                                        } else {
-                                            log.error("Could not find a 
replica for the URI " + pi.getValue());
-                                        }
-                                    } else {
-                                        // uri is in file path format
-                                        filePathList.add(uri);
-                                    }
-                                }
-                                pi.setValue(StringUtils.join(filePathList, 
','));
-                            } catch (RegistryServiceException e) {
-                                throw new RuntimeException("Error while 
launching experiment", e);
-                            } catch (TException e) {
-                                throw new RuntimeException("Error while 
launching experiment", e);
-                            }
-                        }
-                    });
-
-                    if 
(!experiment.getUserConfigurationData().isAiravataAutoSchedule()) {
-                        String taskDag = 
orchestrator.createAndSaveTasks(gatewayId, processModel);
-                        processModel.setTaskDag(taskDag);
-                    }
-                    registryClient.updateProcess(processModel, 
processModel.getProcessId());
-                }
-
-                if 
(!experiment.getUserConfigurationData().isAiravataAutoSchedule()
-                        && !validateProcess(experimentId, processes)) {
-                    throw new Exception("Validating process fails for given 
experiment Id : " + experimentId);
-                }
-
-                ProcessScheduler scheduler = new ProcessSchedulerImpl();
-                if 
(!experiment.getUserConfigurationData().isAiravataAutoSchedule()
-                        || scheduler.canLaunch(experimentId)) {
-                    createAndValidateTasks(experiment, registryClient, false);
-                    runExperimentLauncher(experimentId, gatewayId, token);
-                } else {
-                    log.debug(experimentId, "Queuing single application 
experiment {}.", experimentId);
-                    ExperimentStatus status = new 
ExperimentStatus(ExperimentState.SCHEDULED);
-                    status.setReason("Compute resources are not ready");
-                    status.setTimeOfStateChange(
-                            AiravataUtils.getCurrentTimestamp().getTime());
-                    
OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, status, 
publisher, gatewayId);
-                    log.info("expId: {}, Scheduled experiment ", experimentId);
-                }
-            } else if (executionType == ExperimentType.WORKFLOW) {
-                // its a workflow execution experiment
-                log.debug(experimentId, "Launching workflow experiment {}.", 
experimentId);
-                launchWorkflowExperiment(experimentId, token, gatewayId);
-            } else {
-                log.error(
-                        experimentId,
-                        "Couldn't identify experiment type, experiment {} is 
neither single application nor workflow.",
-                        experimentId);
-                throw new TException("Experiment '" + experimentId
-                        + "' launch failed. Unable to figureout execution type 
for application "
-                        + experiment.getExecutionId());
-            }
-        } catch (LaunchValidationException launchValidationException) {
-            ExperimentStatus status = new 
ExperimentStatus(ExperimentState.FAILED);
-            status.setReason("Validation failed: " + 
launchValidationException.getErrorMessage());
-            
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-            OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, 
status, publisher, gatewayId);
-            throw new TException(
-                    "Experiment '" + experimentId + "' launch failed. 
Experiment failed to validate: "
-                            + launchValidationException.getErrorMessage(),
-                    launchValidationException);
+            return orchestratorService.launchExperimentWithErrorHandling(
+                    experimentId, gatewayId, 
OrchestratorServerThreadPoolExecutor.getCachedThreadPool());
+        } catch (TException e) {
+            throw e;
         } catch (Exception e) {
-            ExperimentStatus status = new 
ExperimentStatus(ExperimentState.FAILED);
-            status.setReason("Unexpected error occurred: " + e.getMessage());
-            
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-            OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, 
status, publisher, gatewayId);
             throw new TException("Experiment '" + experimentId + "' launch 
failed.", e);
-        } finally {
-            if (registryClient != null) {
-                ThriftUtils.close(registryClient);
-            }
         }
-        return true;
     }
 
     /**
@@ -323,50 +160,39 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
      * @throws TException
      */
     public boolean validateExperiment(String experimentId) throws TException, 
LaunchValidationException {
-        final RegistryService.Client registryClient = 
getRegistryServiceClient();
         try {
-            ExperimentModel experimentModel = 
registryClient.getExperiment(experimentId);
-            return 
orchestrator.validateExperiment(experimentModel).isValidationState();
+            return orchestratorService.validateExperiment(experimentId);
         } catch (OrchestratorException e) {
             log.error(experimentId, "Error while validating experiment", e);
             throw new TException(e);
-        } finally {
-            if (registryClient != null) {
-                ThriftUtils.close(registryClient);
-            }
+        } catch (org.apache.airavata.registry.cpi.RegistryException e) {
+            log.error(experimentId, "Error while retrieving experiment for 
validation", e);
+            throw new TException(e);
         }
     }
 
     @Override
     public boolean validateProcess(String experimentId, List<ProcessModel> 
processes)
             throws LaunchValidationException, TException {
-        final RegistryService.Client registryClient = 
getRegistryServiceClient();
         try {
-            ExperimentModel experimentModel = 
registryClient.getExperiment(experimentId);
-            for (ProcessModel processModel : processes) {
-                boolean state = orchestrator
-                        .validateProcess(experimentModel, processModel)
-                        .isSetValidationState();
-                if (!state) {
-                    return false;
-                }
-            }
-            return true;
+            return orchestratorService.validateProcess(experimentId, 
processes);
         } catch (LaunchValidationException lve) {
-
             // If a process failed to validate, also add an error message at 
the experiment level
             ErrorModel details = new ErrorModel();
             details.setActualErrorMessage(lve.getErrorMessage());
             details.setCreationTime(Calendar.getInstance().getTimeInMillis());
-            registryClient.addErrors(OrchestratorConstants.EXPERIMENT_ERROR, 
details, experimentId);
+            try {
+                orchestratorService.addProcessValidationErrors(experimentId, 
details);
+            } catch (org.apache.airavata.registry.cpi.RegistryException e) {
+                log.error(experimentId, "Error while adding errors to 
experiment", e);
+            }
             throw lve;
         } catch (OrchestratorException e) {
             log.error(experimentId, "Error while validating process", e);
             throw new TException(e);
-        } finally {
-            if (registryClient != null) {
-                ThriftUtils.close(registryClient);
-            }
+        } catch (org.apache.airavata.registry.cpi.RegistryException e) {
+            log.error(experimentId, "Error while retrieving experiment for 
process validation", e);
+            throw new TException(e);
         }
     }
 
@@ -379,112 +205,24 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
      * @throws TException
      */
     public boolean terminateExperiment(String experimentId, String gatewayId) 
throws TException {
-        final RegistryService.Client registryClient = 
getRegistryServiceClient();
         log.info(experimentId, "Experiment: {} is cancelling  !!!!!", 
experimentId);
         try {
-            return validateStatesAndCancel(registryClient, experimentId, 
gatewayId);
+            return orchestratorService.terminateExperiment(experimentId, 
gatewayId);
         } catch (Exception e) {
             log.error("expId : " + experimentId + " :- Error while cancelling 
experiment", e);
             return false;
-        } finally {
-            if (registryClient != null) {
-                ThriftUtils.close(registryClient);
-            }
         }
     }
 
     public void fetchIntermediateOutputs(String experimentId, String 
gatewayId, List<String> outputNames)
             throws TException {
-        final RegistryService.Client registryClient = 
getRegistryServiceClient();
         try {
-            submitIntermediateOutputsProcess(registryClient, experimentId, 
gatewayId, outputNames);
+            orchestratorService.fetchIntermediateOutputs(experimentId, 
gatewayId, outputNames);
         } catch (Exception e) {
             log.error("expId : " + experimentId + " :- Error while fetching 
intermediate", e);
-        } finally {
-            if (registryClient != null) {
-                ThriftUtils.close(registryClient);
-            }
         }
     }
 
-    private void submitIntermediateOutputsProcess(
-            Client registryClient, String experimentId, String gatewayId, 
List<String> outputNames) throws Exception {
-
-        ExperimentModel experimentModel = 
registryClient.getExperiment(experimentId);
-        ProcessModel processModel = 
ExperimentModelUtil.cloneProcessFromExperiment(experimentModel);
-        processModel.setExperimentDataDir(processModel.getExperimentDataDir() 
+ "/intermediates");
-
-        List<OutputDataObjectType> applicationOutputs = 
registryClient.getApplicationOutputs(
-                experimentModel.getExecutionId()); // This is to get a clean 
output object set
-        List<OutputDataObjectType> requestedOutputs = new ArrayList<>();
-
-        for (OutputDataObjectType output : applicationOutputs) {
-            if (outputNames.contains(output.getName())) {
-                requestedOutputs.add(output);
-            }
-        }
-        processModel.setProcessOutputs(requestedOutputs);
-        String processId = registryClient.addProcess(processModel, 
experimentId);
-        processModel.setProcessId(processId);
-
-        try {
-            // Find the process that is responsible for main experiment 
workflow by
-            // looking for the process that has the JOB_SUBMISSION task
-            Optional<ProcessModel> jobSubmissionProcess = 
experimentModel.getProcesses().stream()
-                    .filter(p -> p.getTasks().stream().anyMatch(t -> 
t.getTaskType() == TaskTypes.JOB_SUBMISSION))
-                    .findFirst();
-            if (!jobSubmissionProcess.isPresent()) {
-                throw new Exception(MessageFormat.format(
-                        "Could not find job submission process for experiment 
{0}, unable to fetch intermediate outputs {1}",
-                        experimentId, outputNames));
-            }
-            String taskDag = 
orchestrator.createAndSaveIntermediateOutputFetchingTasks(
-                    gatewayId, processModel, jobSubmissionProcess.get());
-            processModel.setTaskDag(taskDag);
-
-            registryClient.updateProcess(processModel, 
processModel.getProcessId());
-
-            // Figure out the credential token
-            UserConfigurationDataModel userConfigurationData = 
experimentModel.getUserConfigurationData();
-            String token = null;
-            final String groupResourceProfileId = 
userConfigurationData.getGroupResourceProfileId();
-            if (groupResourceProfileId == null) {
-                throw new Exception("Experiment not configured with a Group 
Resource Profile: " + experimentId);
-            }
-            GroupComputeResourcePreference groupComputeResourcePreference =
-                    registryClient.getGroupComputeResourcePreference(
-                            userConfigurationData
-                                    .getComputationalResourceScheduling()
-                                    .getResourceHostId(),
-                            groupResourceProfileId);
-            if 
(groupComputeResourcePreference.getResourceSpecificCredentialStoreToken() != 
null) {
-                token = 
groupComputeResourcePreference.getResourceSpecificCredentialStoreToken();
-            }
-            if (token == null || token.isEmpty()) {
-                // try with group resource profile level token
-                GroupResourceProfile groupResourceProfile =
-                        
registryClient.getGroupResourceProfile(groupResourceProfileId);
-                token = groupResourceProfile.getDefaultCredentialStoreToken();
-            }
-            // still the token is empty, then we fail the experiment
-            if (token == null || token.isEmpty()) {
-                throw new Exception(
-                        "You have not configured credential store token at 
group resource profile or compute resource preference."
-                                + " Please provide the correct token at group 
resource profile or compute resource preference.");
-            }
-            orchestrator.launchProcess(processModel, token);
-        } catch (Exception e) {
-            log.error("Failed to launch process for intermediate output 
fetching", e);
-
-            // Update Process status to FAILED
-            ProcessStatus status = new ProcessStatus(ProcessState.FAILED);
-            status.setReason("Intermediate output fetching process failed to 
launch: " + e.getMessage());
-            
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-            registryClient.addProcessStatus(status, processId);
-
-            throw e;
-        }
-    }
 
     private String getAiravataUserName() {
         return airavataUserName;
@@ -504,234 +242,18 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
 
     @Override
     public boolean launchProcess(String processId, String 
airavataCredStoreToken, String gatewayId) throws TException {
-        final RegistryService.Client registryClient = 
getRegistryServiceClient();
         try {
-            ProcessStatus processStatus = 
registryClient.getProcessStatus(processId);
-
-            switch (processStatus.getState()) {
-                case CREATED:
-                case VALIDATED:
-                case DEQUEUING:
-                    ProcessModel processModel = 
registryClient.getProcess(processId);
-                    String applicationId = 
processModel.getApplicationInterfaceId();
-                    if (applicationId == null) {
-                        log.error(processId, "Application interface id 
shouldn't be null.");
-                        throw new OrchestratorException(
-                                "Error executing the job, application 
interface id shouldn't be null.");
-                    }
-                    // set application deployment id to process model
-                    ApplicationDeploymentDescription 
applicationDeploymentDescription =
-                            getAppDeployment(registryClient, processModel, 
applicationId);
-                    if (applicationDeploymentDescription == null) {
-                        log.error("Could not find an application deployment 
for " + processModel.getComputeResourceId()
-                                + " and application " + applicationId);
-                        throw new OrchestratorException("Could not find an 
application deployment for "
-                                + processModel.getComputeResourceId() + " and 
application " + applicationId);
-                    }
-                    
processModel.setApplicationDeploymentId(applicationDeploymentDescription.getAppDeploymentId());
-                    // set compute resource id to process model, default we 
set the same in the user preferred compute
-                    // host id
-                    processModel.setComputeResourceId(
-                            
processModel.getProcessResourceSchedule().getResourceHostId());
-                    registryClient.updateProcess(processModel, 
processModel.getProcessId());
-                    return orchestrator.launchProcess(processModel, 
airavataCredStoreToken);
-
-                default:
-                    log.warn("Process " + processId + " is already launched. 
So it can not be relaunched");
-                    return false;
-            }
-
+            return orchestratorService.launchProcess(processId, 
airavataCredStoreToken, gatewayId);
+        } catch (org.apache.airavata.registry.cpi.RegistryException e) {
+            log.error(processId, "Error while launching process ", e);
+            throw new TException(e);
         } catch (Exception e) {
             log.error(processId, "Error while launching process ", e);
             throw new TException(e);
-        } finally {
-            if (registryClient != null) {
-                ThriftUtils.close(registryClient);
-            }
         }
     }
 
-    private ApplicationDeploymentDescription getAppDeployment(
-            RegistryService.Client registryClient, ProcessModel processModel, 
String applicationId)
-            throws OrchestratorException, ClassNotFoundException, 
ApplicationSettingsException, InstantiationException,
-                    IllegalAccessException, TException {
-        String selectedModuleId = getModuleId(registryClient, applicationId);
-        return getAppDeploymentForModule(registryClient, processModel, 
selectedModuleId);
-    }
-
-    private ApplicationDeploymentDescription getAppDeploymentForModule(
-            RegistryService.Client registryClient, ProcessModel processModel, 
String selectedModuleId)
-            throws ClassNotFoundException, ApplicationSettingsException, 
InstantiationException, IllegalAccessException,
-                    TException {
 
-        List<ApplicationDeploymentDescription> applicationDeployements =
-                registryClient.getApplicationDeployments(selectedModuleId);
-        Map<ComputeResourceDescription, ApplicationDeploymentDescription> 
deploymentMap =
-                new HashMap<ComputeResourceDescription, 
ApplicationDeploymentDescription>();
-
-        for (ApplicationDeploymentDescription deploymentDescription : 
applicationDeployements) {
-            if 
(processModel.getComputeResourceId().equals(deploymentDescription.getComputeHostId()))
 {
-                deploymentMap.put(
-                        
registryClient.getComputeResource(deploymentDescription.getComputeHostId()),
-                        deploymentDescription);
-            }
-        }
-        List<ComputeResourceDescription> computeHostList =
-                Arrays.asList(deploymentMap.keySet().toArray(new 
ComputeResourceDescription[] {}));
-        Class<? extends HostScheduler> aClass =
-                
Class.forName(ServerSettings.getHostScheduler()).asSubclass(HostScheduler.class);
-        HostScheduler hostScheduler = aClass.newInstance();
-        ComputeResourceDescription ComputeResourceDescription = 
hostScheduler.schedule(computeHostList);
-        return deploymentMap.get(ComputeResourceDescription);
-    }
-
-    private String getModuleId(RegistryService.Client registryClient, String 
applicationId)
-            throws OrchestratorException, TException {
-        ApplicationInterfaceDescription applicationInterface = 
registryClient.getApplicationInterface(applicationId);
-        List<String> applicationModules = 
applicationInterface.getApplicationModules();
-        if (applicationModules.size() == 0) {
-            throw new OrchestratorException("No modules defined for 
application " + applicationId);
-        }
-        //                     AiravataAPI airavataAPI = getAiravataAPI();
-        String selectedModuleId = applicationModules.get(0);
-        return selectedModuleId;
-    }
-
-    private boolean validateStatesAndCancel(
-            RegistryService.Client registryClient, String experimentId, String 
gatewayId) throws Exception {
-        ExperimentStatus experimentStatus = 
registryClient.getExperimentStatus(experimentId);
-        switch (experimentStatus.getState()) {
-            case COMPLETED:
-            case CANCELED:
-            case FAILED:
-            case CANCELING:
-                log.warn(
-                        "Can't terminate already {} experiment",
-                        experimentStatus.getState().name());
-                return false;
-            case CREATED:
-                log.warn("Experiment termination is only allowed for launched 
experiments.");
-                return false;
-            default:
-                ExperimentModel experimentModel = 
registryClient.getExperiment(experimentId);
-                final UserConfigurationDataModel userConfigurationData = 
experimentModel.getUserConfigurationData();
-                final String groupResourceProfileId = 
userConfigurationData.getGroupResourceProfileId();
-
-                GroupComputeResourcePreference groupComputeResourcePreference =
-                        registryClient.getGroupComputeResourcePreference(
-                                userConfigurationData
-                                        .getComputationalResourceScheduling()
-                                        .getResourceHostId(),
-                                groupResourceProfileId);
-                String token = 
groupComputeResourcePreference.getResourceSpecificCredentialStoreToken();
-                if (token == null || token.isEmpty()) {
-                    // try with group resource profile level token
-                    GroupResourceProfile groupResourceProfile =
-                            
registryClient.getGroupResourceProfile(groupResourceProfileId);
-                    token = 
groupResourceProfile.getDefaultCredentialStoreToken();
-                }
-                // still the token is empty, then we fail the experiment
-                if (token == null || token.isEmpty()) {
-                    log.error(
-                            "You have not configured credential store token at 
group resource profile or compute resource preference."
-                                    + " Please provide the correct token at 
group resource profile or compute resource preference.");
-                    return false;
-                }
-
-                orchestrator.cancelExperiment(experimentModel, token);
-                // TODO deprecate this approach as we are replacing gfac
-                String expCancelNodePath = ZKPaths.makePath(
-                        
ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, experimentId),
-                        ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
-                Stat stat = 
curatorClient.checkExists().forPath(expCancelNodePath);
-                if (stat != null) {
-                    curatorClient
-                            .setData()
-                            .withVersion(-1)
-                            .forPath(expCancelNodePath, 
ZkConstants.ZOOKEEPER_CANCEL_REQEUST.getBytes());
-                    ExperimentStatus status = new 
ExperimentStatus(ExperimentState.CANCELING);
-                    status.setReason("Experiment cancel request processed");
-                    status.setTimeOfStateChange(
-                            AiravataUtils.getCurrentTimestamp().getTime());
-                    
OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, status, 
publisher, gatewayId);
-                    log.info("expId : " + experimentId + " :- Experiment 
status updated to " + status.getState());
-                }
-                return true;
-        }
-    }
-
-    private void launchWorkflowExperiment(String experimentId, String 
airavataCredStoreToken, String gatewayId)
-            throws TException {
-        // FIXME
-        //        try {
-        //            WorkflowEnactmentService.getInstance().
-        //                    submitWorkflow(experimentId, 
airavataCredStoreToken, getGatewayName(),
-        // getRabbitMQProcessPublisher());
-        //        } catch (Exception e) {
-        //            log.error("Error while launching workflow", e);
-        //        }
-    }
-
-    private class SingleAppExperimentRunner implements Runnable {
-
-        String experimentId;
-        String airavataCredStoreToken;
-        String gatewayId;
-
-        public SingleAppExperimentRunner(String experimentId, String 
airavataCredStoreToken, String gatewayId) {
-            this.experimentId = experimentId;
-            this.airavataCredStoreToken = airavataCredStoreToken;
-            this.gatewayId = gatewayId;
-        }
-
-        @Override
-        public void run() {
-            try {
-                launchSingleAppExperiment();
-            } catch (TException e) {
-                log.error("Unable to launch experiment..", e);
-                throw new RuntimeException("Error while launching experiment", 
e);
-            } catch (AiravataException e) {
-                log.error("Unable to publish experiment status..", e);
-            }
-        }
-
-        private boolean launchSingleAppExperiment() throws TException, 
AiravataException {
-            final RegistryService.Client registryClient = 
getRegistryServiceClient();
-            try {
-                List<String> processIds = 
registryClient.getProcessIds(experimentId);
-                for (String processId : processIds) {
-                    launchProcess(processId, airavataCredStoreToken, 
gatewayId);
-                }
-                //                             ExperimentStatus status = new 
ExperimentStatus(ExperimentState.LAUNCHED);
-                //                             status.setReason("submitted all 
processes");
-                //                             
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-                //                             
OrchestratorUtils.updageAndPublishExperimentStatus(experimentId, status);
-                //                             log.info("expId: {}, Launched 
experiment ", experimentId);
-            } catch (Exception e) {
-                ExperimentStatus status = new 
ExperimentStatus(ExperimentState.FAILED);
-                status.setReason("Error while updating task status");
-                
OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, status, 
publisher, gatewayId);
-                log.error(
-                        "expId: " + experimentId
-                                + ", Error while updating task status, hence 
updated experiment status to "
-                                + ExperimentState.FAILED,
-                        e);
-                ExperimentStatusChangeEvent event =
-                        new 
ExperimentStatusChangeEvent(ExperimentState.FAILED, experimentId, gatewayId);
-                String messageId = AiravataUtils.getId("EXPERIMENT");
-                MessageContext messageContext = new MessageContext(event, 
MessageType.EXPERIMENT, messageId, gatewayId);
-                
messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
-                publisher.publish(messageContext);
-                throw new TException(e);
-            } finally {
-                if (registryClient != null) {
-                    ThriftUtils.close(registryClient);
-                }
-            }
-            return true;
-        }
-    }
 
     private class ProcessStatusHandler implements MessageHandler {
         /**
@@ -747,154 +269,21 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
                     TBase event = message.getEvent();
                     byte[] bytes = ThriftUtils.serializeThriftObject(event);
                     ThriftUtils.createThriftFromBytes(bytes, 
processStatusChangeEvent);
-                    ExperimentStatus status = new ExperimentStatus();
                     ProcessIdentifier processIdentity = 
processStatusChangeEvent.getProcessIdentity();
                     log.info(
                             "expId: {}, processId: {} :- Process status 
changed event received for status {}",
                             processIdentity.getExperimentId(),
                             processIdentity.getProcessId(),
                             processStatusChangeEvent.getState().name());
-                    try {
-                        ProcessModel process = 
OrchestratorUtils.getProcess(processIdentity.getProcessId());
-                        boolean isIntermediateOutputFetchingProcess =
-                                process.getTasks().stream().anyMatch(t -> 
t.getTaskType() == TaskTypes.OUTPUT_FETCHING);
-                        if (isIntermediateOutputFetchingProcess) {
-                            log.info(
-                                    "Not updating experiment status because 
process is an intermediate output fetching one");
-                            return;
-                        }
-                    } catch (ApplicationSettingsException e) {
-                        throw new RuntimeException("Error getting process " + 
processIdentity.getProcessId(), e);
-                    }
-                    switch (processStatusChangeEvent.getState()) {
-                        //                                             case 
CREATED:
-                        //                                             case 
VALIDATED:
-                        case STARTED:
-                            try {
-                                ExperimentStatus stat =
-                                        
OrchestratorUtils.getExperimentStatus(processIdentity.getExperimentId());
-                                if (stat.getState() == 
ExperimentState.CANCELING) {
-                                    status.setState(ExperimentState.CANCELING);
-                                    status.setReason("Process started but 
experiment cancelling is triggered");
-                                } else {
-                                    status.setState(ExperimentState.EXECUTING);
-                                    status.setReason("process  started");
-                                }
-                            } catch (ApplicationSettingsException e) {
-                                throw new RuntimeException("Error ", e);
-                            }
-                            break;
-                        //                                             case 
PRE_PROCESSING:
-                        //                                                     
break;
-                        //                                             case 
CONFIGURING_WORKSPACE:
-                        //                                             case 
INPUT_DATA_STAGING:
-                        //                                             case 
EXECUTING:
-                        //                                             case 
MONITORING:
-                        //                                             case 
OUTPUT_DATA_STAGING:
-                        //                                             case 
POST_PROCESSING:
-                        //                                             case 
CANCELLING:
-                        //                                                     
break;
-                        case COMPLETED:
-                            try {
-                                ExperimentStatus stat =
-                                        
OrchestratorUtils.getExperimentStatus(processIdentity.getExperimentId());
-                                if (stat.getState() == 
ExperimentState.CANCELING) {
-                                    status.setState(ExperimentState.CANCELED);
-                                    status.setReason("Process competed but 
experiment cancelling is triggered");
-                                } else {
-                                    status.setState(ExperimentState.COMPLETED);
-                                    status.setReason("process  completed");
-                                }
-                            } catch (ApplicationSettingsException e) {
-                                throw new RuntimeException("Error ", e);
-                            }
-                            break;
-                        case FAILED:
-                            try {
-                                ExperimentStatus stat =
-                                        
OrchestratorUtils.getExperimentStatus(processIdentity.getExperimentId());
-                                if (stat.getState() == 
ExperimentState.CANCELING) {
-                                    status.setState(ExperimentState.CANCELED);
-                                    status.setReason("Process failed but 
experiment cancelling is triggered");
-                                } else {
-                                    status.setState(ExperimentState.FAILED);
-                                    status.setReason("process  failed");
-                                }
-                            } catch (ApplicationSettingsException e) {
-                                throw new RuntimeException("Unable to create 
registry client...", e);
-                            }
-                            break;
-                        case CANCELED:
-                            // TODO if experiment have more than one process 
associated with it, then this should be
-                            // changed.
-                            status.setState(ExperimentState.CANCELED);
-                            status.setReason("process  cancelled");
-                            break;
-                        case QUEUED:
-                            status.setState(ExperimentState.SCHEDULED);
-                            status.setReason("Process started but compute 
resource not avaialable");
-                            break;
-                        case REQUEUED:
-                            status.setState(ExperimentState.SCHEDULED);
-                            status.setReason("Job submission failed,  requeued 
to resubmit");
-                            List<QueueStatusModel> queueStatusModels = new 
ArrayList<>();
-                            final RegistryService.Client registryClient = 
getRegistryServiceClient();
-                            ExperimentModel experimentModel =
-                                    
registryClient.getExperiment(processIdentity.getExperimentId());
-                            UserConfigurationDataModel 
userConfigurationDataModel =
-                                    experimentModel.getUserConfigurationData();
-                            if (userConfigurationDataModel != null) {
-                                ComputationalResourceSchedulingModel 
computationalResourceSchedulingModel =
-                                        
userConfigurationDataModel.getComputationalResourceScheduling();
-                                if (computationalResourceSchedulingModel != 
null) {
-                                    String queueName = 
computationalResourceSchedulingModel.getQueueName();
-                                    String resourceId = 
computationalResourceSchedulingModel.getResourceHostId();
-                                    ComputeResourceDescription comResourceDes =
-                                            
registryClient.getComputeResource(resourceId);
-                                    QueueStatusModel queueStatusModel = new 
QueueStatusModel();
-                                    
queueStatusModel.setHostName(comResourceDes.getHostName());
-                                    queueStatusModel.setQueueName(queueName);
-                                    queueStatusModel.setQueueUp(false);
-                                    queueStatusModel.setRunningJobs(0);
-                                    queueStatusModel.setQueuedJobs(0);
-                                    
queueStatusModel.setTime(System.currentTimeMillis());
-                                    queueStatusModels.add(queueStatusModel);
-                                    
registryClient.registerQueueStatuses(queueStatusModels);
-                                }
-                            }
-
-                            break;
-                        case DEQUEUING:
-                            try {
-                                ExperimentStatus stat =
-                                        
OrchestratorUtils.getExperimentStatus(processIdentity.getExperimentId());
-                                if (stat.getState() == 
ExperimentState.CANCELING) {
-                                    status.setState(ExperimentState.CANCELING);
-                                    status.setReason("Process started but 
experiment cancelling is triggered");
-                                } else {
-                                    
launchQueuedExperiment(processIdentity.getExperimentId());
-                                }
-
-                            } catch (Exception e) {
-                                throw new RuntimeException("Error ", e);
-                            }
-                            break;
-                        default:
-                            // ignore other status changes, thoes will not 
affect for experiment status changes
-                            return;
-                    }
-                    if (status.getState() != null) {
-                        status.setTimeOfStateChange(
-                                AiravataUtils.getCurrentTimestamp().getTime());
-                        OrchestratorUtils.updateAndPublishExperimentStatus(
-                                processIdentity.getExperimentId(), status, 
publisher, processIdentity.getGatewayId());
-                        log.info("expId : " + 
processIdentity.getExperimentId() + " :- Experiment status updated to "
-                                + status.getState());
-                    }
+                    
orchestratorService.handleProcessStatusChange(processStatusChangeEvent, 
processIdentity);
                 } catch (TException e) {
                     log.error("Message Id : " + message.getMessageId() + ", 
Message type : " + message.getType()
                             + "Error" + " while prcessing process status 
change event");
                     throw new RuntimeException("Error while updating 
experiment status", e);
+                } catch (Exception e) {
+                    log.error("Message Id : " + message.getMessageId() + ", 
Message type : " + message.getType()
+                            + "Error" + " while prcessing process status 
change event", e);
+                    throw new RuntimeException("Error while updating 
experiment status", e);
                 }
             } else {
                 System.out.println("Message Recieved with message id " + 
message.getMessageId() + " and with message "
@@ -935,10 +324,13 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
                         "Cancelling experiment with experimentId: {} gateway 
Id: {}",
                         expEvent.getExperimentId(),
                         expEvent.getGatewayId());
-                terminateExperiment(expEvent.getExperimentId(), 
expEvent.getGatewayId());
+                orchestratorService.handleCancelExperiment(expEvent);
             } catch (TException e) {
                 log.error("Error while cancelling experiment", e);
                 throw new RuntimeException("Error while cancelling 
experiment", e);
+            } catch (Exception e) {
+                log.error("Error while cancelling experiment", e);
+                throw new RuntimeException("Error while cancelling 
experiment", e);
             } finally {
                 experimentSubscriber.sendAck(messageContext.getDeliveryTag());
             }
@@ -954,10 +346,13 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
                         event.getExperimentId(),
                         event.getGatewayId(),
                         event.getOutputNames());
-                fetchIntermediateOutputs(event.getExperimentId(), 
event.getGatewayId(), event.getOutputNames());
+                orchestratorService.handleIntermediateOutputsEvent(event);
             } catch (TException e) {
                 log.error("Error while fetching intermediate outputs", e);
                 throw new RuntimeException("Error while fetching intermediate 
outputs", e);
+            } catch (Exception e) {
+                log.error("Error while fetching intermediate outputs", e);
+                throw new RuntimeException("Error while fetching intermediate 
outputs", e);
             } finally {
                 experimentSubscriber.sendAck(messageContext.getDeliveryTag());
             }
@@ -965,57 +360,20 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
     }
 
     private void launchExperiment(MessageContext messageContext) {
-        ExperimentSubmitEvent expEvent = new ExperimentSubmitEvent();
-        final RegistryService.Client registryClient = 
getRegistryServiceClient();
         try {
-            byte[] bytes = 
ThriftUtils.serializeThriftObject(messageContext.getEvent());
-            ThriftUtils.createThriftFromBytes(bytes, expEvent);
-            MDC.put(MDCConstants.EXPERIMENT_ID, expEvent.getExperimentId());
-            log.info(
-                    "Launching experiment with experimentId: {} gateway Id: 
{}",
-                    expEvent.getExperimentId(),
-                    expEvent.getGatewayId());
-            if (messageContext.isRedeliver()) {
-                ExperimentModel experimentModel = 
registryClient.getExperiment(expEvent.getExperimentId());
-                MDC.put(MDCConstants.EXPERIMENT_NAME, 
experimentModel.getExperimentName());
-                if (experimentModel.getExperimentStatus().get(0).getState() == 
ExperimentState.CREATED) {
-                    launchExperiment(expEvent.getExperimentId(), 
expEvent.getGatewayId());
-                }
-            } else {
-                launchExperiment(expEvent.getExperimentId(), 
expEvent.getGatewayId());
-            }
+            
orchestratorService.handleLaunchExperimentFromMessage(messageContext);
         } catch (TException e) {
-            String logMessage = expEvent.getExperimentId() != null && 
expEvent.getGatewayId() != null
-                    ? String.format(
-                            "Experiment launch failed due to Thrift conversion 
error, experimentId: %s, gatewayId: %s",
-                            expEvent.getExperimentId(), 
expEvent.getGatewayId())
-                    : "Experiment launch failed due to Thrift conversion 
error";
-            log.error(logMessage, e);
+            log.error("Experiment launch failed due to Thrift conversion 
error", e);
+        } catch (org.apache.airavata.registry.cpi.RegistryException e) {
+            log.error("Experiment launch failed due to registry error", e);
         } catch (Exception e) {
-            log.error(
-                    "An unknown issue while launching experiment "
-                            + 
Optional.ofNullable(expEvent.getExperimentId()).orElse("missing experiment")
-                            + " on gateway "
-                            + 
Optional.ofNullable(expEvent.getGatewayId()).orElse("missing gateway"),
-                    e);
+            log.error("An unknown issue while launching experiment", e);
         } finally {
             experimentSubscriber.sendAck(messageContext.getDeliveryTag());
             MDC.clear();
-            if (registryClient != null) {
-                ThriftUtils.close(registryClient);
-            }
         }
     }
 
-    private RegistryService.Client getRegistryServiceClient() {
-        try {
-            final int serverPort = 
Integer.parseInt(ServerSettings.getRegistryServerPort());
-            final String serverHost = ServerSettings.getRegistryServerHost();
-            return 
RegistryServiceClientFactory.createRegistryClient(serverHost, serverPort);
-        } catch (RegistryServiceException | ApplicationSettingsException e) {
-            throw new RuntimeException("Unable to create registry client...", 
e);
-        }
-    }
 
     private void startCurator() throws ApplicationSettingsException {
         String connectionSting = ServerSettings.getZookeeperConnection();
@@ -1028,72 +386,4 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
         return ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, 
experimentId);
     }
 
-    private void runExperimentLauncher(String experimentId, String gatewayId, 
String token) throws TException {
-        ExperimentStatus status = new 
ExperimentStatus(ExperimentState.LAUNCHED);
-        status.setReason("submitted all processes");
-        
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-        OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, 
status, publisher, gatewayId);
-        log.info("expId: {}, Launched experiment ", experimentId);
-        OrchestratorServerThreadPoolExecutor.getCachedThreadPool()
-                .execute(MDCUtil.wrapWithMDC(new 
SingleAppExperimentRunner(experimentId, token, gatewayId)));
-    }
-
-    private void launchQueuedExperiment(String experimentId) throws 
TException, Exception {
-        ExperimentModel experiment = null;
-        final RegistryService.Client registryClient = 
getRegistryServiceClient();
-        // TODO deprecate this approach as we are replacing gfac
-        experiment = registryClient.getExperiment(experimentId);
-        if (experiment == null) {
-            throw new Exception("Error retrieving the Experiment by the given 
experimentID: " + experimentId);
-        }
-
-        UserConfigurationDataModel userConfigurationData = 
experiment.getUserConfigurationData();
-        String token = null;
-        final String groupResourceProfileId = 
userConfigurationData.getGroupResourceProfileId();
-        if (groupResourceProfileId == null) {
-            throw new Exception("Experiment not configured with a Group 
Resource Profile: " + experimentId);
-        }
-        GroupComputeResourcePreference groupComputeResourcePreference =
-                registryClient.getGroupComputeResourcePreference(
-                        userConfigurationData
-                                .getComputationalResourceScheduling()
-                                .getResourceHostId(),
-                        groupResourceProfileId);
-        if 
(groupComputeResourcePreference.getResourceSpecificCredentialStoreToken() != 
null) {
-            token = 
groupComputeResourcePreference.getResourceSpecificCredentialStoreToken();
-        }
-        if (token == null || token.isEmpty()) {
-            // try with group resource profile level token
-            GroupResourceProfile groupResourceProfile = 
registryClient.getGroupResourceProfile(groupResourceProfileId);
-            token = groupResourceProfile.getDefaultCredentialStoreToken();
-        }
-        // still the token is empty, then we fail the experiment
-        if (token == null || token.isEmpty()) {
-            throw new Exception(
-                    "You have not configured credential store token at group 
resource profile or compute resource preference."
-                            + " Please provide the correct token at group 
resource profile or compute resource preference.");
-        }
-        createAndValidateTasks(experiment, registryClient, true);
-        runExperimentLauncher(experimentId, experiment.getGatewayId(), token);
-    }
-
-    private void createAndValidateTasks(
-            ExperimentModel experiment, RegistryService.Client registryClient, 
boolean recreateTaskDag)
-            throws Exception {
-        if (experiment.getUserConfigurationData().isAiravataAutoSchedule()) {
-            List<ProcessModel> processModels = 
registryClient.getProcessList(experiment.getExperimentId());
-            for (ProcessModel processModel : processModels) {
-                if (processModel.getTaskDag() == null || recreateTaskDag) {
-                    registryClient.deleteTasks(processModel.getProcessId());
-                    String taskDag = 
orchestrator.createAndSaveTasks(experiment.getGatewayId(), processModel);
-                    processModel.setTaskDag(taskDag);
-                    registryClient.updateProcess(processModel, 
processModel.getProcessId());
-                }
-            }
-            if (!validateProcess(experiment.getExperimentId(), processModels)) 
{
-                throw new Exception(
-                        "Validating process fails for given experiment Id : " 
+ experiment.getExperimentId());
-            }
-        }
-    }
 }
diff --git 
a/airavata-api/src/main/java/org/apache/airavata/service/OrchestratorService.java
 
b/airavata-api/src/main/java/org/apache/airavata/service/OrchestratorService.java
new file mode 100644
index 0000000000..d9e073102b
--- /dev/null
+++ 
b/airavata-api/src/main/java/org/apache/airavata/service/OrchestratorService.java
@@ -0,0 +1,737 @@
+/**
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.airavata.service;
+
+import java.text.MessageFormat;
+import java.util.*;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.logging.MDCUtil;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.common.utils.ZkConstants;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.metascheduler.core.api.ProcessScheduler;
+import 
org.apache.airavata.metascheduler.process.scheduling.api.ProcessSchedulerImpl;
+import 
org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import 
org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
+import 
org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import 
org.apache.airavata.model.appcatalog.groupresourceprofile.GroupComputeResourcePreference;
+import 
org.apache.airavata.model.appcatalog.groupresourceprofile.GroupResourceProfile;
+import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.data.replica.DataProductModel;
+import org.apache.airavata.model.data.replica.DataReplicaLocationModel;
+import org.apache.airavata.model.data.replica.ReplicaLocationCategory;
+import org.apache.airavata.model.error.LaunchValidationException;
+import org.apache.airavata.model.experiment.ExperimentModel;
+import org.apache.airavata.model.experiment.ExperimentType;
+import org.apache.airavata.model.experiment.UserConfigurationDataModel;
+import org.apache.airavata.model.process.ProcessModel;
+import 
org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
+import org.apache.airavata.model.status.ExperimentState;
+import org.apache.airavata.model.status.ExperimentStatus;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.ProcessStatus;
+import org.apache.airavata.model.status.QueueStatusModel;
+import org.apache.airavata.model.task.TaskTypes;
+import 
org.apache.airavata.model.messaging.event.ExperimentIntermediateOutputsEvent;
+import org.apache.airavata.model.messaging.event.ExperimentSubmitEvent;
+import org.apache.airavata.model.messaging.event.ProcessIdentifier;
+import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
+import org.apache.airavata.model.util.ExperimentModelUtil;
+import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
+import org.apache.airavata.orchestrator.core.schedule.HostScheduler;
+import org.apache.airavata.orchestrator.core.utils.OrchestratorConstants;
+import org.apache.airavata.orchestrator.cpi.impl.SimpleOrchestratorImpl;
+import org.apache.airavata.orchestrator.util.OrchestratorUtils;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.thrift.TException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OrchestratorService {
+    private static final Logger logger = 
LoggerFactory.getLogger(OrchestratorService.class);
+    
+    private OrchestratorRegistryService orchestratorRegistryService;
+    private SimpleOrchestratorImpl orchestrator;
+    private CuratorFramework curatorClient;
+    private Publisher publisher;
+
+    public OrchestratorService(
+            OrchestratorRegistryService orchestratorRegistryService,
+            SimpleOrchestratorImpl orchestrator,
+            CuratorFramework curatorClient,
+            Publisher publisher) {
+        this.orchestratorRegistryService = orchestratorRegistryService;
+        this.orchestrator = orchestrator;
+        this.curatorClient = curatorClient;
+        this.publisher = publisher;
+    }
+
+    public boolean launchExperiment(String experimentId, String gatewayId) 
throws Exception {
+        String experimentNodePath = getExperimentNodePath(experimentId);
+        ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
experimentNodePath);
+        String experimentCancelNode =
+                ZKPaths.makePath(experimentNodePath, 
ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+        ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
experimentCancelNode);
+        ExperimentModel experiment = 
orchestratorRegistryService.getExperiment(experimentId);
+        if (experiment == null) {
+            throw new Exception("Error retrieving the Experiment by the given 
experimentID: " + experimentId);
+        }
+
+        UserConfigurationDataModel userConfigurationData = 
experiment.getUserConfigurationData();
+        String token = getCredentialToken(experiment, userConfigurationData);
+
+        ExperimentType executionType = experiment.getExperimentType();
+        if (executionType == ExperimentType.SINGLE_APPLICATION) {
+            return launchSingleAppExperiment(experiment, experimentId, 
gatewayId, token);
+        } else if (executionType == ExperimentType.WORKFLOW) {
+            logger.debug(experimentId, "Launching workflow experiment {}.", 
experimentId);
+            launchWorkflowExperiment(experimentId, token, gatewayId);
+            return true;
+        } else {
+            logger.error(
+                    experimentId,
+                    "Couldn't identify experiment type, experiment {} is 
neither single application nor workflow.",
+                    experimentId);
+            throw new TException("Experiment '" + experimentId
+                    + "' launch failed. Unable to figureout execution type for 
application "
+                    + experiment.getExecutionId());
+        }
+    }
+
+    private boolean launchSingleAppExperiment(
+            ExperimentModel experiment, String experimentId, String gatewayId, 
String token) throws Exception {
+        List<ProcessModel> processes = 
orchestrator.createProcesses(experimentId, gatewayId);
+
+        for (ProcessModel processModel : processes) {
+            resolveInputReplicas(processModel);
+            
+            if 
(!experiment.getUserConfigurationData().isAiravataAutoSchedule()) {
+                String taskDag = orchestrator.createAndSaveTasks(gatewayId, 
processModel);
+                processModel.setTaskDag(taskDag);
+            }
+            orchestratorRegistryService.updateProcess(processModel, 
processModel.getProcessId());
+        }
+
+        if (!experiment.getUserConfigurationData().isAiravataAutoSchedule()
+                && !validateProcess(experimentId, processes)) {
+            throw new Exception("Validating process fails for given experiment 
Id : " + experimentId);
+        }
+
+        ProcessScheduler scheduler = new ProcessSchedulerImpl();
+        if (!experiment.getUserConfigurationData().isAiravataAutoSchedule()
+                || scheduler.canLaunch(experimentId)) {
+            createAndValidateTasks(experiment, false);
+            return true; // runExperimentLauncher will be called separately
+        } else {
+            logger.debug(experimentId, "Queuing single application experiment 
{}.", experimentId);
+            ExperimentStatus status = new 
ExperimentStatus(ExperimentState.SCHEDULED);
+            status.setReason("Compute resources are not ready");
+            
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, 
status, publisher, gatewayId);
+            logger.info("expId: {}, Scheduled experiment ", experimentId);
+            return false;
+        }
+    }
+
+    private void resolveInputReplicas(ProcessModel processModel) throws 
Exception {
+        for (var pi : processModel.getProcessInputs()) {
+            if (pi.getType().equals(DataType.URI)
+                    && pi.getValue() != null
+                    && pi.getValue().startsWith("airavata-dp://")) {
+                try {
+                    DataProductModel dataProductModel = 
orchestratorRegistryService.getDataProduct(pi.getValue());
+                    Optional<DataReplicaLocationModel> rpLocation =
+                            dataProductModel.getReplicaLocations().stream()
+                                    .filter(rpModel -> 
rpModel.getReplicaLocationCategory()
+                                            
.equals(ReplicaLocationCategory.GATEWAY_DATA_STORE))
+                                    .findFirst();
+                    if (rpLocation.isPresent()) {
+                        pi.setValue(rpLocation.get().getFilePath());
+                        
pi.setStorageResourceId(rpLocation.get().getStorageResourceId());
+                    } else {
+                        logger.error("Could not find a replica for the URI " + 
pi.getValue());
+                    }
+                } catch (RegistryException e) {
+                    throw new Exception("Error while launching experiment", e);
+                }
+            } else if (pi.getType().equals(DataType.URI_COLLECTION)
+                    && pi.getValue() != null
+                    && pi.getValue().contains("airavata-dp://")) {
+                try {
+                    String[] uriList = pi.getValue().split(",");
+                    final ArrayList<String> filePathList = new ArrayList<>();
+                    for (String uri : uriList) {
+                        if (uri.startsWith("airavata-dp://")) {
+                            DataProductModel dataProductModel = 
orchestratorRegistryService.getDataProduct(uri);
+                            Optional<DataReplicaLocationModel> rpLocation =
+                                    
dataProductModel.getReplicaLocations().stream()
+                                            .filter(rpModel -> 
rpModel.getReplicaLocationCategory()
+                                                    
.equals(ReplicaLocationCategory.GATEWAY_DATA_STORE))
+                                            .findFirst();
+                            if (rpLocation.isPresent()) {
+                                
filePathList.add(rpLocation.get().getFilePath());
+                            } else {
+                                logger.error("Could not find a replica for the 
URI " + pi.getValue());
+                            }
+                        } else {
+                            filePathList.add(uri);
+                        }
+                    }
+                    pi.setValue(StringUtils.join(filePathList, ','));
+                } catch (RegistryException e) {
+                    throw new Exception("Error while launching experiment", e);
+                }
+            }
+        }
+    }
+
+    public String getCredentialToken(ExperimentModel experiment, 
UserConfigurationDataModel userConfigurationData) throws Exception {
+        String token = null;
+        final String groupResourceProfileId = 
userConfigurationData.getGroupResourceProfileId();
+        if (groupResourceProfileId == null) {
+            throw new Exception("Experiment not configured with a Group 
Resource Profile: " + experiment.getExperimentId());
+        }
+
+        if (userConfigurationData.getComputationalResourceScheduling() != null
+                && userConfigurationData
+                        .getComputationalResourceScheduling()
+                        
.isSet(ComputationalResourceSchedulingModel._Fields.RESOURCE_HOST_ID)) {
+            GroupComputeResourcePreference groupComputeResourcePreference =
+                    
orchestratorRegistryService.getGroupComputeResourcePreference(
+                            userConfigurationData
+                                    .getComputationalResourceScheduling()
+                                    .getResourceHostId(),
+                            groupResourceProfileId);
+
+            if 
(groupComputeResourcePreference.getResourceSpecificCredentialStoreToken() != 
null) {
+                token = 
groupComputeResourcePreference.getResourceSpecificCredentialStoreToken();
+            }
+        }
+        if (token == null || token.isEmpty()) {
+            GroupResourceProfile groupResourceProfile =
+                    
orchestratorRegistryService.getGroupResourceProfile(groupResourceProfileId);
+            token = groupResourceProfile.getDefaultCredentialStoreToken();
+        }
+        if (token == null || token.isEmpty()) {
+            throw new Exception(
+                    "You have not configured credential store token at group 
resource profile or compute resource preference."
+                            + " Please provide the correct token at group 
resource profile or compute resource preference.");
+        }
+        return token;
+    }
+
+    public boolean validateExperiment(String experimentId) throws TException, 
LaunchValidationException, RegistryException, OrchestratorException {
+        ExperimentModel experimentModel = 
orchestratorRegistryService.getExperiment(experimentId);
+        return 
orchestrator.validateExperiment(experimentModel).isValidationState();
+    }
+
+    public boolean validateProcess(String experimentId, List<ProcessModel> 
processes)
+            throws LaunchValidationException, TException, RegistryException, 
OrchestratorException {
+        ExperimentModel experimentModel = 
orchestratorRegistryService.getExperiment(experimentId);
+        for (ProcessModel processModel : processes) {
+            boolean state = orchestrator
+                    .validateProcess(experimentModel, processModel)
+                    .isSetValidationState();
+            if (!state) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public boolean terminateExperiment(String experimentId, String gatewayId) 
throws Exception {
+        logger.info(experimentId, "Experiment: {} is cancelling  !!!!!", 
experimentId);
+        return validateStatesAndCancel(experimentId, gatewayId);
+    }
+
+    private boolean validateStatesAndCancel(String experimentId, String 
gatewayId) throws Exception {
+        ExperimentStatus experimentStatus = 
orchestratorRegistryService.getExperimentStatus(experimentId);
+        switch (experimentStatus.getState()) {
+            case COMPLETED:
+            case CANCELED:
+            case FAILED:
+            case CANCELING:
+                logger.warn(
+                        "Can't terminate already {} experiment",
+                        experimentStatus.getState().name());
+                return false;
+            case CREATED:
+                logger.warn("Experiment termination is only allowed for 
launched experiments.");
+                return false;
+            default:
+                ExperimentModel experimentModel = 
orchestratorRegistryService.getExperiment(experimentId);
+                final UserConfigurationDataModel userConfigurationData = 
experimentModel.getUserConfigurationData();
+                final String groupResourceProfileId = 
userConfigurationData.getGroupResourceProfileId();
+
+                GroupComputeResourcePreference groupComputeResourcePreference =
+                        
orchestratorRegistryService.getGroupComputeResourcePreference(
+                                userConfigurationData
+                                        .getComputationalResourceScheduling()
+                                        .getResourceHostId(),
+                                groupResourceProfileId);
+                String token = 
groupComputeResourcePreference.getResourceSpecificCredentialStoreToken();
+                if (token == null || token.isEmpty()) {
+                    GroupResourceProfile groupResourceProfile =
+                            
orchestratorRegistryService.getGroupResourceProfile(groupResourceProfileId);
+                    token = 
groupResourceProfile.getDefaultCredentialStoreToken();
+                }
+                if (token == null || token.isEmpty()) {
+                    logger.error(
+                            "You have not configured credential store token at 
group resource profile or compute resource preference."
+                                    + " Please provide the correct token at 
group resource profile or compute resource preference.");
+                    return false;
+                }
+
+                orchestrator.cancelExperiment(experimentModel, token);
+                String expCancelNodePath = ZKPaths.makePath(
+                        
ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, experimentId),
+                        ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+                Stat stat = 
curatorClient.checkExists().forPath(expCancelNodePath);
+                if (stat != null) {
+                    curatorClient
+                            .setData()
+                            .withVersion(-1)
+                            .forPath(expCancelNodePath, 
ZkConstants.ZOOKEEPER_CANCEL_REQEUST.getBytes());
+                    ExperimentStatus status = new 
ExperimentStatus(ExperimentState.CANCELING);
+                    status.setReason("Experiment cancel request processed");
+                    
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                    
OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, status, 
publisher, gatewayId);
+                    logger.info("expId : " + experimentId + " :- Experiment 
status updated to " + status.getState());
+                }
+                return true;
+        }
+    }
+
+    public void fetchIntermediateOutputs(String experimentId, String 
gatewayId, List<String> outputNames)
+            throws Exception {
+        submitIntermediateOutputsProcess(experimentId, gatewayId, outputNames);
+    }
+
+    private void submitIntermediateOutputsProcess(
+            String experimentId, String gatewayId, List<String> outputNames) 
throws Exception {
+
+        ExperimentModel experimentModel = 
orchestratorRegistryService.getExperiment(experimentId);
+        ProcessModel processModel = 
ExperimentModelUtil.cloneProcessFromExperiment(experimentModel);
+        processModel.setExperimentDataDir(processModel.getExperimentDataDir() 
+ "/intermediates");
+
+        List<OutputDataObjectType> applicationOutputs = 
orchestratorRegistryService.getApplicationOutputs(
+                experimentModel.getExecutionId());
+        List<OutputDataObjectType> requestedOutputs = new ArrayList<>();
+
+        for (OutputDataObjectType output : applicationOutputs) {
+            if (outputNames.contains(output.getName())) {
+                requestedOutputs.add(output);
+            }
+        }
+        processModel.setProcessOutputs(requestedOutputs);
+        String processId = 
orchestratorRegistryService.addProcess(processModel, experimentId);
+        processModel.setProcessId(processId);
+
+        try {
+            Optional<ProcessModel> jobSubmissionProcess = 
experimentModel.getProcesses().stream()
+                    .filter(p -> p.getTasks().stream().anyMatch(t -> 
t.getTaskType() == TaskTypes.JOB_SUBMISSION))
+                    .findFirst();
+            if (!jobSubmissionProcess.isPresent()) {
+                throw new Exception(MessageFormat.format(
+                        "Could not find job submission process for experiment 
{0}, unable to fetch intermediate outputs {1}",
+                        experimentId, outputNames));
+            }
+            String taskDag = 
orchestrator.createAndSaveIntermediateOutputFetchingTasks(
+                    gatewayId, processModel, jobSubmissionProcess.get());
+            processModel.setTaskDag(taskDag);
+
+            orchestratorRegistryService.updateProcess(processModel, 
processModel.getProcessId());
+
+            String token = getCredentialToken(experimentModel, 
experimentModel.getUserConfigurationData());
+            orchestrator.launchProcess(processModel, token);
+        } catch (Exception e) {
+            logger.error("Failed to launch process for intermediate output 
fetching", e);
+
+            ProcessStatus status = new ProcessStatus(ProcessState.FAILED);
+            status.setReason("Intermediate output fetching process failed to 
launch: " + e.getMessage());
+            
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            orchestratorRegistryService.addProcessStatus(status, processId);
+
+            throw e;
+        }
+    }
+
+    public boolean launchProcess(String processId, String 
airavataCredStoreToken, String gatewayId) throws Exception {
+        ProcessStatus processStatus = 
orchestratorRegistryService.getProcessStatus(processId);
+
+        switch (processStatus.getState()) {
+            case CREATED:
+            case VALIDATED:
+            case DEQUEUING:
+                ProcessModel processModel = 
orchestratorRegistryService.getProcess(processId);
+                String applicationId = 
processModel.getApplicationInterfaceId();
+                if (applicationId == null) {
+                    logger.error(processId, "Application interface id 
shouldn't be null.");
+                    throw new OrchestratorException(
+                            "Error executing the job, application interface id 
shouldn't be null.");
+                }
+                ApplicationDeploymentDescription 
applicationDeploymentDescription =
+                        getAppDeployment(processModel, applicationId);
+                if (applicationDeploymentDescription == null) {
+                    logger.error("Could not find an application deployment for 
" + processModel.getComputeResourceId()
+                            + " and application " + applicationId);
+                    throw new OrchestratorException("Could not find an 
application deployment for "
+                            + processModel.getComputeResourceId() + " and 
application " + applicationId);
+                }
+                
processModel.setApplicationDeploymentId(applicationDeploymentDescription.getAppDeploymentId());
+                processModel.setComputeResourceId(
+                        
processModel.getProcessResourceSchedule().getResourceHostId());
+                orchestratorRegistryService.updateProcess(processModel, 
processModel.getProcessId());
+                return orchestrator.launchProcess(processModel, 
airavataCredStoreToken);
+
+            default:
+                logger.warn("Process " + processId + " is already launched. So 
it can not be relaunched");
+                return false;
+        }
+    }
+
+    private ApplicationDeploymentDescription getAppDeployment(
+            ProcessModel processModel, String applicationId)
+            throws Exception {
+        String selectedModuleId = getModuleId(applicationId);
+        return getAppDeploymentForModule(processModel, selectedModuleId);
+    }
+
+    private ApplicationDeploymentDescription getAppDeploymentForModule(
+            ProcessModel processModel, String selectedModuleId)
+            throws Exception {
+
+        List<ApplicationDeploymentDescription> applicationDeployements =
+                
orchestratorRegistryService.getApplicationDeployments(selectedModuleId);
+        Map<ComputeResourceDescription, ApplicationDeploymentDescription> 
deploymentMap =
+                new HashMap<>();
+
+        for (ApplicationDeploymentDescription deploymentDescription : 
applicationDeployements) {
+            if 
(processModel.getComputeResourceId().equals(deploymentDescription.getComputeHostId()))
 {
+                deploymentMap.put(
+                        
orchestratorRegistryService.getComputeResource(deploymentDescription.getComputeHostId()),
+                        deploymentDescription);
+            }
+        }
+        List<ComputeResourceDescription> computeHostList =
+                Arrays.asList(deploymentMap.keySet().toArray(new 
ComputeResourceDescription[] {}));
+        Class<? extends HostScheduler> aClass =
+                
Class.forName(ServerSettings.getHostScheduler()).asSubclass(HostScheduler.class);
+        HostScheduler hostScheduler = aClass.newInstance();
+        ComputeResourceDescription ComputeResourceDescription = 
hostScheduler.schedule(computeHostList);
+        return deploymentMap.get(ComputeResourceDescription);
+    }
+
+    private String getModuleId(String applicationId)
+            throws Exception {
+        ApplicationInterfaceDescription applicationInterface = 
orchestratorRegistryService.getApplicationInterface(applicationId);
+        List<String> applicationModules = 
applicationInterface.getApplicationModules();
+        if (applicationModules.size() == 0) {
+            throw new OrchestratorException("No modules defined for 
application " + applicationId);
+        }
+        String selectedModuleId = applicationModules.get(0);
+        return selectedModuleId;
+    }
+
+    private void launchWorkflowExperiment(String experimentId, String 
airavataCredStoreToken, String gatewayId)
+            throws TException {
+        // FIXME - Workflow support not implemented
+    }
+
+    public void createAndValidateTasks(ExperimentModel experiment, boolean 
recreateTaskDag) throws Exception {
+        if (experiment.getUserConfigurationData().isAiravataAutoSchedule()) {
+            List<ProcessModel> processModels = 
orchestratorRegistryService.getProcessList(experiment.getExperimentId());
+            for (ProcessModel processModel : processModels) {
+                if (processModel.getTaskDag() == null || recreateTaskDag) {
+                    
orchestratorRegistryService.deleteTasks(processModel.getProcessId());
+                    String taskDag = 
orchestrator.createAndSaveTasks(experiment.getGatewayId(), processModel);
+                    processModel.setTaskDag(taskDag);
+                    orchestratorRegistryService.updateProcess(processModel, 
processModel.getProcessId());
+                }
+            }
+            if (!validateProcess(experiment.getExperimentId(), processModels)) 
{
+                throw new Exception(
+                        "Validating process fails for given experiment Id : " 
+ experiment.getExperimentId());
+            }
+        }
+    }
+
+    public void addProcessValidationErrors(String experimentId, ErrorModel 
details) throws RegistryException {
+        
orchestratorRegistryService.addErrors(OrchestratorConstants.EXPERIMENT_ERROR, 
details, experimentId);
+    }
+
+    public String getExperimentNodePath(String experimentId) {
+        return ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, 
experimentId);
+    }
+
+    public boolean launchSingleAppExperimentInternal(String experimentId, 
String airavataCredStoreToken, String gatewayId) throws Exception {
+        try {
+            List<String> processIds = 
orchestratorRegistryService.getProcessIds(experimentId);
+            for (String processId : processIds) {
+                launchProcess(processId, airavataCredStoreToken, gatewayId);
+            }
+            return true;
+        } catch (org.apache.airavata.registry.cpi.RegistryException e) {
+            ExperimentStatus status = new 
ExperimentStatus(ExperimentState.FAILED);
+            status.setReason("Error while retrieving process IDs");
+            OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, 
status, publisher, gatewayId);
+            logger.error("expId: " + experimentId + ", Error while retrieving 
process IDs", e);
+            throw new Exception("Error while retrieving process IDs", e);
+        } catch (Exception e) {
+            ExperimentStatus status = new 
ExperimentStatus(ExperimentState.FAILED);
+            status.setReason("Error while launching processes");
+            OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, 
status, publisher, gatewayId);
+            logger.error("expId: " + experimentId + ", Error while launching 
processes", e);
+            throw e;
+        }
+    }
+
+    public void launchQueuedExperiment(String experimentId) throws Exception {
+        ExperimentModel experiment = 
orchestratorRegistryService.getExperiment(experimentId);
+        if (experiment == null) {
+            throw new Exception("Error retrieving the Experiment by the given 
experimentID: " + experimentId);
+        }
+
+        UserConfigurationDataModel userConfigurationData = 
experiment.getUserConfigurationData();
+        String token = getCredentialToken(experiment, userConfigurationData);
+        createAndValidateTasks(experiment, true);
+        
+        // Publish experiment launched status and run launcher
+        ExperimentStatus status = new 
ExperimentStatus(ExperimentState.LAUNCHED);
+        status.setReason("submitted all processes");
+        
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+        OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, 
status, publisher, experiment.getGatewayId());
+        logger.info("expId: {}, Launched experiment ", experimentId);
+        
+        // Launch processes
+        launchSingleAppExperimentInternal(experimentId, token, 
experiment.getGatewayId());
+    }
+
+    public void handleProcessStatusChange(
+            ProcessStatusChangeEvent processStatusChangeEvent,
+            ProcessIdentifier processIdentity) throws Exception {
+        ExperimentStatus status = new ExperimentStatus();
+        
+        // Check if this is an intermediate output fetching process
+        ProcessModel process = 
orchestratorRegistryService.getProcess(processIdentity.getProcessId());
+        boolean isIntermediateOutputFetchingProcess =
+                process.getTasks().stream().anyMatch(t -> t.getTaskType() == 
TaskTypes.OUTPUT_FETCHING);
+        if (isIntermediateOutputFetchingProcess) {
+            logger.info("Not updating experiment status because process is an 
intermediate output fetching one");
+            return;
+        }
+        
+        switch (processStatusChangeEvent.getState()) {
+            case STARTED:
+                ExperimentStatus stat = 
orchestratorRegistryService.getExperimentStatus(processIdentity.getExperimentId());
+                if (stat.getState() == ExperimentState.CANCELING) {
+                    status.setState(ExperimentState.CANCELING);
+                    status.setReason("Process started but experiment 
cancelling is triggered");
+                } else {
+                    status.setState(ExperimentState.EXECUTING);
+                    status.setReason("process  started");
+                }
+                break;
+            case COMPLETED:
+                stat = 
orchestratorRegistryService.getExperimentStatus(processIdentity.getExperimentId());
+                if (stat.getState() == ExperimentState.CANCELING) {
+                    status.setState(ExperimentState.CANCELED);
+                    status.setReason("Process competed but experiment 
cancelling is triggered");
+                } else {
+                    status.setState(ExperimentState.COMPLETED);
+                    status.setReason("process  completed");
+                }
+                break;
+            case FAILED:
+                stat = 
orchestratorRegistryService.getExperimentStatus(processIdentity.getExperimentId());
+                if (stat.getState() == ExperimentState.CANCELING) {
+                    status.setState(ExperimentState.CANCELED);
+                    status.setReason("Process failed but experiment cancelling 
is triggered");
+                } else {
+                    status.setState(ExperimentState.FAILED);
+                    status.setReason("process  failed");
+                }
+                break;
+            case CANCELED:
+                status.setState(ExperimentState.CANCELED);
+                status.setReason("process  cancelled");
+                break;
+            case QUEUED:
+                status.setState(ExperimentState.SCHEDULED);
+                status.setReason("Process started but compute resource not 
avaialable");
+                break;
+            case REQUEUED:
+                status.setState(ExperimentState.SCHEDULED);
+                status.setReason("Job submission failed,  requeued to 
resubmit");
+                
registerQueueStatusForRequeue(processIdentity.getExperimentId());
+                break;
+            case DEQUEUING:
+                stat = 
orchestratorRegistryService.getExperimentStatus(processIdentity.getExperimentId());
+                if (stat.getState() == ExperimentState.CANCELING) {
+                    status.setState(ExperimentState.CANCELING);
+                    status.setReason("Process started but experiment 
cancelling is triggered");
+                } else {
+                    launchQueuedExperiment(processIdentity.getExperimentId());
+                }
+                break;
+            default:
+                // ignore other status changes
+                return;
+        }
+        
+        if (status.getState() != null) {
+            
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            OrchestratorUtils.updateAndPublishExperimentStatus(
+                    processIdentity.getExperimentId(), status, publisher, 
processIdentity.getGatewayId());
+            logger.info("expId : " + processIdentity.getExperimentId() + " :- 
Experiment status updated to "
+                    + status.getState());
+        }
+    }
+
+    private void registerQueueStatusForRequeue(String experimentId) {
+        try {
+            List<QueueStatusModel> queueStatusModels = new ArrayList<>();
+            ExperimentModel experimentModel = 
orchestratorRegistryService.getExperiment(experimentId);
+            UserConfigurationDataModel userConfigurationDataModel = 
experimentModel.getUserConfigurationData();
+            if (userConfigurationDataModel != null) {
+                ComputationalResourceSchedulingModel 
computationalResourceSchedulingModel =
+                        
userConfigurationDataModel.getComputationalResourceScheduling();
+                if (computationalResourceSchedulingModel != null) {
+                    String queueName = 
computationalResourceSchedulingModel.getQueueName();
+                    String resourceId = 
computationalResourceSchedulingModel.getResourceHostId();
+                    ComputeResourceDescription comResourceDes =
+                            
orchestratorRegistryService.getComputeResource(resourceId);
+                    QueueStatusModel queueStatusModel = new QueueStatusModel();
+                    queueStatusModel.setHostName(comResourceDes.getHostName());
+                    queueStatusModel.setQueueName(queueName);
+                    queueStatusModel.setQueueUp(false);
+                    queueStatusModel.setRunningJobs(0);
+                    queueStatusModel.setQueuedJobs(0);
+                    queueStatusModel.setTime(System.currentTimeMillis());
+                    queueStatusModels.add(queueStatusModel);
+                    
orchestratorRegistryService.registerQueueStatuses(queueStatusModels);
+                }
+            }
+        } catch (org.apache.airavata.registry.cpi.RegistryException e) {
+            logger.error("Error while registering queue statuses", e);
+        } catch (org.apache.airavata.registry.cpi.AppCatalogException e) {
+            logger.error("Error while getting compute resource for queue 
status", e);
+        }
+    }
+
+    public void handleLaunchExperiment(ExperimentSubmitEvent expEvent) throws 
Exception {
+        ExperimentModel experimentModel = 
orchestratorRegistryService.getExperiment(expEvent.getExperimentId());
+        if (experimentModel.getExperimentStatus().get(0).getState() == 
ExperimentState.CREATED) {
+            launchExperiment(expEvent.getExperimentId(), 
expEvent.getGatewayId());
+        }
+    }
+
+    /**
+     * Handle launch experiment from message context with deserialization and 
redelivery checks
+     */
+    public void handleLaunchExperimentFromMessage(MessageContext 
messageContext) throws Exception {
+        ExperimentSubmitEvent expEvent = new ExperimentSubmitEvent();
+        byte[] bytes = 
ThriftUtils.serializeThriftObject(messageContext.getEvent());
+        ThriftUtils.createThriftFromBytes(bytes, expEvent);
+        
+        if (messageContext.isRedeliver()) {
+            ExperimentModel experimentModel = 
orchestratorRegistryService.getExperiment(expEvent.getExperimentId());
+            if (experimentModel != null && 
experimentModel.getExperimentStatus().get(0).getState() == 
ExperimentState.CREATED) {
+                handleLaunchExperiment(expEvent);
+            }
+        } else {
+            handleLaunchExperiment(expEvent);
+        }
+    }
+
+    public void handleCancelExperiment(ExperimentSubmitEvent expEvent) throws 
Exception {
+        terminateExperiment(expEvent.getExperimentId(), 
expEvent.getGatewayId());
+    }
+
+    public void 
handleIntermediateOutputsEvent(ExperimentIntermediateOutputsEvent event) throws 
Exception {
+        fetchIntermediateOutputs(event.getExperimentId(), 
event.getGatewayId(), event.getOutputNames());
+    }
+
+    public boolean launchExperimentWithErrorHandling(String experimentId, 
String gatewayId, java.util.concurrent.ExecutorService executorService) throws 
TException {
+        try {
+            boolean result = launchExperiment(experimentId, gatewayId);
+            if (result) {
+                ExperimentModel experiment = 
orchestratorRegistryService.getExperiment(experimentId);
+                String token = getCredentialToken(experiment, 
experiment.getUserConfigurationData());
+                ExperimentStatus status = new 
ExperimentStatus(ExperimentState.LAUNCHED);
+                status.setReason("submitted all processes");
+                
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                
OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, status, 
publisher, gatewayId);
+                logger.info("expId: {}, Launched experiment ", experimentId);
+                
+                // Execute the single app experiment runner in the provided 
thread pool
+                if (executorService != null) {
+                    Runnable runner = () -> {
+                        try {
+                            launchSingleAppExperimentInternal(experimentId, 
token, gatewayId);
+                        } catch (Exception e) {
+                            logger.error("expId: " + experimentId + ", Error 
while launching single app experiment", e);
+                        }
+                    };
+                    executorService.execute(MDCUtil.wrapWithMDC(runner));
+                }
+            }
+            return result;
+        } catch (LaunchValidationException launchValidationException) {
+            ExperimentStatus status = new 
ExperimentStatus(ExperimentState.FAILED);
+            status.setReason("Validation failed: " + 
launchValidationException.getErrorMessage());
+            
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, 
status, publisher, gatewayId);
+            throw new TException(
+                    "Experiment '" + experimentId + "' launch failed. 
Experiment failed to validate: "
+                            + launchValidationException.getErrorMessage(),
+                    launchValidationException);
+        } catch (org.apache.airavata.registry.cpi.RegistryException e) {
+            ExperimentStatus status = new 
ExperimentStatus(ExperimentState.FAILED);
+            status.setReason("Registry error: " + e.getMessage());
+            
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, 
status, publisher, gatewayId);
+            throw new TException("Experiment '" + experimentId + "' launch 
failed.", e);
+        } catch (org.apache.airavata.registry.cpi.AppCatalogException e) {
+            ExperimentStatus status = new 
ExperimentStatus(ExperimentState.FAILED);
+            status.setReason("App catalog error: " + e.getMessage());
+            
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, 
status, publisher, gatewayId);
+            throw new TException("Experiment '" + experimentId + "' launch 
failed.", e);
+        } catch (Exception e) {
+            ExperimentStatus status = new 
ExperimentStatus(ExperimentState.FAILED);
+            status.setReason("Unexpected error occurred: " + e.getMessage());
+            
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, 
status, publisher, gatewayId);
+            throw new TException("Experiment '" + experimentId + "' launch 
failed.", e);
+        }
+    }
+}
+

Reply via email to