http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/1932e865/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/OrchestratorConfiguration.java ---------------------------------------------------------------------- diff --git a/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/OrchestratorConfiguration.java b/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/OrchestratorConfiguration.java new file mode 100644 index 0000000..da357a4 --- /dev/null +++ b/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/OrchestratorConfiguration.java @@ -0,0 +1,122 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.orchestrator.core; + +import java.net.URL; +import java.util.List; + +/** + * This keeps configuration of orchestrator, mostly this keep static + * configuration, this can be accessed through orchestratorContext object + */ +public class OrchestratorConfiguration { + + private String newJobSubmitterClass; + + private String hangedJobSubmitterClass; + + private int submitterInterval = 1000; + + private int threadPoolSize = 10; + + private boolean startSubmitter = false; + + private URL brokerURL; + + private boolean embeddedMode; + + private List<String> validatorClasses; + + private boolean enableValidation; + + + public List<String> getValidatorClasses() { + return validatorClasses; + } + + public void setValidatorClasses(List<String> validatorClassesIn) { + this.validatorClasses = validatorClassesIn; + } + + public boolean isEmbeddedMode() { + return embeddedMode; + } + + public void setEmbeddedMode(boolean embeddedModeIn) { + this.embeddedMode = embeddedModeIn; + } + + public URL getBrokerURL() { + return brokerURL; + } + + public void setBrokerURL(URL brokerURLIn) { + this.brokerURL = brokerURLIn; + } + + public String getNewJobSubmitterClass() { + return newJobSubmitterClass; + } + + public int getSubmitterInterval() { + return submitterInterval; + } + + public int getThreadPoolSize() { + return threadPoolSize; + } + + public void setNewJobSubmitterClass(String newJobSubmitterClassIn) { + this.newJobSubmitterClass = newJobSubmitterClassIn; + } + + public void setSubmitterInterval(int submitterIntervalIn) { + this.submitterInterval = submitterIntervalIn; + } + + public void setThreadPoolSize(int threadPoolSizeIn) { + this.threadPoolSize = threadPoolSizeIn; + } + + public boolean isStartSubmitter() { + return startSubmitter; + } + + public void setStartSubmitter(boolean startSubmitterIn) { + this.startSubmitter = startSubmitterIn; + } + + public String getHangedJobSubmitterClass() { + return hangedJobSubmitterClass; + } + + public void setHangedJobSubmitterClass(String hangedJobSubmitterClassIn) { + this.hangedJobSubmitterClass = hangedJobSubmitterClassIn; + } + + public boolean isEnableValidation() { + return enableValidation; + } + + public void setEnableValidation(boolean enableValidationIn) { + this.enableValidation = enableValidationIn; + } +}
http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/1932e865/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java ---------------------------------------------------------------------- diff --git a/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java b/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java new file mode 100644 index 0000000..6a0f04b --- /dev/null +++ b/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.orchestrator.core.utils; + +/** + * This class contains all the constants in orchestrator-core + * + */ +/*public class OrchestratorConstants { + public static final String AIRAVATA_PROPERTIES = "airavata-server.properties"; + public static final int hotUpdateInterval=1000; + public static final String SUBMIT_INTERVAL = "submitter.interval"; + public static final String THREAD_POOL_SIZE = "threadpool.size"; + public static final String START_SUBMITTER = "start.submitter"; + public static final String EMBEDDED_MODE = "embedded.mode"; + public static final String ENABLE_VALIDATION = "enable.validation"; + public static final String JOB_VALIDATOR = "job.validators"; +}*/ + + +/** + * This enum contains all the constants in orchestrator-core + enum is the way about dealing with constants as its very powerful. + Hence, a design change has been made to change the class to enum. + * + */ +public enum OrchestratorConstants { + AIRAVATA_PROPERTIES("airavata-server.properties"), + hotUpdateInterval(1000), + SUBMIT_INTERVAL("submitter.interval"), + THREAD_POOL_SIZE("threadpool.size"), + START_SUBMITTER("start.submitter"), + EMBEDDED_MODE("embedded.mode"), + ENABLE_VALIDATION("enable.validation"), + JOB_VALIDATOR("job.validators"); + + + private String stringConstant; + private int integerConstant; + + OrchestratorConstants(String stringConstantIn) + { + stringConstant = stringConstantIn; + } + OrchestratorConstants(int integerConstantIn) + { + integerConstant = integerConstantIn; + } + + public String getOrchestratorStringConstant() + { + return stringConstant; + } + public int getOrchestratorIntegerConstant() + { + return integerConstant; + } + +} http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/1932e865/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java ---------------------------------------------------------------------- diff --git a/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java b/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java new file mode 100644 index 0000000..3ee40e0 --- /dev/null +++ b/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java @@ -0,0 +1,327 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.orchestrator.core.utils; + +import java.io.IOException; +import java.util.*; + +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription; +import org.apache.airavata.model.appcatalog.computeresource.*; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface; +import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission; +import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference; +import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference; +import org.apache.airavata.model.data.movement.DataMovementInterface; +import org.apache.airavata.model.data.movement.DataMovementProtocol; +import org.apache.airavata.model.data.movement.SCPDataMovement; +import org.apache.airavata.model.data.movement.SecurityProtocol; +import org.apache.airavata.model.process.ProcessModel; +import org.apache.airavata.orchestrator.core.OrchestratorConfiguration; +import org.apache.airavata.orchestrator.core.context.OrchestratorContext; +import org.apache.airavata.orchestrator.core.exception.OrchestratorException; +import org.apache.airavata.registry.cpi.*; +import org.apache.airavata.registry.cpi.ApplicationInterface; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This contains orchestrator specific utilities + */ +public class OrchestratorUtils { + + + private final static Logger logger = LoggerFactory.getLogger(OrchestratorUtils.class); + + public static String OrchestratorStringConstant(OrchestratorConstants constant) + { + return constant.getOrchestratorStringConstant(); + } + + public static int OrchestratorIntegerConstant(OrchestratorConstants constant) + { + return constant.getOrchestratorIntegerConstant(); + } + + public static OrchestratorConfiguration loadOrchestratorConfiguration() throws OrchestratorException, IOException, NumberFormatException, ApplicationSettingsException { + OrchestratorConfiguration orchestratorConfiguration = new OrchestratorConfiguration(); + orchestratorConfiguration.setSubmitterInterval(Integer.parseInt((String) ServerSettings.getSetting(OrchestratorStringConstant(OrchestratorConstants.SUBMIT_INTERVAL)))); + orchestratorConfiguration.setThreadPoolSize(Integer.parseInt((String) ServerSettings.getSetting(OrchestratorStringConstant(OrchestratorConstants.THREAD_POOL_SIZE)))); + orchestratorConfiguration.setStartSubmitter(Boolean.valueOf(ServerSettings.getSetting(OrchestratorStringConstant(OrchestratorConstants.START_SUBMITTER)))); + orchestratorConfiguration.setEmbeddedMode(Boolean.valueOf(ServerSettings.getSetting(OrchestratorStringConstant(OrchestratorConstants.EMBEDDED_MODE)))); + orchestratorConfiguration.setEnableValidation(Boolean.valueOf(ServerSettings.getSetting(OrchestratorStringConstant(OrchestratorConstants.ENABLE_VALIDATION)))); + if (orchestratorConfiguration.isEnableValidation()) { + orchestratorConfiguration.setValidatorClasses(Arrays.asList(ServerSettings.getSetting(OrchestratorStringConstant(OrchestratorConstants.JOB_VALIDATOR)).split(","))); + } + return orchestratorConfiguration; + } + + public static JobSubmissionProtocol getPreferredJobSubmissionProtocol(OrchestratorContext context, ProcessModel model, String gatewayId) throws RegistryException { + try { + GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile(); + String resourceHostId = model.getComputeResourceId(); + ComputeResourcePreference preference = gatewayProfile.getComputeResourcePreference(gatewayId + , resourceHostId); + return preference.getPreferredJobSubmissionProtocol(); + } catch (AppCatalogException e) { + logger.error("Error occurred while initializing app catalog", e); + throw new RegistryException("Error occurred while initializing app catalog", e); + } + } + + public static String getApplicationInterfaceName(OrchestratorContext context, ProcessModel model) throws RegistryException { + try { + ApplicationInterface applicationInterface = context.getRegistry().getAppCatalog().getApplicationInterface(); + ApplicationInterfaceDescription appInterface = applicationInterface.getApplicationInterface(model.getApplicationInterfaceId()); + return appInterface.getApplicationName(); + } catch (AppCatalogException e) { + throw new RegistryException("Error while retrieving application interface", e); + } + } + + public static DataMovementProtocol getPreferredDataMovementProtocol(OrchestratorContext context, ProcessModel model, String gatewayId) throws RegistryException { + try { + GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile(); + String resourceHostId = model.getComputeResourceId(); + ComputeResourcePreference preference = gatewayProfile.getComputeResourcePreference(gatewayId + , resourceHostId); + return preference.getPreferredDataMovementProtocol(); + } catch (AppCatalogException e) { + logger.error("Error occurred while initializing app catalog", e); + throw new RegistryException("Error occurred while initializing app catalog", e); + } + } + + public static ComputeResourcePreference getComputeResourcePreference(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException { + try { + GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile(); + String resourceHostId = processModel.getComputeResourceId(); + return gatewayProfile.getComputeResourcePreference(gatewayId, resourceHostId); + } catch (AppCatalogException e) { + logger.error("Error occurred while initializing app catalog", e); + throw new RegistryException("Error occurred while initializing app catalog", e); + } + } + + public static StoragePreference getStoragePreference(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException { + try { + GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile(); + String resourceHostId = processModel.getComputeResourceId(); + return gatewayProfile.getStoragePreference(gatewayId, resourceHostId); + } catch (AppCatalogException e) { + logger.error("Error occurred while initializing app catalog", e); + throw new RegistryException("Error occurred while initializing app catalog", e); + } + } + + public static String getLoginUserName(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException { + try { + String loginUserName = null; + String overrideLoginUserName = processModel.getResourceSchedule().getOverrideLoginUserName(); + if (overrideLoginUserName != null && !overrideLoginUserName.equals("")) { + loginUserName = overrideLoginUserName; + } else { + GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile(); + loginUserName = gatewayProfile.getComputeResourcePreference(gatewayId, processModel.getComputeResourceId()).getLoginUserName(); + } + return loginUserName; + } catch (AppCatalogException e) { + logger.error("Error occurred while initializing app catalog to fetch login username", e); + throw new RegistryException("Error occurred while initializing app catalog to fetch login username", e); + } + } + + public static String getScratchLocation(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException { + try { + String scratchLocation = null; + String overrideScratchLocation = processModel.getResourceSchedule().getOverrideScratchLocation(); + if (overrideScratchLocation != null && !overrideScratchLocation.equals("")) { + scratchLocation = overrideScratchLocation; + } else { + GwyResourceProfile gatewayProfile = context.getRegistry().getAppCatalog().getGatewayProfile(); + scratchLocation = gatewayProfile.getComputeResourcePreference(gatewayId, processModel.getComputeResourceId()).getScratchLocation(); + } + return scratchLocation; + } catch (AppCatalogException e) { + logger.error("Error occurred while initializing app catalog to fetch scratch location", e); + throw new RegistryException("Error occurred while initializing app catalog to fetch scratch location", e); + } + } + + public static JobSubmissionInterface getPreferredJobSubmissionInterface(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException { + try { + String resourceHostId = processModel.getComputeResourceId(); + ComputeResourcePreference resourcePreference = getComputeResourcePreference(context, processModel, gatewayId); + JobSubmissionProtocol preferredJobSubmissionProtocol = resourcePreference.getPreferredJobSubmissionProtocol(); + ComputeResourceDescription resourceDescription = context.getRegistry().getAppCatalog().getComputeResource().getComputeResource(resourceHostId); + List<JobSubmissionInterface> jobSubmissionInterfaces = resourceDescription.getJobSubmissionInterfaces(); + Map<JobSubmissionProtocol, List<JobSubmissionInterface>> orderedInterfaces = new HashMap<>(); + List<JobSubmissionInterface> interfaces = new ArrayList<>(); + if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()) { + for (JobSubmissionInterface submissionInterface : jobSubmissionInterfaces){ + + if (preferredJobSubmissionProtocol != null){ + if (preferredJobSubmissionProtocol.toString().equals(submissionInterface.getJobSubmissionProtocol().toString())){ + if (orderedInterfaces.containsKey(submissionInterface.getJobSubmissionProtocol())){ + List<JobSubmissionInterface> interfaceList = orderedInterfaces.get(submissionInterface.getJobSubmissionProtocol()); + interfaceList.add(submissionInterface); + }else { + interfaces.add(submissionInterface); + orderedInterfaces.put(submissionInterface.getJobSubmissionProtocol(), interfaces); + } + } + }else { + Collections.sort(jobSubmissionInterfaces, new Comparator<JobSubmissionInterface>() { + @Override + public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) { + return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder(); + } + }); + } + } + interfaces = orderedInterfaces.get(preferredJobSubmissionProtocol); + Collections.sort(interfaces, new Comparator<JobSubmissionInterface>() { + @Override + public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) { + return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder(); + } + }); + } else { + throw new RegistryException("Compute resource should have at least one job submission interface defined..."); + } + return interfaces.get(0); + } catch (AppCatalogException e) { + throw new RegistryException("Error occurred while retrieving data from app catalog", e); + } + } + + public static DataMovementInterface getPrefferredDataMovementInterface(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException { + try { + String resourceHostId = processModel.getComputeResourceId(); + ComputeResourcePreference resourcePreference = getComputeResourcePreference(context, processModel, gatewayId); + DataMovementProtocol preferredDataMovementProtocol = resourcePreference.getPreferredDataMovementProtocol(); + ComputeResourceDescription resourceDescription = context.getRegistry().getAppCatalog().getComputeResource().getComputeResource(resourceHostId); + List<DataMovementInterface> dataMovementInterfaces = resourceDescription.getDataMovementInterfaces(); + if (dataMovementInterfaces != null && !dataMovementInterfaces.isEmpty()) { + for (DataMovementInterface dataMovementInterface : dataMovementInterfaces){ + if (preferredDataMovementProtocol != null){ + if (preferredDataMovementProtocol.toString().equals(dataMovementInterface.getDataMovementProtocol().toString())){ + return dataMovementInterface; + } + } + } + } else { + throw new RegistryException("Compute resource should have at least one data movement interface defined..."); + } + } catch (AppCatalogException e) { + throw new RegistryException("Error occurred while retrieving data from app catalog", e); + } + return null; + } + + public static int getDataMovementPort(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException{ + try { + DataMovementProtocol protocol = getPreferredDataMovementProtocol(context, processModel, gatewayId); + DataMovementInterface dataMovementInterface = getPrefferredDataMovementInterface(context, processModel, gatewayId); + if (protocol == DataMovementProtocol.SCP ) { + SCPDataMovement scpDataMovement = getSCPDataMovement(context, dataMovementInterface.getDataMovementInterfaceId()); + if (scpDataMovement != null) { + return scpDataMovement.getSshPort(); + } + } + } catch (RegistryException e) { + logger.error("Error occurred while retrieving security protocol", e); + } + return 0; + } + + + public static SecurityProtocol getSecurityProtocol(OrchestratorContext context, ProcessModel processModel, String gatewayId) throws RegistryException{ + try { + JobSubmissionProtocol submissionProtocol = getPreferredJobSubmissionProtocol(context, processModel, gatewayId); + JobSubmissionInterface jobSubmissionInterface = getPreferredJobSubmissionInterface(context, processModel, gatewayId); + if (submissionProtocol == JobSubmissionProtocol.SSH ) { + SSHJobSubmission sshJobSubmission = getSSHJobSubmission(context, jobSubmissionInterface.getJobSubmissionInterfaceId()); + if (sshJobSubmission != null) { + return sshJobSubmission.getSecurityProtocol(); + } + } else if (submissionProtocol == JobSubmissionProtocol.LOCAL) { + LOCALSubmission localJobSubmission = getLocalJobSubmission(context, jobSubmissionInterface.getJobSubmissionInterfaceId()); + if (localJobSubmission != null) { + return localJobSubmission.getSecurityProtocol(); + } + } else if (submissionProtocol == JobSubmissionProtocol.SSH_FORK){ + SSHJobSubmission sshJobSubmission = getSSHJobSubmission(context, jobSubmissionInterface.getJobSubmissionInterfaceId()); + if (sshJobSubmission != null) { + return sshJobSubmission.getSecurityProtocol(); + } + } + } catch (RegistryException e) { + logger.error("Error occurred while retrieving security protocol", e); + } + return null; + } + + public static LOCALSubmission getLocalJobSubmission(OrchestratorContext context, String submissionId) throws RegistryException { + try { + AppCatalog appCatalog = context.getRegistry().getAppCatalog(); + return appCatalog.getComputeResource().getLocalJobSubmission(submissionId); + } catch (Exception e) { + String errorMsg = "Error while retrieving local job submission with submission id : " + submissionId; + logger.error(errorMsg, e); + throw new RegistryException(errorMsg, e); + } + } + + public static UnicoreJobSubmission getUnicoreJobSubmission(OrchestratorContext context, String submissionId) throws RegistryException { + try { + AppCatalog appCatalog = context.getRegistry().getAppCatalog(); + return appCatalog.getComputeResource().getUNICOREJobSubmission(submissionId); + } catch (Exception e) { + String errorMsg = "Error while retrieving UNICORE job submission with submission id : " + submissionId; + logger.error(errorMsg, e); + throw new RegistryException(errorMsg, e); + } + } + + public static SSHJobSubmission getSSHJobSubmission(OrchestratorContext context, String submissionId) throws RegistryException { + try { + AppCatalog appCatalog = context.getRegistry().getAppCatalog(); + return appCatalog.getComputeResource().getSSHJobSubmission(submissionId); + } catch (Exception e) { + String errorMsg = "Error while retrieving SSH job submission with submission id : " + submissionId; + logger.error(errorMsg, e); + throw new RegistryException(errorMsg, e); + } + } + + public static SCPDataMovement getSCPDataMovement(OrchestratorContext context, String dataMoveId) throws RegistryException { + try { + AppCatalog appCatalog = context.getRegistry().getAppCatalog(); + return appCatalog.getComputeResource().getSCPDataMovement(dataMoveId); + } catch (Exception e) { + String errorMsg = "Error while retrieving SCP Data movement with submission id : " + dataMoveId; + logger.error(errorMsg, e); + throw new RegistryException(errorMsg, e); + } + } +} http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/1932e865/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java ---------------------------------------------------------------------- diff --git a/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java new file mode 100644 index 0000000..0319f27 --- /dev/null +++ b/gsoc2016/Design Changes in Airavata/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java @@ -0,0 +1,625 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.orchestrator.cpi.impl; + +import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.common.utils.ThriftUtils; +import org.apache.airavata.gfac.core.task.TaskException; +import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription; +import org.apache.airavata.model.appcatalog.computeresource.*; +import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference; +import org.apache.airavata.model.application.io.DataType; +import org.apache.airavata.model.application.io.InputDataObjectType; +import org.apache.airavata.model.application.io.OutputDataObjectType; +import org.apache.airavata.model.commons.ErrorModel; +import org.apache.airavata.model.data.movement.DataMovementProtocol; +import org.apache.airavata.model.error.LaunchValidationException; +import org.apache.airavata.model.error.ValidationResults; +import org.apache.airavata.model.error.ValidatorResult; +import org.apache.airavata.model.process.ProcessModel; +import org.apache.airavata.model.experiment.*; +import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel; +import org.apache.airavata.model.status.TaskState; +import org.apache.airavata.model.status.TaskStatus; +import org.apache.airavata.model.task.*; +import org.apache.airavata.model.util.ExperimentModelUtil; +import org.apache.airavata.orchestrator.core.exception.OrchestratorException; +import org.apache.airavata.orchestrator.core.impl.GFACPassiveJobSubmitter; +import org.apache.airavata.orchestrator.core.job.JobSubmitter; +import org.apache.airavata.orchestrator.core.utils.OrchestratorUtils; +import org.apache.airavata.orchestrator.core.validator.JobMetadataValidator; +import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; +import org.apache.airavata.registry.cpi.*; +import org.apache.airavata.registry.cpi.utils.Constants; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.*; +import java.util.concurrent.ExecutorService; + +public class SimpleOrchestratorImpl extends AbstractOrchestrator{ + private final static Logger logger = LoggerFactory.getLogger(SimpleOrchestratorImpl.class); + private ExecutorService executor; + + // this is going to be null unless the thread count is 0 + private JobSubmitter jobSubmitter = null; + + + public SimpleOrchestratorImpl() throws OrchestratorException { + try { + try { + // We are only going to use GFacPassiveJobSubmitter + jobSubmitter = new GFACPassiveJobSubmitter(); + jobSubmitter.initialize(this.orchestratorContext); + + } catch (Exception e) { + String error = "Error creating JobSubmitter in non threaded mode "; + logger.error(error); + throw new OrchestratorException(error, e); + } + } catch (OrchestratorException e) { + logger.error("Error Constructing the Orchestrator"); + throw e; + } + } + + public boolean launchProcess(ProcessModel processModel, String tokenId) throws OrchestratorException { + try { + return jobSubmitter.submit(processModel.getExperimentId(), processModel.getProcessId(), tokenId); + } catch (Exception e) { + throw new OrchestratorException("Error launching the job", e); + } + } + + public ValidationResults validateExperiment(ExperimentModel experiment) throws OrchestratorException,LaunchValidationException { + org.apache.airavata.model.error.ValidationResults validationResults = new org.apache.airavata.model.error.ValidationResults(); + validationResults.setValidationState(true); // initially making it to success, if atleast one failed them simply mark it failed. + String errorMsg = "Validation Errors : "; + if (this.orchestratorConfiguration.isEnableValidation()) { + List<String> validatorClasses = this.orchestratorContext.getOrchestratorConfiguration().getValidatorClasses(); + for (String validator : validatorClasses) { + try { + Class<? extends JobMetadataValidator> vClass = Class.forName(validator.trim()).asSubclass(JobMetadataValidator.class); + JobMetadataValidator jobMetadataValidator = vClass.newInstance(); + validationResults = jobMetadataValidator.validate(experiment, null); + if (validationResults.isValidationState()) { + logger.info("Validation of " + validator + " is SUCCESSFUL"); + } else { + List<ValidatorResult> validationResultList = validationResults.getValidationResultList(); + for (ValidatorResult result : validationResultList){ + if (!result.isResult()){ + String validationError = result.getErrorDetails(); + if (validationError != null){ + errorMsg += validationError + " "; + } + } + } + logger.error("Validation of " + validator + " for experiment Id " + experiment.getExperimentId() + " is FAILED:[error]. " + errorMsg); + validationResults.setValidationState(false); + try { + ErrorModel details = new ErrorModel(); + details.setActualErrorMessage(errorMsg); + details.setCreationTime(Calendar.getInstance().getTimeInMillis()); + orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.EXPERIMENT_ERROR, details, + experiment.getExperimentId()); + } catch (RegistryException e) { + logger.error("Error while saving error details to registry", e); + } + break; + } + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { + logger.error("Error loading the validation class: ", validator, e); + validationResults.setValidationState(false); + } /*catch (InstantiationException e) { + logger.error("Error loading the validation class: ", validator, e); + validationResults.setValidationState(false); + } catch (IllegalAccessException e) { + logger.error("Error loading the validation class: ", validator, e); + validationResults.setValidationState(false); + }*/ + } + } + if(validationResults.isValidationState()){ + return validationResults; + }else { + //atleast one validation has failed, so we throw an exception + LaunchValidationException launchValidationException = new LaunchValidationException(); + launchValidationException.setValidationResult(validationResults); + launchValidationException.setErrorMessage("Validation failed refer the validationResults list for detail error. Validation errors : " + errorMsg); + throw launchValidationException; + } + } + + public ValidationResults validateProcess(ExperimentModel experiment, ProcessModel processModel) throws OrchestratorException,LaunchValidationException { + org.apache.airavata.model.error.ValidationResults validationResults = new org.apache.airavata.model.error.ValidationResults(); + validationResults.setValidationState(true); // initially making it to success, if atleast one failed them simply mark it failed. + String errorMsg = "Validation Errors : "; + if (this.orchestratorConfiguration.isEnableValidation()) { + List<String> validatorClzzez = this.orchestratorContext.getOrchestratorConfiguration().getValidatorClasses(); + for (String validator : validatorClzzez) { + try { + Class<? extends JobMetadataValidator> vClass = Class.forName(validator.trim()).asSubclass(JobMetadataValidator.class); + JobMetadataValidator jobMetadataValidator = vClass.newInstance(); + validationResults = jobMetadataValidator.validate(experiment, processModel); + if (validationResults.isValidationState()) { + logger.info("Validation of " + validator + " is SUCCESSFUL"); + } else { + List<ValidatorResult> validationResultList = validationResults.getValidationResultList(); + for (ValidatorResult result : validationResultList){ + if (!result.isResult()){ + String validationError = result.getErrorDetails(); + if (validationError != null){ + errorMsg += validationError + " "; + } + } + } + logger.error("Validation of " + validator + " for experiment Id " + experiment.getExperimentId() + " is FAILED:[error]. " + errorMsg); + validationResults.setValidationState(false); + try { + ErrorModel details = new ErrorModel(); + details.setActualErrorMessage(errorMsg); + details.setCreationTime(Calendar.getInstance().getTimeInMillis()); + orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.PROCESS_ERROR, details, + processModel.getProcessId()); + } catch (RegistryException e) { + logger.error("Error while saving error details to registry", e); + } + break; + } + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { + logger.error("Error loading the validation class: ", validator, e); + validationResults.setValidationState(false); + } /*catch (InstantiationException e) { + logger.error("Error loading the validation class: ", validator, e); + validationResults.setValidationState(false); + } catch (IllegalAccessException e) { + logger.error("Error loading the validation class: ", validator, e); + validationResults.setValidationState(false); + }*/ + } + } + if(validationResults.isValidationState()){ + return validationResults; + }else { + //atleast one validation has failed, so we throw an exception + LaunchValidationException launchValidationException = new LaunchValidationException(); + launchValidationException.setValidationResult(validationResults); + launchValidationException.setErrorMessage("Validation failed refer the validationResults list for detail error. Validation errors : " + errorMsg); + throw launchValidationException; + } + } + + + public void cancelExperiment(ExperimentModel experiment, ProcessModel processModel, String tokenId) + throws OrchestratorException { + // FIXME +// List<JobDetails> jobDetailsList = task.getJobDetailsList(); +// for(JobDetails jobDetails:jobDetailsList) { +// JobState jobState = jobDetails.getJobStatus().getJobState(); +// if (jobState.getValue() > 4){ +// logger.error("Cannot cancel the job, because current job state is : " + jobState.toString() + +// "jobId: " + jobDetails.getJobID() + " Job Name: " + jobDetails.getJobName()); +// return; +// } +// } +// jobSubmitter.terminate(experiment.getExperimentID(),task.getTaskID(),tokenId); + } + + + public ExecutorService getExecutor() { + return executor; + } + + public void setExecutor(ExecutorService executorIn) { + this.executor = executorIn; + } + + public JobSubmitter getJobSubmitter() { + return jobSubmitter; + } + + public void setJobSubmitter(JobSubmitter jobSubmitterIn) { + this.jobSubmitter = jobSubmitterIn; + } + + public void initialize() throws OrchestratorException { + + } + + public List<ProcessModel> createProcesses (String experimentId, String gatewayId) throws OrchestratorException { + List<ProcessModel> processModels = new ArrayList<ProcessModel>(); + try { + Registry registry = orchestratorContext.getRegistry(); + ExperimentModel experimentModel = (ExperimentModel)registry.getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, experimentId); + List<Object> processList = registry.getExperimentCatalog().get(ExperimentCatalogModelType.PROCESS, Constants.FieldConstants.ExperimentConstants.EXPERIMENT_ID, experimentId); + if (processList != null && !processList.isEmpty()) { + for (Object processObject : processList) { + ProcessModel processModel = (ProcessModel)processObject; + processModels.add(processModel); + } + }else { + ProcessModel processModel = ExperimentModelUtil.cloneProcessFromExperiment(experimentModel); + String processId = (String)registry.getExperimentCatalog().add(ExpCatChildDataType.PROCESS, processModel, experimentId); + processModel.setProcessId(processId); + processModels.add(processModel); + } + } catch (Exception e) { + throw new OrchestratorException("Error during creating process"); + } + return processModels; + } + + public String createAndSaveTasks(String gatewayId, ProcessModel processModel, boolean autoSchedule) throws OrchestratorException { + try { + ExperimentCatalog experimentCatalog = orchestratorContext.getRegistry().getExperimentCatalog(); + AppCatalog appCatalog = orchestratorContext.getRegistry().getAppCatalog(); + ComputationalResourceSchedulingModel resourceSchedule = processModel.getResourceSchedule(); + String userGivenQueueName = resourceSchedule.getQueueName(); + int userGivenWallTime = resourceSchedule.getWallTimeLimit(); + String resourceHostId = resourceSchedule.getResourceHostId(); + if (resourceHostId == null){ + throw new OrchestratorException("Compute Resource Id cannot be null at this point"); + } + ComputeResourceDescription computeResource = appCatalog.getComputeResource().getComputeResource(resourceHostId); + JobSubmissionInterface preferredJobSubmissionInterface = OrchestratorUtils.getPreferredJobSubmissionInterface(orchestratorContext, processModel, gatewayId); + ComputeResourcePreference resourcePreference = OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId); + List<String> taskIdList = new ArrayList<>(); + + if (resourcePreference.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.UNICORE) { + // TODO - breakdown unicore all in one task to multiple tasks, then we don't need to handle UNICORE here. + taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId, preferredJobSubmissionInterface, processModel, userGivenWallTime)); + } else { + taskIdList.addAll(createAndSaveEnvSetupTask(gatewayId, processModel, experimentCatalog)); + taskIdList.addAll(createAndSaveInputDataStagingTasks(processModel, gatewayId)); + if (autoSchedule) { + List<BatchQueue> definedBatchQueues = computeResource.getBatchQueues(); + for (BatchQueue batchQueue : definedBatchQueues) { + if (batchQueue.getQueueName().equals(userGivenQueueName)) { + int maxRunTime = batchQueue.getMaxRunTime(); + if (maxRunTime < userGivenWallTime) { + resourceSchedule.setWallTimeLimit(maxRunTime); + // need to create more job submissions + int numOfMaxWallTimeJobs = ((int) Math.floor(userGivenWallTime / maxRunTime)); + for (int i = 1; i <= numOfMaxWallTimeJobs; i++) { + taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, maxRunTime)); + } + int leftWallTime = userGivenWallTime % maxRunTime; + if (leftWallTime != 0) { + taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, leftWallTime)); + } + } else { + taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, userGivenWallTime)); + } + } + } + } else { + taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, userGivenWallTime)); + } + taskIdList.addAll(createAndSaveOutputDataStagingTasks(processModel, gatewayId)); + } + // update process scheduling + experimentCatalog.update(ExperimentCatalogModelType.PROCESS, processModel, processModel.getProcessId()); + return getTaskDag(taskIdList); + } catch (Exception e) { + throw new OrchestratorException("Error during creating process"); + } + } + + private String getTaskDag(List<String> taskIdList) { + if (taskIdList.isEmpty()) { + return ""; + } + StringBuilder sb = new StringBuilder(); + for (String s : taskIdList) { + sb.append(s).append(","); // comma separated values + } + String dag = sb.toString(); + return dag.substring(0, dag.length() - 1); // remove last comma + } + + private List<String> createAndSaveEnvSetupTask(String gatewayId, + ProcessModel processModel, + ExperimentCatalog experimentCatalog) + throws RegistryException, TException { + List<String> envTaskIds = new ArrayList<>(); + TaskModel envSetupTask = new TaskModel(); + envSetupTask.setTaskType(TaskTypes.ENV_SETUP); + envSetupTask.setTaskStatus(new TaskStatus(TaskState.CREATED)); + envSetupTask.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime()); + envSetupTask.setParentProcessId(processModel.getProcessId()); + EnvironmentSetupTaskModel envSetupSubModel = new EnvironmentSetupTaskModel(); + envSetupSubModel.setProtocol(OrchestratorUtils.getSecurityProtocol(orchestratorContext, processModel, gatewayId)); + ComputeResourcePreference computeResourcePreference = OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId); + String scratchLocation = OrchestratorUtils.getScratchLocation(orchestratorContext,processModel, gatewayId); + String workingDir = scratchLocation + File.separator + processModel.getProcessId(); + envSetupSubModel.setLocation(workingDir); + byte[] envSetupSub = ThriftUtils.serializeThriftObject(envSetupSubModel); + envSetupTask.setSubTaskModel(envSetupSub); + String envSetupTaskId = (String) experimentCatalog.add(ExpCatChildDataType.TASK, envSetupTask, processModel.getProcessId()); + envSetupTask.setTaskId(envSetupTaskId); + envTaskIds.add(envSetupTaskId); + return envTaskIds; + } + + public List<String> createAndSaveInputDataStagingTasks(ProcessModel processModel, String gatewayId) throws RegistryException { + List<String> dataStagingTaskIds = new ArrayList<>(); + List<InputDataObjectType> processInputs = processModel.getProcessInputs(); + + sortByInputOrder(processInputs); + if (processInputs != null) { + for (InputDataObjectType processInput : processInputs) { + DataType type = processInput.getType(); + switch (type) { + case STDERR: + break; + case STDOUT: + break; + case URI: + case URI_COLLECTION: + try { + TaskModel inputDataStagingTask = getInputDataStagingTask(processModel, processInput, gatewayId); + String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, inputDataStagingTask, + processModel.getProcessId()); + inputDataStagingTask.setTaskId(taskId); + dataStagingTaskIds.add(inputDataStagingTask.getTaskId()); + } catch (TException | AppCatalogException | TaskException e) { + throw new RegistryException("Error while serializing data staging sub task model"); + } + break; + default: + // nothing to do + break; + } + } + } + return dataStagingTaskIds; + } + + public List<String> createAndSaveOutputDataStagingTasks(ProcessModel processModel, String gatewayId) throws RegistryException { + List<String> dataStagingTaskIds = new ArrayList<>(); + List<OutputDataObjectType> processOutputs = processModel.getProcessOutputs(); + String appName = OrchestratorUtils.getApplicationInterfaceName(orchestratorContext, processModel); + if (processOutputs != null) { + for (OutputDataObjectType processOutput : processOutputs) { + DataType type = processOutput.getType(); + switch (type) { + case STDOUT : + processOutput.setValue(appName + ".stdout"); + createOutputDataSatagingTasks(processModel, gatewayId, dataStagingTaskIds, processOutput); + break; + case STDERR: + processOutput.setValue(appName + ".stderr"); + createOutputDataSatagingTasks(processModel, gatewayId, dataStagingTaskIds, processOutput); + break; + case URI: + createOutputDataSatagingTasks(processModel, gatewayId, dataStagingTaskIds, processOutput); + break; + default: + // nothing to do + break; + } + } + } + + try { + if (isArchive(processModel, gatewayId)) { + createArchiveDataStatgingTask(processModel, gatewayId, dataStagingTaskIds); + } + } catch (AppCatalogException e) { + throw new RegistryException("Error! Application interface retrieval failed"); + } + return dataStagingTaskIds; + } + + private boolean isArchive(ProcessModel processModel, String gatewayId) throws AppCatalogException { + AppCatalog appCatalog = RegistryFactory.getAppCatalog(); + ApplicationInterfaceDescription appInterface = appCatalog.getApplicationInterface().getApplicationInterface(processModel.getApplicationInterfaceId()); + return appInterface.isArchiveWorkingDirectory(); + } + + private void createArchiveDataStatgingTask(ProcessModel processModel, String gatewayId, List<String> dataStagingTaskIds) throws RegistryException { + TaskModel archiveTask = null; + try { + archiveTask = getOutputDataStagingTask(processModel, null, gatewayId); + } catch (TException e) { + throw new RegistryException("Error! DataStaging sub task serialization failed"); + } + String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, archiveTask, + processModel.getProcessId()); + archiveTask.setTaskId(taskId); + dataStagingTaskIds.add(archiveTask.getTaskId()); + + } + + private void createOutputDataSatagingTasks(ProcessModel processModel, String gatewayId, List<String> dataStagingTaskIds, OutputDataObjectType processOutput) throws RegistryException { + try { + TaskModel outputDataStagingTask = getOutputDataStagingTask(processModel, processOutput, gatewayId); + String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, outputDataStagingTask, + processModel.getProcessId()); + outputDataStagingTask.setTaskId(taskId); + dataStagingTaskIds.add(outputDataStagingTask.getTaskId()); + } catch (TException e) { + throw new RegistryException("Error while serializing data staging sub task model", e); + } + } + + private List<String> createAndSaveSubmissionTasks(String gatewayId, JobSubmissionInterface jobSubmissionInterface, ProcessModel processModel, int wallTime) + throws TException, RegistryException, OrchestratorException { + + JobSubmissionProtocol jobSubmissionProtocol = jobSubmissionInterface.getJobSubmissionProtocol(); + MonitorMode monitorMode = null; + if (jobSubmissionProtocol == JobSubmissionProtocol.SSH || jobSubmissionProtocol == JobSubmissionProtocol.SSH_FORK) { + SSHJobSubmission sshJobSubmission = OrchestratorUtils.getSSHJobSubmission(orchestratorContext, jobSubmissionInterface.getJobSubmissionInterfaceId()); + monitorMode = sshJobSubmission.getMonitorMode(); + } else if (jobSubmissionProtocol == JobSubmissionProtocol.UNICORE) { + monitorMode = MonitorMode.FORK; + } else { + logger.error("expId : {}, processId : {} :- Unsupported Job submission protocol {}.", + processModel.getExperimentId(), processModel.getProcessId(), jobSubmissionProtocol.name()); + throw new OrchestratorException("Unsupported Job Submission Protocol " + jobSubmissionProtocol.name()); + } + List<String> submissionTaskIds = new ArrayList<>(); + TaskModel taskModel = new TaskModel(); + taskModel.setParentProcessId(processModel.getProcessId()); + taskModel.setCreationTime(new Date().getTime()); + taskModel.setLastUpdateTime(taskModel.getCreationTime()); + TaskStatus taskStatus = new TaskStatus(TaskState.CREATED); + taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + taskModel.setTaskStatus(taskStatus); + taskModel.setTaskType(TaskTypes.JOB_SUBMISSION); + JobSubmissionTaskModel submissionSubTask = new JobSubmissionTaskModel(); + submissionSubTask.setMonitorMode(monitorMode); + submissionSubTask.setJobSubmissionProtocol(jobSubmissionProtocol); + submissionSubTask.setWallTime(wallTime); + byte[] bytes = ThriftUtils.serializeThriftObject(submissionSubTask); + taskModel.setSubTaskModel(bytes); + String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, taskModel, + processModel.getProcessId()); + taskModel.setTaskId(taskId); + submissionTaskIds.add(taskModel.getTaskId()); + + // create monitor task for this Email based monitor mode job + if (monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) { + TaskModel monitorTaskModel = new TaskModel(); + monitorTaskModel.setParentProcessId(processModel.getProcessId()); + monitorTaskModel.setCreationTime(new Date().getTime()); + monitorTaskModel.setLastUpdateTime(monitorTaskModel.getCreationTime()); + TaskStatus monitorTaskStatus = new TaskStatus(TaskState.CREATED); + monitorTaskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + monitorTaskModel.setTaskStatus(monitorTaskStatus); + monitorTaskModel.setTaskType(TaskTypes.MONITORING); + MonitorTaskModel monitorSubTaskModel = new MonitorTaskModel(); + monitorSubTaskModel.setMonitorMode(monitorMode); + monitorTaskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(monitorSubTaskModel)); + String mTaskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, monitorTaskModel, processModel.getProcessId()); + monitorTaskModel.setTaskId(mTaskId); + submissionTaskIds.add(monitorTaskModel.getTaskId()); + } + + return submissionTaskIds; + } + + private void sortByInputOrder(List<InputDataObjectType> processInputs) { + Collections.sort(processInputs, new Comparator<InputDataObjectType>() { + @Override + public int compare(InputDataObjectType inputDT_1, InputDataObjectType inputDT_2) { + return inputDT_1.getInputOrder() - inputDT_2.getInputOrder(); + } + }); + } + + private TaskModel getInputDataStagingTask(ProcessModel processModel, InputDataObjectType processInput, String gatewayId) throws RegistryException, TException, AppCatalogException, TaskException { + // create new task model for this task + TaskModel taskModel = new TaskModel(); + taskModel.setParentProcessId(processModel.getProcessId()); + taskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime()); + taskModel.setLastUpdateTime(taskModel.getCreationTime()); + TaskStatus taskStatus = new TaskStatus(TaskState.CREATED); + taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + taskModel.setTaskStatus(taskStatus); + taskModel.setTaskType(TaskTypes.DATA_STAGING); + // create data staging sub task model + DataStagingTaskModel submodel = new DataStagingTaskModel(); + ComputeResourcePreference computeResourcePreference = OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId); + ComputeResourceDescription computeResource = orchestratorContext.getRegistry().getAppCatalog().getComputeResource().getComputeResource(processModel.getComputeResourceId()); + String remoteOutputDir = OrchestratorUtils.getScratchLocation(orchestratorContext,processModel, gatewayId) + File.separator + processModel.getProcessId(); + remoteOutputDir = remoteOutputDir.endsWith("/") ? remoteOutputDir : remoteOutputDir + "/"; + URI destination = null; + try { + DataMovementProtocol dataMovementProtocol = OrchestratorUtils.getPreferredDataMovementProtocol(orchestratorContext, processModel, gatewayId); + String loginUserName = OrchestratorUtils.getLoginUserName(orchestratorContext, processModel, gatewayId); + destination = new URI(dataMovementProtocol.name(), + loginUserName, + computeResource.getHostName(), + OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, gatewayId), + remoteOutputDir , null, null); + } catch (URISyntaxException e) { + throw new TaskException("Error while constructing destination file URI"); + } + submodel.setType(DataStageType.INPUT); + submodel.setSource(processInput.getValue()); + submodel.setProcessInput(processInput); + submodel.setDestination(destination.toString()); + taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel)); + return taskModel; + } + + private TaskModel getOutputDataStagingTask(ProcessModel processModel, OutputDataObjectType processOutput, String gatewayId) throws RegistryException, TException { + try { + + // create new task model for this task + TaskModel taskModel = new TaskModel(); + taskModel.setParentProcessId(processModel.getProcessId()); + taskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime()); + taskModel.setLastUpdateTime(taskModel.getCreationTime()); + TaskStatus taskStatus = new TaskStatus(TaskState.CREATED); + taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + taskModel.setTaskStatus(taskStatus); + taskModel.setTaskType(TaskTypes.DATA_STAGING); + ComputeResourcePreference computeResourcePreference = OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId); + ComputeResourceDescription computeResource = orchestratorContext.getRegistry().getAppCatalog().getComputeResource().getComputeResource(processModel.getComputeResourceId()); + + String remoteOutputDir = OrchestratorUtils.getScratchLocation(orchestratorContext,processModel, gatewayId) + File.separator + processModel.getProcessId(); + remoteOutputDir = remoteOutputDir.endsWith("/") ? remoteOutputDir : remoteOutputDir + "/"; + DataStagingTaskModel submodel = new DataStagingTaskModel(); + DataMovementProtocol dataMovementProtocol = OrchestratorUtils.getPreferredDataMovementProtocol(orchestratorContext, processModel, gatewayId); + URI source = null; + try { + String loginUserName = OrchestratorUtils.getLoginUserName(orchestratorContext, processModel, gatewayId); + if (processOutput != null) { + submodel.setType(DataStageType.OUPUT); + submodel.setProcessOutput(processOutput); + source = new URI(dataMovementProtocol.name(), + loginUserName, + computeResource.getHostName(), + OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, gatewayId), + remoteOutputDir + processOutput.getValue(), null, null); + } else { + // archive + submodel.setType(DataStageType.ARCHIVE_OUTPUT); + source = new URI(dataMovementProtocol.name(), + loginUserName, + computeResource.getHostName(), + OrchestratorUtils.getDataMovementPort(orchestratorContext, processModel, gatewayId), + remoteOutputDir, null, null); + } + } catch (URISyntaxException e) { + throw new TaskException("Error while constructing source file URI"); + } + // We don't know destination location at this time, data staging task will set this. + // because destination is required field we set dummy destination + submodel.setSource(source.toString()); + // We don't know destination location at this time, data staging task will set this. + // because destination is required field we set dummy destination + submodel.setDestination("dummy://temp/file/location"); + taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel)); + return taskModel; + } catch (AppCatalogException | TaskException e) { + throw new RegistryException("Error occurred while retrieving data movement from app catalog", e); + } + } + + +} http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/1932e865/gsoc2016/Design Changes in Airavata/orchestrator/pom.xml ---------------------------------------------------------------------- diff --git a/gsoc2016/Design Changes in Airavata/orchestrator/pom.xml b/gsoc2016/Design Changes in Airavata/orchestrator/pom.xml new file mode 100644 index 0000000..0761b04 --- /dev/null +++ b/gsoc2016/Design Changes in Airavata/orchestrator/pom.xml @@ -0,0 +1,45 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!--Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file + distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under + the Apache License, Version 2.0 (theà "License"); you may not use this file except in compliance with the License. You may + obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to + in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF + ANY ~ KIND, either express or implied. See the License for the specific language governing permissions and limitations under + the License. --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <parent> + <groupId>org.apache.airavata</groupId> + <artifactId>airavata</artifactId> + <version>0.17-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <modelVersion>4.0.0</modelVersion> + <artifactId>orchestrator</artifactId> + <packaging>pom</packaging> + <name>Airavata Orchestrator</name> + <url>http://airavata.apache.org/</url> + + <profiles> + <profile> + <id>default</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <modules> + <module>orchestrator-core</module> + <module>orchestrator-service</module> + <module>orchestrator-client</module> + </modules> + </profile> + </profiles> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + </properties> + +</project>
