This is an automated email from the ASF dual-hosted git repository. yasith pushed a commit to branch resource-mgmt-rest-api in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 6a66621b47d470efab6ab32b24f1d3b66133e0d8 Author: yasithdev <[email protected]> AuthorDate: Mon Nov 10 18:40:37 2025 -0500 update OrchestratorServerHandler --- .../server/OrchestratorServerHandler.java | 802 ++------------------- .../airavata/service/OrchestratorService.java | 737 +++++++++++++++++++ 2 files changed, 783 insertions(+), 756 deletions(-) diff --git a/airavata-api/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/airavata-api/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index e00a9d1c3c..6390572085 100644 --- a/airavata-api/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/airavata-api/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -60,9 +60,7 @@ import org.apache.airavata.orchestrator.cpi.OrchestratorService; import org.apache.airavata.orchestrator.cpi.impl.SimpleOrchestratorImpl; import org.apache.airavata.orchestrator.util.OrchestratorServerThreadPoolExecutor; import org.apache.airavata.orchestrator.util.OrchestratorUtils; -import org.apache.airavata.registry.api.RegistryService; -import org.apache.airavata.registry.api.RegistryService.Client; -import org.apache.airavata.registry.api.client.RegistryServiceClientFactory; +import org.apache.airavata.service.OrchestratorRegistryService; import org.apache.airavata.registry.api.exception.RegistryServiceException; import org.apache.commons.lang3.StringUtils; import org.apache.curator.RetryPolicy; @@ -87,6 +85,8 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { private final Subscriber experimentSubscriber; private CuratorFramework curatorClient; + private OrchestratorRegistryService orchestratorRegistryService = new OrchestratorRegistryService(); + private org.apache.airavata.service.OrchestratorService orchestratorService; /** * Query orchestrator server to fetch the CPI version @@ -106,10 +106,11 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { publisher = MessagingFactory.getPublisher(Type.STATUS); orchestrator.initialize(); - orchestrator.getOrchestratorContext().setPublisher(this.publisher); + orchestrator.getOrchestratorContext().setPublisher(publisher); statusSubscribe = getStatusSubscriber(); experimentSubscriber = getExperimentSubscriber(); startCurator(); + orchestratorService = new org.apache.airavata.service.OrchestratorService(orchestratorRegistryService, orchestrator, curatorClient, publisher); } catch (OrchestratorException | AiravataException e) { log.error(e.getMessage(), e); throw new OrchestratorException("Error while initializing orchestrator service", e); @@ -139,178 +140,14 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { * @param experimentId */ public boolean launchExperiment(String experimentId, String gatewayId) throws TException { - ExperimentModel experiment = null; - final RegistryService.Client registryClient = getRegistryServiceClient(); try { - // TODO deprecate this approach as we are replacing gfac - String experimentNodePath = getExperimentNodePath(experimentId); - ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentNodePath); - String experimentCancelNode = - ZKPaths.makePath(experimentNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); - ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentCancelNode); - experiment = registryClient.getExperiment(experimentId); - if (experiment == null) { - throw new Exception("Error retrieving the Experiment by the given experimentID: " + experimentId); - } - - UserConfigurationDataModel userConfigurationData = experiment.getUserConfigurationData(); - String token = null; - final String groupResourceProfileId = userConfigurationData.getGroupResourceProfileId(); - if (groupResourceProfileId == null) { - throw new Exception("Experiment not configured with a Group Resource Profile: " + experimentId); - } - - if (userConfigurationData.getComputationalResourceScheduling() != null - && userConfigurationData - .getComputationalResourceScheduling() - .isSet(ComputationalResourceSchedulingModel._Fields.RESOURCE_HOST_ID)) { - GroupComputeResourcePreference groupComputeResourcePreference = - registryClient.getGroupComputeResourcePreference( - userConfigurationData - .getComputationalResourceScheduling() - .getResourceHostId(), - groupResourceProfileId); - - if (groupComputeResourcePreference.getResourceSpecificCredentialStoreToken() != null) { - token = groupComputeResourcePreference.getResourceSpecificCredentialStoreToken(); - } - } - if (token == null || token.isEmpty()) { - // try with group resource profile level token - GroupResourceProfile groupResourceProfile = - registryClient.getGroupResourceProfile(groupResourceProfileId); - token = groupResourceProfile.getDefaultCredentialStoreToken(); - } - // still the token is empty, then we fail the experiment - if (token == null || token.isEmpty()) { - throw new Exception( - "You have not configured credential store token at group resource profile or compute resource preference." - + " Please provide the correct token at group resource profile or compute resource preference."); - } - ExperimentType executionType = experiment.getExperimentType(); - if (executionType == ExperimentType.SINGLE_APPLICATION) { - // its an single application execution experiment - List<ProcessModel> processes = orchestrator.createProcesses(experimentId, gatewayId); - - for (ProcessModel processModel : processes) { - // FIXME Resolving replica if available. This is a very crude way of resolving input replicas. A - // full featured - // FIXME replica resolving logic should come here - processModel.getProcessInputs().stream().forEach(pi -> { - if (pi.getType().equals(DataType.URI) - && pi.getValue() != null - && pi.getValue().startsWith("airavata-dp://")) { - try { - DataProductModel dataProductModel = registryClient.getDataProduct(pi.getValue()); - Optional<DataReplicaLocationModel> rpLocation = - dataProductModel.getReplicaLocations().stream() - .filter(rpModel -> rpModel.getReplicaLocationCategory() - .equals(ReplicaLocationCategory.GATEWAY_DATA_STORE)) - .findFirst(); - if (rpLocation.isPresent()) { - pi.setValue(rpLocation.get().getFilePath()); - pi.setStorageResourceId(rpLocation.get().getStorageResourceId()); - } else { - log.error("Could not find a replica for the URI " + pi.getValue()); - } - } catch (RegistryServiceException e) { - throw new RuntimeException("Error while launching experiment", e); - } catch (TException e) { - throw new RuntimeException("Error while launching experiment", e); - } - } else if (pi.getType().equals(DataType.URI_COLLECTION) - && pi.getValue() != null - && pi.getValue().contains("airavata-dp://")) { - try { - String[] uriList = pi.getValue().split(","); - final ArrayList<String> filePathList = new ArrayList<>(); - for (String uri : uriList) { - if (uri.startsWith("airavata-dp://")) { - DataProductModel dataProductModel = registryClient.getDataProduct(uri); - Optional<DataReplicaLocationModel> rpLocation = - dataProductModel.getReplicaLocations().stream() - .filter(rpModel -> rpModel.getReplicaLocationCategory() - .equals(ReplicaLocationCategory.GATEWAY_DATA_STORE)) - .findFirst(); - if (rpLocation.isPresent()) { - filePathList.add(rpLocation.get().getFilePath()); - } else { - log.error("Could not find a replica for the URI " + pi.getValue()); - } - } else { - // uri is in file path format - filePathList.add(uri); - } - } - pi.setValue(StringUtils.join(filePathList, ',')); - } catch (RegistryServiceException e) { - throw new RuntimeException("Error while launching experiment", e); - } catch (TException e) { - throw new RuntimeException("Error while launching experiment", e); - } - } - }); - - if (!experiment.getUserConfigurationData().isAiravataAutoSchedule()) { - String taskDag = orchestrator.createAndSaveTasks(gatewayId, processModel); - processModel.setTaskDag(taskDag); - } - registryClient.updateProcess(processModel, processModel.getProcessId()); - } - - if (!experiment.getUserConfigurationData().isAiravataAutoSchedule() - && !validateProcess(experimentId, processes)) { - throw new Exception("Validating process fails for given experiment Id : " + experimentId); - } - - ProcessScheduler scheduler = new ProcessSchedulerImpl(); - if (!experiment.getUserConfigurationData().isAiravataAutoSchedule() - || scheduler.canLaunch(experimentId)) { - createAndValidateTasks(experiment, registryClient, false); - runExperimentLauncher(experimentId, gatewayId, token); - } else { - log.debug(experimentId, "Queuing single application experiment {}.", experimentId); - ExperimentStatus status = new ExperimentStatus(ExperimentState.SCHEDULED); - status.setReason("Compute resources are not ready"); - status.setTimeOfStateChange( - AiravataUtils.getCurrentTimestamp().getTime()); - OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, status, publisher, gatewayId); - log.info("expId: {}, Scheduled experiment ", experimentId); - } - } else if (executionType == ExperimentType.WORKFLOW) { - // its a workflow execution experiment - log.debug(experimentId, "Launching workflow experiment {}.", experimentId); - launchWorkflowExperiment(experimentId, token, gatewayId); - } else { - log.error( - experimentId, - "Couldn't identify experiment type, experiment {} is neither single application nor workflow.", - experimentId); - throw new TException("Experiment '" + experimentId - + "' launch failed. Unable to figureout execution type for application " - + experiment.getExecutionId()); - } - } catch (LaunchValidationException launchValidationException) { - ExperimentStatus status = new ExperimentStatus(ExperimentState.FAILED); - status.setReason("Validation failed: " + launchValidationException.getErrorMessage()); - status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, status, publisher, gatewayId); - throw new TException( - "Experiment '" + experimentId + "' launch failed. Experiment failed to validate: " - + launchValidationException.getErrorMessage(), - launchValidationException); + return orchestratorService.launchExperimentWithErrorHandling( + experimentId, gatewayId, OrchestratorServerThreadPoolExecutor.getCachedThreadPool()); + } catch (TException e) { + throw e; } catch (Exception e) { - ExperimentStatus status = new ExperimentStatus(ExperimentState.FAILED); - status.setReason("Unexpected error occurred: " + e.getMessage()); - status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, status, publisher, gatewayId); throw new TException("Experiment '" + experimentId + "' launch failed.", e); - } finally { - if (registryClient != null) { - ThriftUtils.close(registryClient); - } } - return true; } /** @@ -323,50 +160,39 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { * @throws TException */ public boolean validateExperiment(String experimentId) throws TException, LaunchValidationException { - final RegistryService.Client registryClient = getRegistryServiceClient(); try { - ExperimentModel experimentModel = registryClient.getExperiment(experimentId); - return orchestrator.validateExperiment(experimentModel).isValidationState(); + return orchestratorService.validateExperiment(experimentId); } catch (OrchestratorException e) { log.error(experimentId, "Error while validating experiment", e); throw new TException(e); - } finally { - if (registryClient != null) { - ThriftUtils.close(registryClient); - } + } catch (org.apache.airavata.registry.cpi.RegistryException e) { + log.error(experimentId, "Error while retrieving experiment for validation", e); + throw new TException(e); } } @Override public boolean validateProcess(String experimentId, List<ProcessModel> processes) throws LaunchValidationException, TException { - final RegistryService.Client registryClient = getRegistryServiceClient(); try { - ExperimentModel experimentModel = registryClient.getExperiment(experimentId); - for (ProcessModel processModel : processes) { - boolean state = orchestrator - .validateProcess(experimentModel, processModel) - .isSetValidationState(); - if (!state) { - return false; - } - } - return true; + return orchestratorService.validateProcess(experimentId, processes); } catch (LaunchValidationException lve) { - // If a process failed to validate, also add an error message at the experiment level ErrorModel details = new ErrorModel(); details.setActualErrorMessage(lve.getErrorMessage()); details.setCreationTime(Calendar.getInstance().getTimeInMillis()); - registryClient.addErrors(OrchestratorConstants.EXPERIMENT_ERROR, details, experimentId); + try { + orchestratorService.addProcessValidationErrors(experimentId, details); + } catch (org.apache.airavata.registry.cpi.RegistryException e) { + log.error(experimentId, "Error while adding errors to experiment", e); + } throw lve; } catch (OrchestratorException e) { log.error(experimentId, "Error while validating process", e); throw new TException(e); - } finally { - if (registryClient != null) { - ThriftUtils.close(registryClient); - } + } catch (org.apache.airavata.registry.cpi.RegistryException e) { + log.error(experimentId, "Error while retrieving experiment for process validation", e); + throw new TException(e); } } @@ -379,112 +205,24 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { * @throws TException */ public boolean terminateExperiment(String experimentId, String gatewayId) throws TException { - final RegistryService.Client registryClient = getRegistryServiceClient(); log.info(experimentId, "Experiment: {} is cancelling !!!!!", experimentId); try { - return validateStatesAndCancel(registryClient, experimentId, gatewayId); + return orchestratorService.terminateExperiment(experimentId, gatewayId); } catch (Exception e) { log.error("expId : " + experimentId + " :- Error while cancelling experiment", e); return false; - } finally { - if (registryClient != null) { - ThriftUtils.close(registryClient); - } } } public void fetchIntermediateOutputs(String experimentId, String gatewayId, List<String> outputNames) throws TException { - final RegistryService.Client registryClient = getRegistryServiceClient(); try { - submitIntermediateOutputsProcess(registryClient, experimentId, gatewayId, outputNames); + orchestratorService.fetchIntermediateOutputs(experimentId, gatewayId, outputNames); } catch (Exception e) { log.error("expId : " + experimentId + " :- Error while fetching intermediate", e); - } finally { - if (registryClient != null) { - ThriftUtils.close(registryClient); - } } } - private void submitIntermediateOutputsProcess( - Client registryClient, String experimentId, String gatewayId, List<String> outputNames) throws Exception { - - ExperimentModel experimentModel = registryClient.getExperiment(experimentId); - ProcessModel processModel = ExperimentModelUtil.cloneProcessFromExperiment(experimentModel); - processModel.setExperimentDataDir(processModel.getExperimentDataDir() + "/intermediates"); - - List<OutputDataObjectType> applicationOutputs = registryClient.getApplicationOutputs( - experimentModel.getExecutionId()); // This is to get a clean output object set - List<OutputDataObjectType> requestedOutputs = new ArrayList<>(); - - for (OutputDataObjectType output : applicationOutputs) { - if (outputNames.contains(output.getName())) { - requestedOutputs.add(output); - } - } - processModel.setProcessOutputs(requestedOutputs); - String processId = registryClient.addProcess(processModel, experimentId); - processModel.setProcessId(processId); - - try { - // Find the process that is responsible for main experiment workflow by - // looking for the process that has the JOB_SUBMISSION task - Optional<ProcessModel> jobSubmissionProcess = experimentModel.getProcesses().stream() - .filter(p -> p.getTasks().stream().anyMatch(t -> t.getTaskType() == TaskTypes.JOB_SUBMISSION)) - .findFirst(); - if (!jobSubmissionProcess.isPresent()) { - throw new Exception(MessageFormat.format( - "Could not find job submission process for experiment {0}, unable to fetch intermediate outputs {1}", - experimentId, outputNames)); - } - String taskDag = orchestrator.createAndSaveIntermediateOutputFetchingTasks( - gatewayId, processModel, jobSubmissionProcess.get()); - processModel.setTaskDag(taskDag); - - registryClient.updateProcess(processModel, processModel.getProcessId()); - - // Figure out the credential token - UserConfigurationDataModel userConfigurationData = experimentModel.getUserConfigurationData(); - String token = null; - final String groupResourceProfileId = userConfigurationData.getGroupResourceProfileId(); - if (groupResourceProfileId == null) { - throw new Exception("Experiment not configured with a Group Resource Profile: " + experimentId); - } - GroupComputeResourcePreference groupComputeResourcePreference = - registryClient.getGroupComputeResourcePreference( - userConfigurationData - .getComputationalResourceScheduling() - .getResourceHostId(), - groupResourceProfileId); - if (groupComputeResourcePreference.getResourceSpecificCredentialStoreToken() != null) { - token = groupComputeResourcePreference.getResourceSpecificCredentialStoreToken(); - } - if (token == null || token.isEmpty()) { - // try with group resource profile level token - GroupResourceProfile groupResourceProfile = - registryClient.getGroupResourceProfile(groupResourceProfileId); - token = groupResourceProfile.getDefaultCredentialStoreToken(); - } - // still the token is empty, then we fail the experiment - if (token == null || token.isEmpty()) { - throw new Exception( - "You have not configured credential store token at group resource profile or compute resource preference." - + " Please provide the correct token at group resource profile or compute resource preference."); - } - orchestrator.launchProcess(processModel, token); - } catch (Exception e) { - log.error("Failed to launch process for intermediate output fetching", e); - - // Update Process status to FAILED - ProcessStatus status = new ProcessStatus(ProcessState.FAILED); - status.setReason("Intermediate output fetching process failed to launch: " + e.getMessage()); - status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - registryClient.addProcessStatus(status, processId); - - throw e; - } - } private String getAiravataUserName() { return airavataUserName; @@ -504,234 +242,18 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { @Override public boolean launchProcess(String processId, String airavataCredStoreToken, String gatewayId) throws TException { - final RegistryService.Client registryClient = getRegistryServiceClient(); try { - ProcessStatus processStatus = registryClient.getProcessStatus(processId); - - switch (processStatus.getState()) { - case CREATED: - case VALIDATED: - case DEQUEUING: - ProcessModel processModel = registryClient.getProcess(processId); - String applicationId = processModel.getApplicationInterfaceId(); - if (applicationId == null) { - log.error(processId, "Application interface id shouldn't be null."); - throw new OrchestratorException( - "Error executing the job, application interface id shouldn't be null."); - } - // set application deployment id to process model - ApplicationDeploymentDescription applicationDeploymentDescription = - getAppDeployment(registryClient, processModel, applicationId); - if (applicationDeploymentDescription == null) { - log.error("Could not find an application deployment for " + processModel.getComputeResourceId() - + " and application " + applicationId); - throw new OrchestratorException("Could not find an application deployment for " - + processModel.getComputeResourceId() + " and application " + applicationId); - } - processModel.setApplicationDeploymentId(applicationDeploymentDescription.getAppDeploymentId()); - // set compute resource id to process model, default we set the same in the user preferred compute - // host id - processModel.setComputeResourceId( - processModel.getProcessResourceSchedule().getResourceHostId()); - registryClient.updateProcess(processModel, processModel.getProcessId()); - return orchestrator.launchProcess(processModel, airavataCredStoreToken); - - default: - log.warn("Process " + processId + " is already launched. So it can not be relaunched"); - return false; - } - + return orchestratorService.launchProcess(processId, airavataCredStoreToken, gatewayId); + } catch (org.apache.airavata.registry.cpi.RegistryException e) { + log.error(processId, "Error while launching process ", e); + throw new TException(e); } catch (Exception e) { log.error(processId, "Error while launching process ", e); throw new TException(e); - } finally { - if (registryClient != null) { - ThriftUtils.close(registryClient); - } } } - private ApplicationDeploymentDescription getAppDeployment( - RegistryService.Client registryClient, ProcessModel processModel, String applicationId) - throws OrchestratorException, ClassNotFoundException, ApplicationSettingsException, InstantiationException, - IllegalAccessException, TException { - String selectedModuleId = getModuleId(registryClient, applicationId); - return getAppDeploymentForModule(registryClient, processModel, selectedModuleId); - } - - private ApplicationDeploymentDescription getAppDeploymentForModule( - RegistryService.Client registryClient, ProcessModel processModel, String selectedModuleId) - throws ClassNotFoundException, ApplicationSettingsException, InstantiationException, IllegalAccessException, - TException { - List<ApplicationDeploymentDescription> applicationDeployements = - registryClient.getApplicationDeployments(selectedModuleId); - Map<ComputeResourceDescription, ApplicationDeploymentDescription> deploymentMap = - new HashMap<ComputeResourceDescription, ApplicationDeploymentDescription>(); - - for (ApplicationDeploymentDescription deploymentDescription : applicationDeployements) { - if (processModel.getComputeResourceId().equals(deploymentDescription.getComputeHostId())) { - deploymentMap.put( - registryClient.getComputeResource(deploymentDescription.getComputeHostId()), - deploymentDescription); - } - } - List<ComputeResourceDescription> computeHostList = - Arrays.asList(deploymentMap.keySet().toArray(new ComputeResourceDescription[] {})); - Class<? extends HostScheduler> aClass = - Class.forName(ServerSettings.getHostScheduler()).asSubclass(HostScheduler.class); - HostScheduler hostScheduler = aClass.newInstance(); - ComputeResourceDescription ComputeResourceDescription = hostScheduler.schedule(computeHostList); - return deploymentMap.get(ComputeResourceDescription); - } - - private String getModuleId(RegistryService.Client registryClient, String applicationId) - throws OrchestratorException, TException { - ApplicationInterfaceDescription applicationInterface = registryClient.getApplicationInterface(applicationId); - List<String> applicationModules = applicationInterface.getApplicationModules(); - if (applicationModules.size() == 0) { - throw new OrchestratorException("No modules defined for application " + applicationId); - } - // AiravataAPI airavataAPI = getAiravataAPI(); - String selectedModuleId = applicationModules.get(0); - return selectedModuleId; - } - - private boolean validateStatesAndCancel( - RegistryService.Client registryClient, String experimentId, String gatewayId) throws Exception { - ExperimentStatus experimentStatus = registryClient.getExperimentStatus(experimentId); - switch (experimentStatus.getState()) { - case COMPLETED: - case CANCELED: - case FAILED: - case CANCELING: - log.warn( - "Can't terminate already {} experiment", - experimentStatus.getState().name()); - return false; - case CREATED: - log.warn("Experiment termination is only allowed for launched experiments."); - return false; - default: - ExperimentModel experimentModel = registryClient.getExperiment(experimentId); - final UserConfigurationDataModel userConfigurationData = experimentModel.getUserConfigurationData(); - final String groupResourceProfileId = userConfigurationData.getGroupResourceProfileId(); - - GroupComputeResourcePreference groupComputeResourcePreference = - registryClient.getGroupComputeResourcePreference( - userConfigurationData - .getComputationalResourceScheduling() - .getResourceHostId(), - groupResourceProfileId); - String token = groupComputeResourcePreference.getResourceSpecificCredentialStoreToken(); - if (token == null || token.isEmpty()) { - // try with group resource profile level token - GroupResourceProfile groupResourceProfile = - registryClient.getGroupResourceProfile(groupResourceProfileId); - token = groupResourceProfile.getDefaultCredentialStoreToken(); - } - // still the token is empty, then we fail the experiment - if (token == null || token.isEmpty()) { - log.error( - "You have not configured credential store token at group resource profile or compute resource preference." - + " Please provide the correct token at group resource profile or compute resource preference."); - return false; - } - - orchestrator.cancelExperiment(experimentModel, token); - // TODO deprecate this approach as we are replacing gfac - String expCancelNodePath = ZKPaths.makePath( - ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, experimentId), - ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); - Stat stat = curatorClient.checkExists().forPath(expCancelNodePath); - if (stat != null) { - curatorClient - .setData() - .withVersion(-1) - .forPath(expCancelNodePath, ZkConstants.ZOOKEEPER_CANCEL_REQEUST.getBytes()); - ExperimentStatus status = new ExperimentStatus(ExperimentState.CANCELING); - status.setReason("Experiment cancel request processed"); - status.setTimeOfStateChange( - AiravataUtils.getCurrentTimestamp().getTime()); - OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, status, publisher, gatewayId); - log.info("expId : " + experimentId + " :- Experiment status updated to " + status.getState()); - } - return true; - } - } - - private void launchWorkflowExperiment(String experimentId, String airavataCredStoreToken, String gatewayId) - throws TException { - // FIXME - // try { - // WorkflowEnactmentService.getInstance(). - // submitWorkflow(experimentId, airavataCredStoreToken, getGatewayName(), - // getRabbitMQProcessPublisher()); - // } catch (Exception e) { - // log.error("Error while launching workflow", e); - // } - } - - private class SingleAppExperimentRunner implements Runnable { - - String experimentId; - String airavataCredStoreToken; - String gatewayId; - - public SingleAppExperimentRunner(String experimentId, String airavataCredStoreToken, String gatewayId) { - this.experimentId = experimentId; - this.airavataCredStoreToken = airavataCredStoreToken; - this.gatewayId = gatewayId; - } - - @Override - public void run() { - try { - launchSingleAppExperiment(); - } catch (TException e) { - log.error("Unable to launch experiment..", e); - throw new RuntimeException("Error while launching experiment", e); - } catch (AiravataException e) { - log.error("Unable to publish experiment status..", e); - } - } - - private boolean launchSingleAppExperiment() throws TException, AiravataException { - final RegistryService.Client registryClient = getRegistryServiceClient(); - try { - List<String> processIds = registryClient.getProcessIds(experimentId); - for (String processId : processIds) { - launchProcess(processId, airavataCredStoreToken, gatewayId); - } - // ExperimentStatus status = new ExperimentStatus(ExperimentState.LAUNCHED); - // status.setReason("submitted all processes"); - // status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - // OrchestratorUtils.updageAndPublishExperimentStatus(experimentId, status); - // log.info("expId: {}, Launched experiment ", experimentId); - } catch (Exception e) { - ExperimentStatus status = new ExperimentStatus(ExperimentState.FAILED); - status.setReason("Error while updating task status"); - OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, status, publisher, gatewayId); - log.error( - "expId: " + experimentId - + ", Error while updating task status, hence updated experiment status to " - + ExperimentState.FAILED, - e); - ExperimentStatusChangeEvent event = - new ExperimentStatusChangeEvent(ExperimentState.FAILED, experimentId, gatewayId); - String messageId = AiravataUtils.getId("EXPERIMENT"); - MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT, messageId, gatewayId); - messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); - publisher.publish(messageContext); - throw new TException(e); - } finally { - if (registryClient != null) { - ThriftUtils.close(registryClient); - } - } - return true; - } - } private class ProcessStatusHandler implements MessageHandler { /** @@ -747,154 +269,21 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { TBase event = message.getEvent(); byte[] bytes = ThriftUtils.serializeThriftObject(event); ThriftUtils.createThriftFromBytes(bytes, processStatusChangeEvent); - ExperimentStatus status = new ExperimentStatus(); ProcessIdentifier processIdentity = processStatusChangeEvent.getProcessIdentity(); log.info( "expId: {}, processId: {} :- Process status changed event received for status {}", processIdentity.getExperimentId(), processIdentity.getProcessId(), processStatusChangeEvent.getState().name()); - try { - ProcessModel process = OrchestratorUtils.getProcess(processIdentity.getProcessId()); - boolean isIntermediateOutputFetchingProcess = - process.getTasks().stream().anyMatch(t -> t.getTaskType() == TaskTypes.OUTPUT_FETCHING); - if (isIntermediateOutputFetchingProcess) { - log.info( - "Not updating experiment status because process is an intermediate output fetching one"); - return; - } - } catch (ApplicationSettingsException e) { - throw new RuntimeException("Error getting process " + processIdentity.getProcessId(), e); - } - switch (processStatusChangeEvent.getState()) { - // case CREATED: - // case VALIDATED: - case STARTED: - try { - ExperimentStatus stat = - OrchestratorUtils.getExperimentStatus(processIdentity.getExperimentId()); - if (stat.getState() == ExperimentState.CANCELING) { - status.setState(ExperimentState.CANCELING); - status.setReason("Process started but experiment cancelling is triggered"); - } else { - status.setState(ExperimentState.EXECUTING); - status.setReason("process started"); - } - } catch (ApplicationSettingsException e) { - throw new RuntimeException("Error ", e); - } - break; - // case PRE_PROCESSING: - // break; - // case CONFIGURING_WORKSPACE: - // case INPUT_DATA_STAGING: - // case EXECUTING: - // case MONITORING: - // case OUTPUT_DATA_STAGING: - // case POST_PROCESSING: - // case CANCELLING: - // break; - case COMPLETED: - try { - ExperimentStatus stat = - OrchestratorUtils.getExperimentStatus(processIdentity.getExperimentId()); - if (stat.getState() == ExperimentState.CANCELING) { - status.setState(ExperimentState.CANCELED); - status.setReason("Process competed but experiment cancelling is triggered"); - } else { - status.setState(ExperimentState.COMPLETED); - status.setReason("process completed"); - } - } catch (ApplicationSettingsException e) { - throw new RuntimeException("Error ", e); - } - break; - case FAILED: - try { - ExperimentStatus stat = - OrchestratorUtils.getExperimentStatus(processIdentity.getExperimentId()); - if (stat.getState() == ExperimentState.CANCELING) { - status.setState(ExperimentState.CANCELED); - status.setReason("Process failed but experiment cancelling is triggered"); - } else { - status.setState(ExperimentState.FAILED); - status.setReason("process failed"); - } - } catch (ApplicationSettingsException e) { - throw new RuntimeException("Unable to create registry client...", e); - } - break; - case CANCELED: - // TODO if experiment have more than one process associated with it, then this should be - // changed. - status.setState(ExperimentState.CANCELED); - status.setReason("process cancelled"); - break; - case QUEUED: - status.setState(ExperimentState.SCHEDULED); - status.setReason("Process started but compute resource not avaialable"); - break; - case REQUEUED: - status.setState(ExperimentState.SCHEDULED); - status.setReason("Job submission failed, requeued to resubmit"); - List<QueueStatusModel> queueStatusModels = new ArrayList<>(); - final RegistryService.Client registryClient = getRegistryServiceClient(); - ExperimentModel experimentModel = - registryClient.getExperiment(processIdentity.getExperimentId()); - UserConfigurationDataModel userConfigurationDataModel = - experimentModel.getUserConfigurationData(); - if (userConfigurationDataModel != null) { - ComputationalResourceSchedulingModel computationalResourceSchedulingModel = - userConfigurationDataModel.getComputationalResourceScheduling(); - if (computationalResourceSchedulingModel != null) { - String queueName = computationalResourceSchedulingModel.getQueueName(); - String resourceId = computationalResourceSchedulingModel.getResourceHostId(); - ComputeResourceDescription comResourceDes = - registryClient.getComputeResource(resourceId); - QueueStatusModel queueStatusModel = new QueueStatusModel(); - queueStatusModel.setHostName(comResourceDes.getHostName()); - queueStatusModel.setQueueName(queueName); - queueStatusModel.setQueueUp(false); - queueStatusModel.setRunningJobs(0); - queueStatusModel.setQueuedJobs(0); - queueStatusModel.setTime(System.currentTimeMillis()); - queueStatusModels.add(queueStatusModel); - registryClient.registerQueueStatuses(queueStatusModels); - } - } - - break; - case DEQUEUING: - try { - ExperimentStatus stat = - OrchestratorUtils.getExperimentStatus(processIdentity.getExperimentId()); - if (stat.getState() == ExperimentState.CANCELING) { - status.setState(ExperimentState.CANCELING); - status.setReason("Process started but experiment cancelling is triggered"); - } else { - launchQueuedExperiment(processIdentity.getExperimentId()); - } - - } catch (Exception e) { - throw new RuntimeException("Error ", e); - } - break; - default: - // ignore other status changes, thoes will not affect for experiment status changes - return; - } - if (status.getState() != null) { - status.setTimeOfStateChange( - AiravataUtils.getCurrentTimestamp().getTime()); - OrchestratorUtils.updateAndPublishExperimentStatus( - processIdentity.getExperimentId(), status, publisher, processIdentity.getGatewayId()); - log.info("expId : " + processIdentity.getExperimentId() + " :- Experiment status updated to " - + status.getState()); - } + orchestratorService.handleProcessStatusChange(processStatusChangeEvent, processIdentity); } catch (TException e) { log.error("Message Id : " + message.getMessageId() + ", Message type : " + message.getType() + "Error" + " while prcessing process status change event"); throw new RuntimeException("Error while updating experiment status", e); + } catch (Exception e) { + log.error("Message Id : " + message.getMessageId() + ", Message type : " + message.getType() + + "Error" + " while prcessing process status change event", e); + throw new RuntimeException("Error while updating experiment status", e); } } else { System.out.println("Message Recieved with message id " + message.getMessageId() + " and with message " @@ -935,10 +324,13 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { "Cancelling experiment with experimentId: {} gateway Id: {}", expEvent.getExperimentId(), expEvent.getGatewayId()); - terminateExperiment(expEvent.getExperimentId(), expEvent.getGatewayId()); + orchestratorService.handleCancelExperiment(expEvent); } catch (TException e) { log.error("Error while cancelling experiment", e); throw new RuntimeException("Error while cancelling experiment", e); + } catch (Exception e) { + log.error("Error while cancelling experiment", e); + throw new RuntimeException("Error while cancelling experiment", e); } finally { experimentSubscriber.sendAck(messageContext.getDeliveryTag()); } @@ -954,10 +346,13 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { event.getExperimentId(), event.getGatewayId(), event.getOutputNames()); - fetchIntermediateOutputs(event.getExperimentId(), event.getGatewayId(), event.getOutputNames()); + orchestratorService.handleIntermediateOutputsEvent(event); } catch (TException e) { log.error("Error while fetching intermediate outputs", e); throw new RuntimeException("Error while fetching intermediate outputs", e); + } catch (Exception e) { + log.error("Error while fetching intermediate outputs", e); + throw new RuntimeException("Error while fetching intermediate outputs", e); } finally { experimentSubscriber.sendAck(messageContext.getDeliveryTag()); } @@ -965,57 +360,20 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { } private void launchExperiment(MessageContext messageContext) { - ExperimentSubmitEvent expEvent = new ExperimentSubmitEvent(); - final RegistryService.Client registryClient = getRegistryServiceClient(); try { - byte[] bytes = ThriftUtils.serializeThriftObject(messageContext.getEvent()); - ThriftUtils.createThriftFromBytes(bytes, expEvent); - MDC.put(MDCConstants.EXPERIMENT_ID, expEvent.getExperimentId()); - log.info( - "Launching experiment with experimentId: {} gateway Id: {}", - expEvent.getExperimentId(), - expEvent.getGatewayId()); - if (messageContext.isRedeliver()) { - ExperimentModel experimentModel = registryClient.getExperiment(expEvent.getExperimentId()); - MDC.put(MDCConstants.EXPERIMENT_NAME, experimentModel.getExperimentName()); - if (experimentModel.getExperimentStatus().get(0).getState() == ExperimentState.CREATED) { - launchExperiment(expEvent.getExperimentId(), expEvent.getGatewayId()); - } - } else { - launchExperiment(expEvent.getExperimentId(), expEvent.getGatewayId()); - } + orchestratorService.handleLaunchExperimentFromMessage(messageContext); } catch (TException e) { - String logMessage = expEvent.getExperimentId() != null && expEvent.getGatewayId() != null - ? String.format( - "Experiment launch failed due to Thrift conversion error, experimentId: %s, gatewayId: %s", - expEvent.getExperimentId(), expEvent.getGatewayId()) - : "Experiment launch failed due to Thrift conversion error"; - log.error(logMessage, e); + log.error("Experiment launch failed due to Thrift conversion error", e); + } catch (org.apache.airavata.registry.cpi.RegistryException e) { + log.error("Experiment launch failed due to registry error", e); } catch (Exception e) { - log.error( - "An unknown issue while launching experiment " - + Optional.ofNullable(expEvent.getExperimentId()).orElse("missing experiment") - + " on gateway " - + Optional.ofNullable(expEvent.getGatewayId()).orElse("missing gateway"), - e); + log.error("An unknown issue while launching experiment", e); } finally { experimentSubscriber.sendAck(messageContext.getDeliveryTag()); MDC.clear(); - if (registryClient != null) { - ThriftUtils.close(registryClient); - } } } - private RegistryService.Client getRegistryServiceClient() { - try { - final int serverPort = Integer.parseInt(ServerSettings.getRegistryServerPort()); - final String serverHost = ServerSettings.getRegistryServerHost(); - return RegistryServiceClientFactory.createRegistryClient(serverHost, serverPort); - } catch (RegistryServiceException | ApplicationSettingsException e) { - throw new RuntimeException("Unable to create registry client...", e); - } - } private void startCurator() throws ApplicationSettingsException { String connectionSting = ServerSettings.getZookeeperConnection(); @@ -1028,72 +386,4 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { return ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, experimentId); } - private void runExperimentLauncher(String experimentId, String gatewayId, String token) throws TException { - ExperimentStatus status = new ExperimentStatus(ExperimentState.LAUNCHED); - status.setReason("submitted all processes"); - status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, status, publisher, gatewayId); - log.info("expId: {}, Launched experiment ", experimentId); - OrchestratorServerThreadPoolExecutor.getCachedThreadPool() - .execute(MDCUtil.wrapWithMDC(new SingleAppExperimentRunner(experimentId, token, gatewayId))); - } - - private void launchQueuedExperiment(String experimentId) throws TException, Exception { - ExperimentModel experiment = null; - final RegistryService.Client registryClient = getRegistryServiceClient(); - // TODO deprecate this approach as we are replacing gfac - experiment = registryClient.getExperiment(experimentId); - if (experiment == null) { - throw new Exception("Error retrieving the Experiment by the given experimentID: " + experimentId); - } - - UserConfigurationDataModel userConfigurationData = experiment.getUserConfigurationData(); - String token = null; - final String groupResourceProfileId = userConfigurationData.getGroupResourceProfileId(); - if (groupResourceProfileId == null) { - throw new Exception("Experiment not configured with a Group Resource Profile: " + experimentId); - } - GroupComputeResourcePreference groupComputeResourcePreference = - registryClient.getGroupComputeResourcePreference( - userConfigurationData - .getComputationalResourceScheduling() - .getResourceHostId(), - groupResourceProfileId); - if (groupComputeResourcePreference.getResourceSpecificCredentialStoreToken() != null) { - token = groupComputeResourcePreference.getResourceSpecificCredentialStoreToken(); - } - if (token == null || token.isEmpty()) { - // try with group resource profile level token - GroupResourceProfile groupResourceProfile = registryClient.getGroupResourceProfile(groupResourceProfileId); - token = groupResourceProfile.getDefaultCredentialStoreToken(); - } - // still the token is empty, then we fail the experiment - if (token == null || token.isEmpty()) { - throw new Exception( - "You have not configured credential store token at group resource profile or compute resource preference." - + " Please provide the correct token at group resource profile or compute resource preference."); - } - createAndValidateTasks(experiment, registryClient, true); - runExperimentLauncher(experimentId, experiment.getGatewayId(), token); - } - - private void createAndValidateTasks( - ExperimentModel experiment, RegistryService.Client registryClient, boolean recreateTaskDag) - throws Exception { - if (experiment.getUserConfigurationData().isAiravataAutoSchedule()) { - List<ProcessModel> processModels = registryClient.getProcessList(experiment.getExperimentId()); - for (ProcessModel processModel : processModels) { - if (processModel.getTaskDag() == null || recreateTaskDag) { - registryClient.deleteTasks(processModel.getProcessId()); - String taskDag = orchestrator.createAndSaveTasks(experiment.getGatewayId(), processModel); - processModel.setTaskDag(taskDag); - registryClient.updateProcess(processModel, processModel.getProcessId()); - } - } - if (!validateProcess(experiment.getExperimentId(), processModels)) { - throw new Exception( - "Validating process fails for given experiment Id : " + experiment.getExperimentId()); - } - } - } } diff --git a/airavata-api/src/main/java/org/apache/airavata/service/OrchestratorService.java b/airavata-api/src/main/java/org/apache/airavata/service/OrchestratorService.java new file mode 100644 index 0000000000..d9e073102b --- /dev/null +++ b/airavata-api/src/main/java/org/apache/airavata/service/OrchestratorService.java @@ -0,0 +1,737 @@ +/** +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.airavata.service; + +import java.text.MessageFormat; +import java.util.*; +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.logging.MDCUtil; +import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.common.utils.ThriftUtils; +import org.apache.airavata.common.utils.ZkConstants; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.Publisher; +import org.apache.airavata.metascheduler.core.api.ProcessScheduler; +import org.apache.airavata.metascheduler.process.scheduling.api.ProcessSchedulerImpl; +import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; +import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription; +import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; +import org.apache.airavata.model.appcatalog.groupresourceprofile.GroupComputeResourcePreference; +import org.apache.airavata.model.appcatalog.groupresourceprofile.GroupResourceProfile; +import org.apache.airavata.model.application.io.DataType; +import org.apache.airavata.model.application.io.OutputDataObjectType; +import org.apache.airavata.model.commons.ErrorModel; +import org.apache.airavata.model.data.replica.DataProductModel; +import org.apache.airavata.model.data.replica.DataReplicaLocationModel; +import org.apache.airavata.model.data.replica.ReplicaLocationCategory; +import org.apache.airavata.model.error.LaunchValidationException; +import org.apache.airavata.model.experiment.ExperimentModel; +import org.apache.airavata.model.experiment.ExperimentType; +import org.apache.airavata.model.experiment.UserConfigurationDataModel; +import org.apache.airavata.model.process.ProcessModel; +import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel; +import org.apache.airavata.model.status.ExperimentState; +import org.apache.airavata.model.status.ExperimentStatus; +import org.apache.airavata.model.status.ProcessState; +import org.apache.airavata.model.status.ProcessStatus; +import org.apache.airavata.model.status.QueueStatusModel; +import org.apache.airavata.model.task.TaskTypes; +import org.apache.airavata.model.messaging.event.ExperimentIntermediateOutputsEvent; +import org.apache.airavata.model.messaging.event.ExperimentSubmitEvent; +import org.apache.airavata.model.messaging.event.ProcessIdentifier; +import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent; +import org.apache.airavata.model.util.ExperimentModelUtil; +import org.apache.airavata.orchestrator.core.exception.OrchestratorException; +import org.apache.airavata.orchestrator.core.schedule.HostScheduler; +import org.apache.airavata.orchestrator.core.utils.OrchestratorConstants; +import org.apache.airavata.orchestrator.cpi.impl.SimpleOrchestratorImpl; +import org.apache.airavata.orchestrator.util.OrchestratorUtils; +import org.apache.airavata.registry.cpi.AppCatalogException; +import org.apache.airavata.registry.cpi.RegistryException; +import org.apache.commons.lang3.StringUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.utils.ZKPaths; +import org.apache.thrift.TException; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OrchestratorService { + private static final Logger logger = LoggerFactory.getLogger(OrchestratorService.class); + + private OrchestratorRegistryService orchestratorRegistryService; + private SimpleOrchestratorImpl orchestrator; + private CuratorFramework curatorClient; + private Publisher publisher; + + public OrchestratorService( + OrchestratorRegistryService orchestratorRegistryService, + SimpleOrchestratorImpl orchestrator, + CuratorFramework curatorClient, + Publisher publisher) { + this.orchestratorRegistryService = orchestratorRegistryService; + this.orchestrator = orchestrator; + this.curatorClient = curatorClient; + this.publisher = publisher; + } + + public boolean launchExperiment(String experimentId, String gatewayId) throws Exception { + String experimentNodePath = getExperimentNodePath(experimentId); + ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentNodePath); + String experimentCancelNode = + ZKPaths.makePath(experimentNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); + ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentCancelNode); + ExperimentModel experiment = orchestratorRegistryService.getExperiment(experimentId); + if (experiment == null) { + throw new Exception("Error retrieving the Experiment by the given experimentID: " + experimentId); + } + + UserConfigurationDataModel userConfigurationData = experiment.getUserConfigurationData(); + String token = getCredentialToken(experiment, userConfigurationData); + + ExperimentType executionType = experiment.getExperimentType(); + if (executionType == ExperimentType.SINGLE_APPLICATION) { + return launchSingleAppExperiment(experiment, experimentId, gatewayId, token); + } else if (executionType == ExperimentType.WORKFLOW) { + logger.debug(experimentId, "Launching workflow experiment {}.", experimentId); + launchWorkflowExperiment(experimentId, token, gatewayId); + return true; + } else { + logger.error( + experimentId, + "Couldn't identify experiment type, experiment {} is neither single application nor workflow.", + experimentId); + throw new TException("Experiment '" + experimentId + + "' launch failed. Unable to figureout execution type for application " + + experiment.getExecutionId()); + } + } + + private boolean launchSingleAppExperiment( + ExperimentModel experiment, String experimentId, String gatewayId, String token) throws Exception { + List<ProcessModel> processes = orchestrator.createProcesses(experimentId, gatewayId); + + for (ProcessModel processModel : processes) { + resolveInputReplicas(processModel); + + if (!experiment.getUserConfigurationData().isAiravataAutoSchedule()) { + String taskDag = orchestrator.createAndSaveTasks(gatewayId, processModel); + processModel.setTaskDag(taskDag); + } + orchestratorRegistryService.updateProcess(processModel, processModel.getProcessId()); + } + + if (!experiment.getUserConfigurationData().isAiravataAutoSchedule() + && !validateProcess(experimentId, processes)) { + throw new Exception("Validating process fails for given experiment Id : " + experimentId); + } + + ProcessScheduler scheduler = new ProcessSchedulerImpl(); + if (!experiment.getUserConfigurationData().isAiravataAutoSchedule() + || scheduler.canLaunch(experimentId)) { + createAndValidateTasks(experiment, false); + return true; // runExperimentLauncher will be called separately + } else { + logger.debug(experimentId, "Queuing single application experiment {}.", experimentId); + ExperimentStatus status = new ExperimentStatus(ExperimentState.SCHEDULED); + status.setReason("Compute resources are not ready"); + status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, status, publisher, gatewayId); + logger.info("expId: {}, Scheduled experiment ", experimentId); + return false; + } + } + + private void resolveInputReplicas(ProcessModel processModel) throws Exception { + for (var pi : processModel.getProcessInputs()) { + if (pi.getType().equals(DataType.URI) + && pi.getValue() != null + && pi.getValue().startsWith("airavata-dp://")) { + try { + DataProductModel dataProductModel = orchestratorRegistryService.getDataProduct(pi.getValue()); + Optional<DataReplicaLocationModel> rpLocation = + dataProductModel.getReplicaLocations().stream() + .filter(rpModel -> rpModel.getReplicaLocationCategory() + .equals(ReplicaLocationCategory.GATEWAY_DATA_STORE)) + .findFirst(); + if (rpLocation.isPresent()) { + pi.setValue(rpLocation.get().getFilePath()); + pi.setStorageResourceId(rpLocation.get().getStorageResourceId()); + } else { + logger.error("Could not find a replica for the URI " + pi.getValue()); + } + } catch (RegistryException e) { + throw new Exception("Error while launching experiment", e); + } + } else if (pi.getType().equals(DataType.URI_COLLECTION) + && pi.getValue() != null + && pi.getValue().contains("airavata-dp://")) { + try { + String[] uriList = pi.getValue().split(","); + final ArrayList<String> filePathList = new ArrayList<>(); + for (String uri : uriList) { + if (uri.startsWith("airavata-dp://")) { + DataProductModel dataProductModel = orchestratorRegistryService.getDataProduct(uri); + Optional<DataReplicaLocationModel> rpLocation = + dataProductModel.getReplicaLocations().stream() + .filter(rpModel -> rpModel.getReplicaLocationCategory() + .equals(ReplicaLocationCategory.GATEWAY_DATA_STORE)) + .findFirst(); + if (rpLocation.isPresent()) { + filePathList.add(rpLocation.get().getFilePath()); + } else { + logger.error("Could not find a replica for the URI " + pi.getValue()); + } + } else { + filePathList.add(uri); + } + } + pi.setValue(StringUtils.join(filePathList, ',')); + } catch (RegistryException e) { + throw new Exception("Error while launching experiment", e); + } + } + } + } + + public String getCredentialToken(ExperimentModel experiment, UserConfigurationDataModel userConfigurationData) throws Exception { + String token = null; + final String groupResourceProfileId = userConfigurationData.getGroupResourceProfileId(); + if (groupResourceProfileId == null) { + throw new Exception("Experiment not configured with a Group Resource Profile: " + experiment.getExperimentId()); + } + + if (userConfigurationData.getComputationalResourceScheduling() != null + && userConfigurationData + .getComputationalResourceScheduling() + .isSet(ComputationalResourceSchedulingModel._Fields.RESOURCE_HOST_ID)) { + GroupComputeResourcePreference groupComputeResourcePreference = + orchestratorRegistryService.getGroupComputeResourcePreference( + userConfigurationData + .getComputationalResourceScheduling() + .getResourceHostId(), + groupResourceProfileId); + + if (groupComputeResourcePreference.getResourceSpecificCredentialStoreToken() != null) { + token = groupComputeResourcePreference.getResourceSpecificCredentialStoreToken(); + } + } + if (token == null || token.isEmpty()) { + GroupResourceProfile groupResourceProfile = + orchestratorRegistryService.getGroupResourceProfile(groupResourceProfileId); + token = groupResourceProfile.getDefaultCredentialStoreToken(); + } + if (token == null || token.isEmpty()) { + throw new Exception( + "You have not configured credential store token at group resource profile or compute resource preference." + + " Please provide the correct token at group resource profile or compute resource preference."); + } + return token; + } + + public boolean validateExperiment(String experimentId) throws TException, LaunchValidationException, RegistryException, OrchestratorException { + ExperimentModel experimentModel = orchestratorRegistryService.getExperiment(experimentId); + return orchestrator.validateExperiment(experimentModel).isValidationState(); + } + + public boolean validateProcess(String experimentId, List<ProcessModel> processes) + throws LaunchValidationException, TException, RegistryException, OrchestratorException { + ExperimentModel experimentModel = orchestratorRegistryService.getExperiment(experimentId); + for (ProcessModel processModel : processes) { + boolean state = orchestrator + .validateProcess(experimentModel, processModel) + .isSetValidationState(); + if (!state) { + return false; + } + } + return true; + } + + public boolean terminateExperiment(String experimentId, String gatewayId) throws Exception { + logger.info(experimentId, "Experiment: {} is cancelling !!!!!", experimentId); + return validateStatesAndCancel(experimentId, gatewayId); + } + + private boolean validateStatesAndCancel(String experimentId, String gatewayId) throws Exception { + ExperimentStatus experimentStatus = orchestratorRegistryService.getExperimentStatus(experimentId); + switch (experimentStatus.getState()) { + case COMPLETED: + case CANCELED: + case FAILED: + case CANCELING: + logger.warn( + "Can't terminate already {} experiment", + experimentStatus.getState().name()); + return false; + case CREATED: + logger.warn("Experiment termination is only allowed for launched experiments."); + return false; + default: + ExperimentModel experimentModel = orchestratorRegistryService.getExperiment(experimentId); + final UserConfigurationDataModel userConfigurationData = experimentModel.getUserConfigurationData(); + final String groupResourceProfileId = userConfigurationData.getGroupResourceProfileId(); + + GroupComputeResourcePreference groupComputeResourcePreference = + orchestratorRegistryService.getGroupComputeResourcePreference( + userConfigurationData + .getComputationalResourceScheduling() + .getResourceHostId(), + groupResourceProfileId); + String token = groupComputeResourcePreference.getResourceSpecificCredentialStoreToken(); + if (token == null || token.isEmpty()) { + GroupResourceProfile groupResourceProfile = + orchestratorRegistryService.getGroupResourceProfile(groupResourceProfileId); + token = groupResourceProfile.getDefaultCredentialStoreToken(); + } + if (token == null || token.isEmpty()) { + logger.error( + "You have not configured credential store token at group resource profile or compute resource preference." + + " Please provide the correct token at group resource profile or compute resource preference."); + return false; + } + + orchestrator.cancelExperiment(experimentModel, token); + String expCancelNodePath = ZKPaths.makePath( + ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, experimentId), + ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); + Stat stat = curatorClient.checkExists().forPath(expCancelNodePath); + if (stat != null) { + curatorClient + .setData() + .withVersion(-1) + .forPath(expCancelNodePath, ZkConstants.ZOOKEEPER_CANCEL_REQEUST.getBytes()); + ExperimentStatus status = new ExperimentStatus(ExperimentState.CANCELING); + status.setReason("Experiment cancel request processed"); + status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, status, publisher, gatewayId); + logger.info("expId : " + experimentId + " :- Experiment status updated to " + status.getState()); + } + return true; + } + } + + public void fetchIntermediateOutputs(String experimentId, String gatewayId, List<String> outputNames) + throws Exception { + submitIntermediateOutputsProcess(experimentId, gatewayId, outputNames); + } + + private void submitIntermediateOutputsProcess( + String experimentId, String gatewayId, List<String> outputNames) throws Exception { + + ExperimentModel experimentModel = orchestratorRegistryService.getExperiment(experimentId); + ProcessModel processModel = ExperimentModelUtil.cloneProcessFromExperiment(experimentModel); + processModel.setExperimentDataDir(processModel.getExperimentDataDir() + "/intermediates"); + + List<OutputDataObjectType> applicationOutputs = orchestratorRegistryService.getApplicationOutputs( + experimentModel.getExecutionId()); + List<OutputDataObjectType> requestedOutputs = new ArrayList<>(); + + for (OutputDataObjectType output : applicationOutputs) { + if (outputNames.contains(output.getName())) { + requestedOutputs.add(output); + } + } + processModel.setProcessOutputs(requestedOutputs); + String processId = orchestratorRegistryService.addProcess(processModel, experimentId); + processModel.setProcessId(processId); + + try { + Optional<ProcessModel> jobSubmissionProcess = experimentModel.getProcesses().stream() + .filter(p -> p.getTasks().stream().anyMatch(t -> t.getTaskType() == TaskTypes.JOB_SUBMISSION)) + .findFirst(); + if (!jobSubmissionProcess.isPresent()) { + throw new Exception(MessageFormat.format( + "Could not find job submission process for experiment {0}, unable to fetch intermediate outputs {1}", + experimentId, outputNames)); + } + String taskDag = orchestrator.createAndSaveIntermediateOutputFetchingTasks( + gatewayId, processModel, jobSubmissionProcess.get()); + processModel.setTaskDag(taskDag); + + orchestratorRegistryService.updateProcess(processModel, processModel.getProcessId()); + + String token = getCredentialToken(experimentModel, experimentModel.getUserConfigurationData()); + orchestrator.launchProcess(processModel, token); + } catch (Exception e) { + logger.error("Failed to launch process for intermediate output fetching", e); + + ProcessStatus status = new ProcessStatus(ProcessState.FAILED); + status.setReason("Intermediate output fetching process failed to launch: " + e.getMessage()); + status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + orchestratorRegistryService.addProcessStatus(status, processId); + + throw e; + } + } + + public boolean launchProcess(String processId, String airavataCredStoreToken, String gatewayId) throws Exception { + ProcessStatus processStatus = orchestratorRegistryService.getProcessStatus(processId); + + switch (processStatus.getState()) { + case CREATED: + case VALIDATED: + case DEQUEUING: + ProcessModel processModel = orchestratorRegistryService.getProcess(processId); + String applicationId = processModel.getApplicationInterfaceId(); + if (applicationId == null) { + logger.error(processId, "Application interface id shouldn't be null."); + throw new OrchestratorException( + "Error executing the job, application interface id shouldn't be null."); + } + ApplicationDeploymentDescription applicationDeploymentDescription = + getAppDeployment(processModel, applicationId); + if (applicationDeploymentDescription == null) { + logger.error("Could not find an application deployment for " + processModel.getComputeResourceId() + + " and application " + applicationId); + throw new OrchestratorException("Could not find an application deployment for " + + processModel.getComputeResourceId() + " and application " + applicationId); + } + processModel.setApplicationDeploymentId(applicationDeploymentDescription.getAppDeploymentId()); + processModel.setComputeResourceId( + processModel.getProcessResourceSchedule().getResourceHostId()); + orchestratorRegistryService.updateProcess(processModel, processModel.getProcessId()); + return orchestrator.launchProcess(processModel, airavataCredStoreToken); + + default: + logger.warn("Process " + processId + " is already launched. So it can not be relaunched"); + return false; + } + } + + private ApplicationDeploymentDescription getAppDeployment( + ProcessModel processModel, String applicationId) + throws Exception { + String selectedModuleId = getModuleId(applicationId); + return getAppDeploymentForModule(processModel, selectedModuleId); + } + + private ApplicationDeploymentDescription getAppDeploymentForModule( + ProcessModel processModel, String selectedModuleId) + throws Exception { + + List<ApplicationDeploymentDescription> applicationDeployements = + orchestratorRegistryService.getApplicationDeployments(selectedModuleId); + Map<ComputeResourceDescription, ApplicationDeploymentDescription> deploymentMap = + new HashMap<>(); + + for (ApplicationDeploymentDescription deploymentDescription : applicationDeployements) { + if (processModel.getComputeResourceId().equals(deploymentDescription.getComputeHostId())) { + deploymentMap.put( + orchestratorRegistryService.getComputeResource(deploymentDescription.getComputeHostId()), + deploymentDescription); + } + } + List<ComputeResourceDescription> computeHostList = + Arrays.asList(deploymentMap.keySet().toArray(new ComputeResourceDescription[] {})); + Class<? extends HostScheduler> aClass = + Class.forName(ServerSettings.getHostScheduler()).asSubclass(HostScheduler.class); + HostScheduler hostScheduler = aClass.newInstance(); + ComputeResourceDescription ComputeResourceDescription = hostScheduler.schedule(computeHostList); + return deploymentMap.get(ComputeResourceDescription); + } + + private String getModuleId(String applicationId) + throws Exception { + ApplicationInterfaceDescription applicationInterface = orchestratorRegistryService.getApplicationInterface(applicationId); + List<String> applicationModules = applicationInterface.getApplicationModules(); + if (applicationModules.size() == 0) { + throw new OrchestratorException("No modules defined for application " + applicationId); + } + String selectedModuleId = applicationModules.get(0); + return selectedModuleId; + } + + private void launchWorkflowExperiment(String experimentId, String airavataCredStoreToken, String gatewayId) + throws TException { + // FIXME - Workflow support not implemented + } + + public void createAndValidateTasks(ExperimentModel experiment, boolean recreateTaskDag) throws Exception { + if (experiment.getUserConfigurationData().isAiravataAutoSchedule()) { + List<ProcessModel> processModels = orchestratorRegistryService.getProcessList(experiment.getExperimentId()); + for (ProcessModel processModel : processModels) { + if (processModel.getTaskDag() == null || recreateTaskDag) { + orchestratorRegistryService.deleteTasks(processModel.getProcessId()); + String taskDag = orchestrator.createAndSaveTasks(experiment.getGatewayId(), processModel); + processModel.setTaskDag(taskDag); + orchestratorRegistryService.updateProcess(processModel, processModel.getProcessId()); + } + } + if (!validateProcess(experiment.getExperimentId(), processModels)) { + throw new Exception( + "Validating process fails for given experiment Id : " + experiment.getExperimentId()); + } + } + } + + public void addProcessValidationErrors(String experimentId, ErrorModel details) throws RegistryException { + orchestratorRegistryService.addErrors(OrchestratorConstants.EXPERIMENT_ERROR, details, experimentId); + } + + public String getExperimentNodePath(String experimentId) { + return ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, experimentId); + } + + public boolean launchSingleAppExperimentInternal(String experimentId, String airavataCredStoreToken, String gatewayId) throws Exception { + try { + List<String> processIds = orchestratorRegistryService.getProcessIds(experimentId); + for (String processId : processIds) { + launchProcess(processId, airavataCredStoreToken, gatewayId); + } + return true; + } catch (org.apache.airavata.registry.cpi.RegistryException e) { + ExperimentStatus status = new ExperimentStatus(ExperimentState.FAILED); + status.setReason("Error while retrieving process IDs"); + OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, status, publisher, gatewayId); + logger.error("expId: " + experimentId + ", Error while retrieving process IDs", e); + throw new Exception("Error while retrieving process IDs", e); + } catch (Exception e) { + ExperimentStatus status = new ExperimentStatus(ExperimentState.FAILED); + status.setReason("Error while launching processes"); + OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, status, publisher, gatewayId); + logger.error("expId: " + experimentId + ", Error while launching processes", e); + throw e; + } + } + + public void launchQueuedExperiment(String experimentId) throws Exception { + ExperimentModel experiment = orchestratorRegistryService.getExperiment(experimentId); + if (experiment == null) { + throw new Exception("Error retrieving the Experiment by the given experimentID: " + experimentId); + } + + UserConfigurationDataModel userConfigurationData = experiment.getUserConfigurationData(); + String token = getCredentialToken(experiment, userConfigurationData); + createAndValidateTasks(experiment, true); + + // Publish experiment launched status and run launcher + ExperimentStatus status = new ExperimentStatus(ExperimentState.LAUNCHED); + status.setReason("submitted all processes"); + status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, status, publisher, experiment.getGatewayId()); + logger.info("expId: {}, Launched experiment ", experimentId); + + // Launch processes + launchSingleAppExperimentInternal(experimentId, token, experiment.getGatewayId()); + } + + public void handleProcessStatusChange( + ProcessStatusChangeEvent processStatusChangeEvent, + ProcessIdentifier processIdentity) throws Exception { + ExperimentStatus status = new ExperimentStatus(); + + // Check if this is an intermediate output fetching process + ProcessModel process = orchestratorRegistryService.getProcess(processIdentity.getProcessId()); + boolean isIntermediateOutputFetchingProcess = + process.getTasks().stream().anyMatch(t -> t.getTaskType() == TaskTypes.OUTPUT_FETCHING); + if (isIntermediateOutputFetchingProcess) { + logger.info("Not updating experiment status because process is an intermediate output fetching one"); + return; + } + + switch (processStatusChangeEvent.getState()) { + case STARTED: + ExperimentStatus stat = orchestratorRegistryService.getExperimentStatus(processIdentity.getExperimentId()); + if (stat.getState() == ExperimentState.CANCELING) { + status.setState(ExperimentState.CANCELING); + status.setReason("Process started but experiment cancelling is triggered"); + } else { + status.setState(ExperimentState.EXECUTING); + status.setReason("process started"); + } + break; + case COMPLETED: + stat = orchestratorRegistryService.getExperimentStatus(processIdentity.getExperimentId()); + if (stat.getState() == ExperimentState.CANCELING) { + status.setState(ExperimentState.CANCELED); + status.setReason("Process competed but experiment cancelling is triggered"); + } else { + status.setState(ExperimentState.COMPLETED); + status.setReason("process completed"); + } + break; + case FAILED: + stat = orchestratorRegistryService.getExperimentStatus(processIdentity.getExperimentId()); + if (stat.getState() == ExperimentState.CANCELING) { + status.setState(ExperimentState.CANCELED); + status.setReason("Process failed but experiment cancelling is triggered"); + } else { + status.setState(ExperimentState.FAILED); + status.setReason("process failed"); + } + break; + case CANCELED: + status.setState(ExperimentState.CANCELED); + status.setReason("process cancelled"); + break; + case QUEUED: + status.setState(ExperimentState.SCHEDULED); + status.setReason("Process started but compute resource not avaialable"); + break; + case REQUEUED: + status.setState(ExperimentState.SCHEDULED); + status.setReason("Job submission failed, requeued to resubmit"); + registerQueueStatusForRequeue(processIdentity.getExperimentId()); + break; + case DEQUEUING: + stat = orchestratorRegistryService.getExperimentStatus(processIdentity.getExperimentId()); + if (stat.getState() == ExperimentState.CANCELING) { + status.setState(ExperimentState.CANCELING); + status.setReason("Process started but experiment cancelling is triggered"); + } else { + launchQueuedExperiment(processIdentity.getExperimentId()); + } + break; + default: + // ignore other status changes + return; + } + + if (status.getState() != null) { + status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + OrchestratorUtils.updateAndPublishExperimentStatus( + processIdentity.getExperimentId(), status, publisher, processIdentity.getGatewayId()); + logger.info("expId : " + processIdentity.getExperimentId() + " :- Experiment status updated to " + + status.getState()); + } + } + + private void registerQueueStatusForRequeue(String experimentId) { + try { + List<QueueStatusModel> queueStatusModels = new ArrayList<>(); + ExperimentModel experimentModel = orchestratorRegistryService.getExperiment(experimentId); + UserConfigurationDataModel userConfigurationDataModel = experimentModel.getUserConfigurationData(); + if (userConfigurationDataModel != null) { + ComputationalResourceSchedulingModel computationalResourceSchedulingModel = + userConfigurationDataModel.getComputationalResourceScheduling(); + if (computationalResourceSchedulingModel != null) { + String queueName = computationalResourceSchedulingModel.getQueueName(); + String resourceId = computationalResourceSchedulingModel.getResourceHostId(); + ComputeResourceDescription comResourceDes = + orchestratorRegistryService.getComputeResource(resourceId); + QueueStatusModel queueStatusModel = new QueueStatusModel(); + queueStatusModel.setHostName(comResourceDes.getHostName()); + queueStatusModel.setQueueName(queueName); + queueStatusModel.setQueueUp(false); + queueStatusModel.setRunningJobs(0); + queueStatusModel.setQueuedJobs(0); + queueStatusModel.setTime(System.currentTimeMillis()); + queueStatusModels.add(queueStatusModel); + orchestratorRegistryService.registerQueueStatuses(queueStatusModels); + } + } + } catch (org.apache.airavata.registry.cpi.RegistryException e) { + logger.error("Error while registering queue statuses", e); + } catch (org.apache.airavata.registry.cpi.AppCatalogException e) { + logger.error("Error while getting compute resource for queue status", e); + } + } + + public void handleLaunchExperiment(ExperimentSubmitEvent expEvent) throws Exception { + ExperimentModel experimentModel = orchestratorRegistryService.getExperiment(expEvent.getExperimentId()); + if (experimentModel.getExperimentStatus().get(0).getState() == ExperimentState.CREATED) { + launchExperiment(expEvent.getExperimentId(), expEvent.getGatewayId()); + } + } + + /** + * Handle launch experiment from message context with deserialization and redelivery checks + */ + public void handleLaunchExperimentFromMessage(MessageContext messageContext) throws Exception { + ExperimentSubmitEvent expEvent = new ExperimentSubmitEvent(); + byte[] bytes = ThriftUtils.serializeThriftObject(messageContext.getEvent()); + ThriftUtils.createThriftFromBytes(bytes, expEvent); + + if (messageContext.isRedeliver()) { + ExperimentModel experimentModel = orchestratorRegistryService.getExperiment(expEvent.getExperimentId()); + if (experimentModel != null && experimentModel.getExperimentStatus().get(0).getState() == ExperimentState.CREATED) { + handleLaunchExperiment(expEvent); + } + } else { + handleLaunchExperiment(expEvent); + } + } + + public void handleCancelExperiment(ExperimentSubmitEvent expEvent) throws Exception { + terminateExperiment(expEvent.getExperimentId(), expEvent.getGatewayId()); + } + + public void handleIntermediateOutputsEvent(ExperimentIntermediateOutputsEvent event) throws Exception { + fetchIntermediateOutputs(event.getExperimentId(), event.getGatewayId(), event.getOutputNames()); + } + + public boolean launchExperimentWithErrorHandling(String experimentId, String gatewayId, java.util.concurrent.ExecutorService executorService) throws TException { + try { + boolean result = launchExperiment(experimentId, gatewayId); + if (result) { + ExperimentModel experiment = orchestratorRegistryService.getExperiment(experimentId); + String token = getCredentialToken(experiment, experiment.getUserConfigurationData()); + ExperimentStatus status = new ExperimentStatus(ExperimentState.LAUNCHED); + status.setReason("submitted all processes"); + status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, status, publisher, gatewayId); + logger.info("expId: {}, Launched experiment ", experimentId); + + // Execute the single app experiment runner in the provided thread pool + if (executorService != null) { + Runnable runner = () -> { + try { + launchSingleAppExperimentInternal(experimentId, token, gatewayId); + } catch (Exception e) { + logger.error("expId: " + experimentId + ", Error while launching single app experiment", e); + } + }; + executorService.execute(MDCUtil.wrapWithMDC(runner)); + } + } + return result; + } catch (LaunchValidationException launchValidationException) { + ExperimentStatus status = new ExperimentStatus(ExperimentState.FAILED); + status.setReason("Validation failed: " + launchValidationException.getErrorMessage()); + status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, status, publisher, gatewayId); + throw new TException( + "Experiment '" + experimentId + "' launch failed. Experiment failed to validate: " + + launchValidationException.getErrorMessage(), + launchValidationException); + } catch (org.apache.airavata.registry.cpi.RegistryException e) { + ExperimentStatus status = new ExperimentStatus(ExperimentState.FAILED); + status.setReason("Registry error: " + e.getMessage()); + status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, status, publisher, gatewayId); + throw new TException("Experiment '" + experimentId + "' launch failed.", e); + } catch (org.apache.airavata.registry.cpi.AppCatalogException e) { + ExperimentStatus status = new ExperimentStatus(ExperimentState.FAILED); + status.setReason("App catalog error: " + e.getMessage()); + status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, status, publisher, gatewayId); + throw new TException("Experiment '" + experimentId + "' launch failed.", e); + } catch (Exception e) { + ExperimentStatus status = new ExperimentStatus(ExperimentState.FAILED); + status.setReason("Unexpected error occurred: " + e.getMessage()); + status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + OrchestratorUtils.updateAndPublishExperimentStatus(experimentId, status, publisher, gatewayId); + throw new TException("Experiment '" + experimentId + "' launch failed.", e); + } + } +} +
