Repository: airavata Updated Branches: refs/heads/develop f66043474 -> 6e5d1c6ee
User user compute resource preference if user provided Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6e5d1c6e Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6e5d1c6e Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6e5d1c6e Branch: refs/heads/develop Commit: 6e5d1c6eee8cc6772ca4381da2ec5d2a8ac58a7d Parents: f660434 Author: Shameera Rathnayaka <[email protected]> Authored: Fri Nov 11 19:40:12 2016 -0500 Committer: Shameera Rathnayaka <[email protected]> Committed: Fri Nov 11 19:40:12 2016 -0500 ---------------------------------------------------------------------- .../core/utils/OrchestratorUtils.java | 231 +++++++++++++------ .../cpi/impl/SimpleOrchestratorImpl.java | 146 +++++++----- 2 files changed, 250 insertions(+), 127 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/6e5d1c6e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java index 83c9273..61f7188 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java @@ -23,6 +23,7 @@ package org.apache.airavata.orchestrator.core.utils; import java.io.IOException; import java.util.*; +import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription; @@ -31,11 +32,13 @@ import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterfa import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission; import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference; import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference; +import org.apache.airavata.model.appcatalog.userresourceprofile.UserComputeResourcePreference; import org.apache.airavata.model.data.movement.DataMovementInterface; import org.apache.airavata.model.data.movement.DataMovementProtocol; import org.apache.airavata.model.data.movement.SCPDataMovement; import org.apache.airavata.model.data.movement.SecurityProtocol; import org.apache.airavata.model.process.ProcessModel; +import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel; import org.apache.airavata.orchestrator.core.OrchestratorConfiguration; import org.apache.airavata.orchestrator.core.context.OrchestratorContext; import org.apache.airavata.orchestrator.core.exception.OrchestratorException; @@ -50,69 +53,99 @@ import org.slf4j.LoggerFactory; public class OrchestratorUtils { private final static Logger logger = LoggerFactory.getLogger(OrchestratorUtils.class); - public static OrchestratorConfiguration loadOrchestratorConfiguration() throws OrchestratorException, IOException, NumberFormatException, ApplicationSettingsException { + public static OrchestratorConfiguration loadOrchestratorConfiguration() + throws OrchestratorException, IOException, NumberFormatException, ApplicationSettingsException { + OrchestratorConfiguration orchestratorConfiguration = new OrchestratorConfiguration(); - orchestratorConfiguration.setSubmitterInterval(Integer.parseInt((String) ServerSettings.getSetting(OrchestratorConstants.SUBMIT_INTERVAL))); - orchestratorConfiguration.setThreadPoolSize(Integer.parseInt((String) ServerSettings.getSetting(OrchestratorConstants.THREAD_POOL_SIZE))); - orchestratorConfiguration.setStartSubmitter(Boolean.valueOf(ServerSettings.getSetting(OrchestratorConstants.START_SUBMITTER))); - orchestratorConfiguration.setEmbeddedMode(Boolean.valueOf(ServerSettings.getSetting(OrchestratorConstants.EMBEDDED_MODE))); - orchestratorConfiguration.setEnableValidation(Boolean.valueOf(ServerSettings.getSetting(OrchestratorConstants.ENABLE_VALIDATION))); + orchestratorConfiguration.setSubmitterInterval( + Integer.parseInt(ServerSettings.getSetting(OrchestratorConstants.SUBMIT_INTERVAL))); + orchestratorConfiguration.setThreadPoolSize( + Integer.parseInt(ServerSettings.getSetting(OrchestratorConstants.THREAD_POOL_SIZE))); + orchestratorConfiguration.setStartSubmitter( + Boolean.valueOf(ServerSettings.getSetting(OrchestratorConstants.START_SUBMITTER))); + orchestratorConfiguration.setEmbeddedMode( + Boolean.valueOf(ServerSettings.getSetting(OrchestratorConstants.EMBEDDED_MODE))); + orchestratorConfiguration.setEnableValidation( + Boolean.valueOf(ServerSettings.getSetting(OrchestratorConstants.ENABLE_VALIDATION))); if (orchestratorConfiguration.isEnableValidation()) { - orchestratorConfiguration.setValidatorClasses(Arrays.asList(ServerSettings.getSetting(OrchestratorConstants.JOB_VALIDATOR).split(","))); + orchestratorConfiguration.setValidatorClasses( + Arrays.asList(ServerSettings.getSetting(OrchestratorConstants.JOB_VALIDATOR).split(","))); } return orchestratorConfiguration; } - public static JobSubmissionProtocol getPreferredJobSubmissionProtocol(OrchestratorContext context, ProcessModel model, String gatewayId) throws RegistryException { + public static JobSubmissionProtocol getPreferredJobSubmissionProtocol(OrchestratorContext context, + ProcessModel model, + String gatewayId) throws RegistryException { try { - GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile(); String resourceHostId = model.getComputeResourceId(); - ComputeResourcePreference preference = gatewayProfile.getComputeResourcePreference(gatewayId - , resourceHostId); - return preference.getPreferredJobSubmissionProtocol(); + return getComputeResourcePreference(context, gatewayId, resourceHostId).getPreferredJobSubmissionProtocol(); } catch (AppCatalogException e) { logger.error("Error occurred while initializing app catalog", e); throw new RegistryException("Error occurred while initializing app catalog", e); } } - public static String getApplicationInterfaceName(OrchestratorContext context, ProcessModel model) throws RegistryException { + public static ComputeResourcePreference getComputeResourcePreference(OrchestratorContext context, + String gatewayId, + String resourceHostId) + throws AppCatalogException, RegistryException { + + GwyResourceProfile gatewayProfile = getGatewayProfile(context); + return gatewayProfile.getComputeResourcePreference(gatewayId + , resourceHostId); + } + + public static GwyResourceProfile getGatewayProfile(OrchestratorContext context) + throws AppCatalogException, RegistryException { + return context.getRegistry().getAppCatalog().getGatewayProfile(); + } + + public static UsrResourceProfile getUserResourceProfile(OrchestratorContext context) + throws RegistryException, AppCatalogException { + return context.getRegistry().getAppCatalog().getUserResourceProfile(); + } + + public static String getApplicationInterfaceName(OrchestratorContext context, ProcessModel model) + throws RegistryException { try { ApplicationInterface applicationInterface = context.getRegistry().getAppCatalog().getApplicationInterface(); - ApplicationInterfaceDescription appInterface = applicationInterface.getApplicationInterface(model.getApplicationInterfaceId()); + ApplicationInterfaceDescription appInterface = + applicationInterface.getApplicationInterface(model.getApplicationInterfaceId()); return appInterface.getApplicationName(); } catch (AppCatalogException e) { throw new RegistryException("Error while retrieving application interface", e); } } - public static DataMovementProtocol getPreferredDataMovementProtocol(OrchestratorContext context, ProcessModel model, String gatewayId) throws RegistryException { + public static DataMovementProtocol getPreferredDataMovementProtocol(OrchestratorContext context, + ProcessModel model, + String gatewayId) throws RegistryException { try { - GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile(); String resourceHostId = model.getComputeResourceId(); - ComputeResourcePreference preference = gatewayProfile.getComputeResourcePreference(gatewayId - , resourceHostId); - return preference.getPreferredDataMovementProtocol(); + return getComputeResourcePreference(context, gatewayId, resourceHostId).getPreferredDataMovementProtocol(); } catch (AppCatalogException e) { logger.error("Error occurred while initializing app catalog", e); throw new RegistryException("Error occurred while initializing app catalog", e); } } - public static ComputeResourcePreference getComputeResourcePreference(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException { + public static ComputeResourcePreference getComputeResourcePreference(OrchestratorContext context, + ProcessModel processModel, + String gatewayId) throws RegistryException { try { - GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile(); - String resourceHostId = processModel.getComputeResourceId(); - return gatewayProfile.getComputeResourcePreference(gatewayId, resourceHostId); + return getComputeResourcePreference(context, gatewayId, processModel.getComputeResourceId()); } catch (AppCatalogException e) { logger.error("Error occurred while initializing app catalog", e); throw new RegistryException("Error occurred while initializing app catalog", e); } } - public static StoragePreference getStoragePreference(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException { + public static StoragePreference getStoragePreference(OrchestratorContext context, + ProcessModel processModel, + String gatewayId) throws RegistryException { try { - GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile(); + GwyResourceProfile gatewayProfile = getGatewayProfile(context); String resourceHostId = processModel.getComputeResourceId(); return gatewayProfile.getStoragePreference(gatewayId, resourceHostId); } catch (AppCatalogException e) { @@ -121,46 +154,103 @@ public class OrchestratorUtils { } } - public static String getLoginUserName(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException { + public static String getLoginUserName(OrchestratorContext context, + ProcessModel processModel, + String gatewayId) throws RegistryException, AiravataException { try { - String loginUserName = null; - String overrideLoginUserName = processModel.getProcessResourceSchedule().getOverrideLoginUserName(); - if (overrideLoginUserName != null && !overrideLoginUserName.equals("")) { - loginUserName = overrideLoginUserName; - } else { - GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile(); - loginUserName = gatewayProfile.getComputeResourcePreference(gatewayId, processModel.getComputeResourceId()).getLoginUserName(); + ComputeResourcePreference computeResourcePreference = getComputeResourcePreference(context, gatewayId, + processModel.getComputeResourceId()); + ComputationalResourceSchedulingModel processResourceSchedule = processModel.getProcessResourceSchedule(); + if (processModel.isUseUserCRPref()) { + UsrResourceProfile userResourceProfile = getUserResourceProfile(context); + UserComputeResourcePreference userComputeResourcePreference = userResourceProfile + .getUserComputeResourcePreference(processModel.getUserName(), gatewayId, + processModel.getComputeResourceId()); + if (isValid(userComputeResourcePreference.getLoginUserName())) { + return userComputeResourcePreference.getLoginUserName(); + } else if (isValid(processResourceSchedule.getOverrideLoginUserName())) { + logger.warn("User computer resource preference doesn't have valid user login name, using computer " + + "resource scheduling login name " + processResourceSchedule.getOverrideLoginUserName()); + return processResourceSchedule.getOverrideLoginUserName(); + } else if (isValid(computeResourcePreference.getLoginUserName())) { + logger.warn("Either User computer resource preference or computer resource scheduling " + + "doesn't have valid user login name, using gateway computer resource preference login name " + + computeResourcePreference.getLoginUserName()); + return computeResourcePreference.getLoginUserName(); + }else { + throw new AiravataException("Login name is not found"); + } + }else { + if (isValid(processResourceSchedule.getOverrideLoginUserName())) { + return processResourceSchedule.getOverrideLoginUserName(); + } else if (isValid(computeResourcePreference.getLoginUserName())) { + logger.warn("Process compute resource scheduling doesn't have valid user login name, " + + "using gateway computer resource preference login name " + + computeResourcePreference.getLoginUserName()); + return computeResourcePreference.getLoginUserName(); + }else { + throw new AiravataException("Login name is not found"); + } } - return loginUserName; } catch (AppCatalogException e) { logger.error("Error occurred while initializing app catalog to fetch login username", e); throw new RegistryException("Error occurred while initializing app catalog to fetch login username", e); } } - public static String getScratchLocation(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException { + public static String getScratchLocation(OrchestratorContext context, + ProcessModel processModel, + String gatewayId) throws RegistryException, AiravataException { try { - String scratchLocation = null; - String overrideScratchLocation = processModel.getProcessResourceSchedule().getOverrideScratchLocation(); - if (overrideScratchLocation != null && !overrideScratchLocation.equals("")) { - scratchLocation = overrideScratchLocation; - } else { - GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile(); - scratchLocation = gatewayProfile.getComputeResourcePreference(gatewayId, processModel.getComputeResourceId()).getScratchLocation(); + ComputeResourcePreference computeResourcePreference = getComputeResourcePreference(context, gatewayId, + processModel.getComputeResourceId()); + ComputationalResourceSchedulingModel processResourceSchedule = processModel.getProcessResourceSchedule(); + if (processModel.isUseUserCRPref()) { + UsrResourceProfile userResourceProfile = getUserResourceProfile(context); + UserComputeResourcePreference userComputeResourcePreference = userResourceProfile + .getUserComputeResourcePreference(processModel.getUserName(), gatewayId, + processModel.getComputeResourceId()); + if (isValid(userComputeResourcePreference.getScratchLocation())) { + return userComputeResourcePreference.getScratchLocation(); + } else if (isValid(processResourceSchedule.getOverrideScratchLocation())) { + logger.warn("User computer resource preference doesn't have valid scratch location, using computer " + + "resource scheduling scratch location " + processResourceSchedule.getOverrideScratchLocation()); + return processResourceSchedule.getOverrideScratchLocation(); + } else if (isValid(computeResourcePreference.getScratchLocation())) { + logger.warn("Either User computer resource preference or computer resource scheduling doesn't have " + + "valid scratch location, using gateway computer resource preference scratch location" + + computeResourcePreference.getScratchLocation()); + return computeResourcePreference.getScratchLocation(); + }else { + throw new AiravataException("Scratch location is not found"); + } + }else { + if (isValid(processResourceSchedule.getOverrideScratchLocation())) { + return processResourceSchedule.getOverrideScratchLocation(); + } else if (isValid(computeResourcePreference.getScratchLocation())) { + logger.warn("Process compute resource scheduling doesn't have valid scratch location, " + + "using gateway computer resource preference scratch location" + + computeResourcePreference.getScratchLocation()); + return computeResourcePreference.getScratchLocation(); + }else { + throw new AiravataException("Scratch location is not found"); + } } - return scratchLocation; } catch (AppCatalogException e) { logger.error("Error occurred while initializing app catalog to fetch scratch location", e); throw new RegistryException("Error occurred while initializing app catalog to fetch scratch location", e); } } - public static JobSubmissionInterface getPreferredJobSubmissionInterface(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException { + public static JobSubmissionInterface getPreferredJobSubmissionInterface(OrchestratorContext context, + ProcessModel processModel, + String gatewayId) throws RegistryException { try { String resourceHostId = processModel.getComputeResourceId(); ComputeResourcePreference resourcePreference = getComputeResourcePreference(context, processModel, gatewayId); JobSubmissionProtocol preferredJobSubmissionProtocol = resourcePreference.getPreferredJobSubmissionProtocol(); - ComputeResourceDescription resourceDescription = context.getRegistry().getAppCatalog().getComputeResource().getComputeResource(resourceHostId); + ComputeResourceDescription resourceDescription = + context.getRegistry().getAppCatalog().getComputeResource().getComputeResource(resourceHostId); List<JobSubmissionInterface> jobSubmissionInterfaces = resourceDescription.getJobSubmissionInterfaces(); Map<JobSubmissionProtocol, List<JobSubmissionInterface>> orderedInterfaces = new HashMap<>(); List<JobSubmissionInterface> interfaces = new ArrayList<>(); @@ -178,21 +268,14 @@ public class OrchestratorUtils { } } }else { - Collections.sort(jobSubmissionInterfaces, new Comparator<JobSubmissionInterface>() { - @Override - public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) { - return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder(); - } - }); + Collections.sort(jobSubmissionInterfaces, + (jobSubmissionInterface, jobSubmissionInterface2) -> + jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder()); } } interfaces = orderedInterfaces.get(preferredJobSubmissionProtocol); - Collections.sort(interfaces, new Comparator<JobSubmissionInterface>() { - @Override - public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) { - return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder(); - } - }); + Collections.sort(interfaces, (jobSubmissionInterface, jobSubmissionInterface2) -> + jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder()); } else { throw new RegistryException("Compute resource should have at least one job submission interface defined..."); } @@ -202,12 +285,15 @@ public class OrchestratorUtils { } } - public static DataMovementInterface getPrefferredDataMovementInterface(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException { + public static DataMovementInterface getPrefferredDataMovementInterface(OrchestratorContext context, + ProcessModel processModel, + String gatewayId) throws RegistryException { try { String resourceHostId = processModel.getComputeResourceId(); ComputeResourcePreference resourcePreference = getComputeResourcePreference(context, processModel, gatewayId); DataMovementProtocol preferredDataMovementProtocol = resourcePreference.getPreferredDataMovementProtocol(); - ComputeResourceDescription resourceDescription = context.getRegistry().getAppCatalog().getComputeResource().getComputeResource(resourceHostId); + ComputeResourceDescription resourceDescription = + context.getRegistry().getAppCatalog().getComputeResource().getComputeResource(resourceHostId); List<DataMovementInterface> dataMovementInterfaces = resourceDescription.getDataMovementInterfaces(); if (dataMovementInterfaces != null && !dataMovementInterfaces.isEmpty()) { for (DataMovementInterface dataMovementInterface : dataMovementInterfaces){ @@ -226,7 +312,9 @@ public class OrchestratorUtils { return null; } - public static int getDataMovementPort(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException{ + public static int getDataMovementPort(OrchestratorContext context, + ProcessModel processModel, + String gatewayId) throws RegistryException{ try { DataMovementProtocol protocol = getPreferredDataMovementProtocol(context, processModel, gatewayId); DataMovementInterface dataMovementInterface = getPrefferredDataMovementInterface(context, processModel, gatewayId); @@ -243,7 +331,9 @@ public class OrchestratorUtils { } - public static SecurityProtocol getSecurityProtocol(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException{ + public static SecurityProtocol getSecurityProtocol(OrchestratorContext context, + ProcessModel processModel, + String gatewayId) throws RegistryException{ try { JobSubmissionProtocol submissionProtocol = getPreferredJobSubmissionProtocol(context, processModel, gatewayId); JobSubmissionInterface jobSubmissionInterface = getPreferredJobSubmissionInterface(context, processModel, gatewayId); @@ -274,7 +364,8 @@ public class OrchestratorUtils { return null; } - public static LOCALSubmission getLocalJobSubmission(OrchestratorContext context, String submissionId) throws RegistryException { + public static LOCALSubmission getLocalJobSubmission(OrchestratorContext context, + String submissionId) throws RegistryException { try { AppCatalog appCatalog = context.getRegistry().getAppCatalog(); return appCatalog.getComputeResource().getLocalJobSubmission(submissionId); @@ -285,7 +376,8 @@ public class OrchestratorUtils { } } - public static UnicoreJobSubmission getUnicoreJobSubmission(OrchestratorContext context, String submissionId) throws RegistryException { + public static UnicoreJobSubmission getUnicoreJobSubmission(OrchestratorContext context, + String submissionId) throws RegistryException { try { AppCatalog appCatalog = context.getRegistry().getAppCatalog(); return appCatalog.getComputeResource().getUNICOREJobSubmission(submissionId); @@ -296,7 +388,8 @@ public class OrchestratorUtils { } } - public static SSHJobSubmission getSSHJobSubmission(OrchestratorContext context, String submissionId) throws RegistryException { + public static SSHJobSubmission getSSHJobSubmission(OrchestratorContext context, + String submissionId) throws RegistryException { try { AppCatalog appCatalog = context.getRegistry().getAppCatalog(); return appCatalog.getComputeResource().getSSHJobSubmission(submissionId); @@ -307,7 +400,8 @@ public class OrchestratorUtils { } } - public static CloudJobSubmission getCloudJobSubmission(OrchestratorContext context, String submissionId) throws RegistryException { + public static CloudJobSubmission getCloudJobSubmission(OrchestratorContext context, + String submissionId) throws RegistryException { try { AppCatalog appCatalog = context.getRegistry().getAppCatalog(); return appCatalog.getComputeResource().getCloudJobSubmission(submissionId); @@ -318,7 +412,8 @@ public class OrchestratorUtils { } } - public static SCPDataMovement getSCPDataMovement(OrchestratorContext context, String dataMoveId) throws RegistryException { + public static SCPDataMovement getSCPDataMovement(OrchestratorContext context, + String dataMoveId) throws RegistryException { try { AppCatalog appCatalog = context.getRegistry().getAppCatalog(); return appCatalog.getComputeResource().getSCPDataMovement(dataMoveId); @@ -328,4 +423,8 @@ public class OrchestratorUtils { throw new RegistryException(errorMsg, e); } } + + private static boolean isValid(String str) { + return (str != null && str.trim().isEmpty()); + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/6e5d1c6e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java index b97e79a..19a3521 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java @@ -20,6 +20,7 @@ */ package org.apache.airavata.orchestrator.cpi.impl; +import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.ThriftUtils; import org.apache.airavata.gfac.core.task.TaskException; @@ -93,15 +94,18 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ } } - public ValidationResults validateExperiment(ExperimentModel experiment) throws OrchestratorException,LaunchValidationException { - org.apache.airavata.model.error.ValidationResults validationResults = new org.apache.airavata.model.error.ValidationResults(); + public ValidationResults validateExperiment(ExperimentModel experiment) + throws OrchestratorException,LaunchValidationException { + org.apache.airavata.model.error.ValidationResults validationResults = + new org.apache.airavata.model.error.ValidationResults(); validationResults.setValidationState(true); // initially making it to success, if atleast one failed them simply mark it failed. String errorMsg = "Validation Errors : "; if (this.orchestratorConfiguration.isEnableValidation()) { List<String> validatorClasses = this.orchestratorContext.getOrchestratorConfiguration().getValidatorClasses(); for (String validator : validatorClasses) { try { - Class<? extends JobMetadataValidator> vClass = Class.forName(validator.trim()).asSubclass(JobMetadataValidator.class); + Class<? extends JobMetadataValidator> vClass = + Class.forName(validator.trim()).asSubclass(JobMetadataValidator.class); JobMetadataValidator jobMetadataValidator = vClass.newInstance(); validationResults = jobMetadataValidator.validate(experiment, null); if (validationResults.isValidationState()) { @@ -116,14 +120,15 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ } } } - logger.error("Validation of " + validator + " for experiment Id " + experiment.getExperimentId() + " is FAILED:[error]. " + errorMsg); + logger.error("Validation of " + validator + " for experiment Id " + + experiment.getExperimentId() + " is FAILED:[error]. " + errorMsg); validationResults.setValidationState(false); try { ErrorModel details = new ErrorModel(); details.setActualErrorMessage(errorMsg); details.setCreationTime(Calendar.getInstance().getTimeInMillis()); - orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.EXPERIMENT_ERROR, details, - experiment.getExperimentId()); + orchestratorContext.getRegistry().getExperimentCatalog() + .add(ExpCatChildDataType.EXPERIMENT_ERROR, details, experiment.getExperimentId()); } catch (RegistryException e) { logger.error("Error while saving error details to registry", e); } @@ -147,12 +152,15 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ //atleast one validation has failed, so we throw an exception LaunchValidationException launchValidationException = new LaunchValidationException(); launchValidationException.setValidationResult(validationResults); - launchValidationException.setErrorMessage("Validation failed refer the validationResults list for detail error. Validation errors : " + errorMsg); + launchValidationException.setErrorMessage("Validation failed refer the validationResults list for " + + "detail error. Validation errors : " + errorMsg); throw launchValidationException; } } - public ValidationResults validateProcess(ExperimentModel experiment, ProcessModel processModel) throws OrchestratorException,LaunchValidationException { + public ValidationResults validateProcess(ExperimentModel experiment, ProcessModel processModel) + throws OrchestratorException, LaunchValidationException { + org.apache.airavata.model.error.ValidationResults validationResults = new org.apache.airavata.model.error.ValidationResults(); validationResults.setValidationState(true); // initially making it to success, if atleast one failed them simply mark it failed. String errorMsg = "Validation Errors : "; @@ -167,46 +175,42 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ logger.info("Validation of " + validator + " is SUCCESSFUL"); } else { List<ValidatorResult> validationResultList = validationResults.getValidationResultList(); - for (ValidatorResult result : validationResultList){ - if (!result.isResult()){ + for (ValidatorResult result : validationResultList) { + if (!result.isResult()) { String validationError = result.getErrorDetails(); - if (validationError != null){ + if (validationError != null) { errorMsg += validationError + " "; } } } - logger.error("Validation of " + validator + " for experiment Id " + experiment.getExperimentId() + " is FAILED:[error]. " + errorMsg); + logger.error("Validation of " + validator + " for experiment Id " + + experiment.getExperimentId() + " is FAILED:[error]. " + errorMsg); validationResults.setValidationState(false); try { ErrorModel details = new ErrorModel(); details.setActualErrorMessage(errorMsg); details.setCreationTime(Calendar.getInstance().getTimeInMillis()); - orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.PROCESS_ERROR, details, - processModel.getProcessId()); + orchestratorContext.getRegistry().getExperimentCatalog() + .add(ExpCatChildDataType.PROCESS_ERROR, details, processModel.getProcessId()); } catch (RegistryException e) { logger.error("Error while saving error details to registry", e); } break; } - } catch (ClassNotFoundException e) { - logger.error("Error loading the validation class: ", validator, e); - validationResults.setValidationState(false); - } catch (InstantiationException e) { - logger.error("Error loading the validation class: ", validator, e); - validationResults.setValidationState(false); - } catch (IllegalAccessException e) { + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { logger.error("Error loading the validation class: ", validator, e); validationResults.setValidationState(false); } } } - if(validationResults.isValidationState()){ + if (validationResults.isValidationState()) { return validationResults; - }else { + } else { //atleast one validation has failed, so we throw an exception LaunchValidationException launchValidationException = new LaunchValidationException(); launchValidationException.setValidationResult(validationResults); - launchValidationException.setErrorMessage("Validation failed refer the validationResults list for detail error. Validation errors : " + errorMsg); + launchValidationException.setErrorMessage("Validation failed refer the validationResults " + + "list for detail error. Validation errors : " + errorMsg); throw launchValidationException; } } @@ -253,7 +257,8 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ try { Registry registry = orchestratorContext.getRegistry(); ExperimentModel experimentModel = (ExperimentModel)registry.getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, experimentId); - List<Object> processList = registry.getExperimentCatalog().get(ExperimentCatalogModelType.PROCESS, Constants.FieldConstants.ExperimentConstants.EXPERIMENT_ID, experimentId); + List<Object> processList = registry.getExperimentCatalog() + .get(ExperimentCatalogModelType.PROCESS, Constants.FieldConstants.ExperimentConstants.EXPERIMENT_ID, experimentId); if (processList != null && !processList.isEmpty()) { for (Object processObject : processList) { ProcessModel processModel = (ProcessModel)processObject; @@ -283,8 +288,10 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ throw new OrchestratorException("Compute Resource Id cannot be null at this point"); } ComputeResourceDescription computeResource = appCatalog.getComputeResource().getComputeResource(resourceHostId); - JobSubmissionInterface preferredJobSubmissionInterface = OrchestratorUtils.getPreferredJobSubmissionInterface(orchestratorContext, processModel, gatewayId); - ComputeResourcePreference resourcePreference = OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId); + JobSubmissionInterface preferredJobSubmissionInterface = + OrchestratorUtils.getPreferredJobSubmissionInterface(orchestratorContext, processModel, gatewayId); + ComputeResourcePreference resourcePreference = + OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId); List<String> taskIdList = new ArrayList<>(); if (resourcePreference.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.UNICORE) { @@ -303,19 +310,22 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ // need to create more job submissions int numOfMaxWallTimeJobs = ((int) Math.floor(userGivenWallTime / maxRunTime)); for (int i = 1; i <= numOfMaxWallTimeJobs; i++) { - taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, maxRunTime)); + taskIdList.addAll( + createAndSaveSubmissionTasks(gatewayId, preferredJobSubmissionInterface, processModel, maxRunTime)); } int leftWallTime = userGivenWallTime % maxRunTime; if (leftWallTime != 0) { - taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, leftWallTime)); + taskIdList.addAll( + createAndSaveSubmissionTasks(gatewayId, preferredJobSubmissionInterface, processModel, leftWallTime)); } } else { - taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, userGivenWallTime)); + taskIdList.addAll( + createAndSaveSubmissionTasks(gatewayId, preferredJobSubmissionInterface, processModel, userGivenWallTime)); } } } } else { - taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, userGivenWallTime)); + taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId, preferredJobSubmissionInterface, processModel, userGivenWallTime)); } taskIdList.addAll(createAndSaveOutputDataStagingTasks(processModel, gatewayId)); } @@ -342,7 +352,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ private List<String> createAndSaveEnvSetupTask(String gatewayId, ProcessModel processModel, ExperimentCatalog experimentCatalog) - throws RegistryException, TException { + throws RegistryException, TException, AiravataException { List<String> envTaskIds = new ArrayList<>(); TaskModel envSetupTask = new TaskModel(); envSetupTask.setTaskType(TaskTypes.ENV_SETUP); @@ -363,7 +373,9 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ return envTaskIds; } - public List<String> createAndSaveInputDataStagingTasks(ProcessModel processModel, String gatewayId) throws RegistryException { + public List<String> createAndSaveInputDataStagingTasks(ProcessModel processModel, String gatewayId) + throws RegistryException, AiravataException { + List<String> dataStagingTaskIds = new ArrayList<>(); List<InputDataObjectType> processInputs = processModel.getProcessInputs(); @@ -380,8 +392,8 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ case URI_COLLECTION: try { TaskModel inputDataStagingTask = getInputDataStagingTask(processModel, processInput, gatewayId); - String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, inputDataStagingTask, - processModel.getProcessId()); + String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog() + .add(ExpCatChildDataType.TASK, inputDataStagingTask, processModel.getProcessId()); inputDataStagingTask.setTaskId(taskId); dataStagingTaskIds.add(inputDataStagingTask.getTaskId()); } catch (TException | AppCatalogException | TaskException e) { @@ -397,7 +409,9 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ return dataStagingTaskIds; } - public List<String> createAndSaveOutputDataStagingTasks(ProcessModel processModel, String gatewayId) throws RegistryException { + public List<String> createAndSaveOutputDataStagingTasks(ProcessModel processModel, String gatewayId) + throws RegistryException, AiravataException { + List<String> dataStagingTaskIds = new ArrayList<>(); List<OutputDataObjectType> processOutputs = processModel.getProcessOutputs(); String appName = OrchestratorUtils.getApplicationInterfaceName(orchestratorContext, processModel); @@ -405,14 +419,14 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ for (OutputDataObjectType processOutput : processOutputs) { DataType type = processOutput.getType(); switch (type) { - case STDOUT : - if(null == processOutput.getValue() || processOutput.getValue().trim().isEmpty()){ + case STDOUT: + if (null == processOutput.getValue() || processOutput.getValue().trim().isEmpty()) { processOutput.setValue(appName + ".stdout"); } createOutputDataSatagingTasks(processModel, gatewayId, dataStagingTaskIds, processOutput); break; case STDERR: - if(null == processOutput.getValue() || processOutput.getValue().trim().isEmpty()){ + if (null == processOutput.getValue() || processOutput.getValue().trim().isEmpty()) { processOutput.setValue(appName + ".stderr"); } createOutputDataSatagingTasks(processModel, gatewayId, dataStagingTaskIds, processOutput); @@ -439,29 +453,35 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ private boolean isArchive(ProcessModel processModel, String gatewayId) throws AppCatalogException { AppCatalog appCatalog = RegistryFactory.getAppCatalog(); - ApplicationInterfaceDescription appInterface = appCatalog.getApplicationInterface().getApplicationInterface(processModel.getApplicationInterfaceId()); + ApplicationInterfaceDescription appInterface = appCatalog.getApplicationInterface() + .getApplicationInterface(processModel.getApplicationInterfaceId()); return appInterface.isArchiveWorkingDirectory(); } - private void createArchiveDataStatgingTask(ProcessModel processModel, String gatewayId, List<String> dataStagingTaskIds) throws RegistryException { + private void createArchiveDataStatgingTask(ProcessModel processModel, + String gatewayId, + List<String> dataStagingTaskIds) throws RegistryException, AiravataException { TaskModel archiveTask = null; try { archiveTask = getOutputDataStagingTask(processModel, null, gatewayId); } catch (TException e) { throw new RegistryException("Error! DataStaging sub task serialization failed"); } - String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, archiveTask, - processModel.getProcessId()); + String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog() + .add(ExpCatChildDataType.TASK, archiveTask, processModel.getProcessId()); archiveTask.setTaskId(taskId); dataStagingTaskIds.add(archiveTask.getTaskId()); } - private void createOutputDataSatagingTasks(ProcessModel processModel, String gatewayId, List<String> dataStagingTaskIds, OutputDataObjectType processOutput) throws RegistryException { + private void createOutputDataSatagingTasks(ProcessModel processModel, + String gatewayId, + List<String> dataStagingTaskIds, + OutputDataObjectType processOutput) throws RegistryException, AiravataException { try { TaskModel outputDataStagingTask = getOutputDataStagingTask(processModel, processOutput, gatewayId); - String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, outputDataStagingTask, - processModel.getProcessId()); + String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog() + .add(ExpCatChildDataType.TASK, outputDataStagingTask, processModel.getProcessId()); outputDataStagingTask.setTaskId(taskId); dataStagingTaskIds.add(outputDataStagingTask.getTaskId()); } catch (TException e) { @@ -469,7 +489,10 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ } } - private List<String> createAndSaveSubmissionTasks(String gatewayId, JobSubmissionInterface jobSubmissionInterface, ProcessModel processModel, int wallTime) + private List<String> createAndSaveSubmissionTasks(String gatewayId, + JobSubmissionInterface jobSubmissionInterface, + ProcessModel processModel, + int wallTime) throws TException, RegistryException, OrchestratorException { JobSubmissionProtocol jobSubmissionProtocol = jobSubmissionInterface.getJobSubmissionProtocol(); @@ -539,7 +562,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ }); } - private TaskModel getInputDataStagingTask(ProcessModel processModel, InputDataObjectType processInput, String gatewayId) throws RegistryException, TException, AppCatalogException, TaskException { + private TaskModel getInputDataStagingTask(ProcessModel processModel, InputDataObjectType processInput, String gatewayId) throws RegistryException, TException, AppCatalogException, TaskException, AiravataException { // create new task model for this task TaskModel taskModel = new TaskModel(); taskModel.setParentProcessId(processModel.getProcessId()); @@ -551,19 +574,20 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ taskModel.setTaskType(TaskTypes.DATA_STAGING); // create data staging sub task model DataStagingTaskModel submodel = new DataStagingTaskModel(); - ComputeResourcePreference computeResourcePreference = OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId); - ComputeResourceDescription computeResource = orchestratorContext.getRegistry().getAppCatalog().getComputeResource().getComputeResource(processModel.getComputeResourceId()); - String remoteOutputDir = OrchestratorUtils.getScratchLocation(orchestratorContext,processModel, gatewayId) + File.separator + processModel.getProcessId(); - remoteOutputDir = remoteOutputDir.endsWith("/") ? remoteOutputDir : remoteOutputDir + "/"; + ComputeResourceDescription computeResource = orchestratorContext.getRegistry().getAppCatalog() + .getComputeResource().getComputeResource(processModel.getComputeResourceId()); + String workingDir = OrchestratorUtils.getScratchLocation(orchestratorContext,processModel, gatewayId) + + File.separator + processModel.getProcessId() + File.separator; URI destination = null; try { - DataMovementProtocol dataMovementProtocol = OrchestratorUtils.getPreferredDataMovementProtocol(orchestratorContext, processModel, gatewayId); + DataMovementProtocol dataMovementProtocol = + OrchestratorUtils.getPreferredDataMovementProtocol(orchestratorContext, processModel, gatewayId); String loginUserName = OrchestratorUtils.getLoginUserName(orchestratorContext, processModel, gatewayId); destination = new URI(dataMovementProtocol.name(), loginUserName, computeResource.getHostName(), OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, gatewayId), - remoteOutputDir , null, null); + workingDir , null, null); } catch (URISyntaxException e) { throw new TaskException("Error while constructing destination file URI"); } @@ -575,7 +599,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ return taskModel; } - private TaskModel getOutputDataStagingTask(ProcessModel processModel, OutputDataObjectType processOutput, String gatewayId) throws RegistryException, TException { + private TaskModel getOutputDataStagingTask(ProcessModel processModel, OutputDataObjectType processOutput, String gatewayId) throws RegistryException, TException, AiravataException { try { // create new task model for this task @@ -587,11 +611,11 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); taskModel.setTaskStatuses(Arrays.asList(taskStatus)); taskModel.setTaskType(TaskTypes.DATA_STAGING); - ComputeResourcePreference computeResourcePreference = OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId); - ComputeResourceDescription computeResource = orchestratorContext.getRegistry().getAppCatalog().getComputeResource().getComputeResource(processModel.getComputeResourceId()); + ComputeResourceDescription computeResource = orchestratorContext.getRegistry().getAppCatalog() + .getComputeResource().getComputeResource(processModel.getComputeResourceId()); - String remoteOutputDir = OrchestratorUtils.getScratchLocation(orchestratorContext,processModel, gatewayId) + File.separator + processModel.getProcessId(); - remoteOutputDir = remoteOutputDir.endsWith("/") ? remoteOutputDir : remoteOutputDir + "/"; + String workingDir = OrchestratorUtils.getScratchLocation(orchestratorContext,processModel, gatewayId) + + File.separator + processModel.getProcessId() + File.separator; DataStagingTaskModel submodel = new DataStagingTaskModel(); DataMovementProtocol dataMovementProtocol = OrchestratorUtils.getPreferredDataMovementProtocol(orchestratorContext, processModel, gatewayId); URI source = null; @@ -604,7 +628,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ loginUserName, computeResource.getHostName(), OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, gatewayId), - remoteOutputDir + processOutput.getValue(), null, null); + workingDir + processOutput.getValue(), null, null); } else { // archive submodel.setType(DataStageType.ARCHIVE_OUTPUT); @@ -612,7 +636,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ loginUserName, computeResource.getHostName(), OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, gatewayId), - remoteOutputDir, null, null); + workingDir, null, null); } } catch (URISyntaxException e) { throw new TaskException("Error while constructing source file URI");
