This is an automated email from the ASF dual-hosted git repository. dimuthuupe pushed a commit to branch helix-integration in repository https://gitbox.apache.org/repos/asf/airavata.git
commit ef94a5afd073359b0d75b72f51b254670953b7e2 Author: dimuthu <[email protected]> AuthorDate: Thu Feb 22 09:37:36 2018 -0500 Building groovy map --- modules/helix-spectator/pom.xml | 5 + .../airavata/helix/impl/task/AiravataTask.java | 156 +--- .../airavata/helix/impl/task/EnvSetupTask.java | 6 +- .../airavata/helix/impl/task/TaskContext.java | 802 +++++++++++++++++++++ .../impl/task/submission/GroovyMapBuilder.java | 335 +++++++++ .../helix/impl/task/submission/GroovyMapData.java | 51 +- .../submission/task/DefaultJobSubmissionTask.java | 37 +- .../submission/task/ForkJobSubmissionTask.java | 21 +- .../task/submission/task/JobSubmissionTask.java | 69 +- .../submission/task/LocalJobSubmissionTask.java | 10 +- .../helix/impl/workflow/SimpleWorkflow.java | 2 +- .../src/main/resources/application.properties | 2 +- 12 files changed, 1275 insertions(+), 221 deletions(-) diff --git a/modules/helix-spectator/pom.xml b/modules/helix-spectator/pom.xml index bae2785..36fb586 100644 --- a/modules/helix-spectator/pom.xml +++ b/modules/helix-spectator/pom.xml @@ -45,6 +45,11 @@ <artifactId>mariadb-java-client</artifactId> <version>1.1.7</version> </dependency> + <dependency> + <groupId>org.codehaus.groovy</groupId> + <artifactId>groovy-templates</artifactId> + <version>2.4.7</version> + </dependency> </dependencies> </project> \ No newline at end of file diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java index 72d3e17..315c07c 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java @@ -39,10 +39,8 @@ public abstract class AiravataTask extends AbstractTask { private ProcessModel processModel; private ComputeResourceDescription computeResourceDescription; - private 
ComputeResourcePreference gatewayComputeResourcePreference; - private UserComputeResourcePreference userComputeResourcePreference; - private UserResourceProfile userResourceProfile; - private GatewayResourceProfile gatewayResourceProfile; + + private TaskContext taskContext; @TaskParam(name = "Process Id") private String processId; @@ -87,22 +85,28 @@ public abstract class AiravataTask extends AbstractTask { this.computeResourceDescription = getAppCatalog().getComputeResource().getComputeResource(getProcessModel() .getComputeResourceId()); - this.gatewayComputeResourcePreference = getAppCatalog().getGatewayProfile() - .getComputeResourcePreference(getGatewayId(), computeResourceDescription.getComputeResourceId()); - - this.userComputeResourcePreference = getAppCatalog().getUserResourceProfile() - .getUserComputeResourcePreference(getProcessModel().getUserName(), getGatewayId(), getProcessModel() - .getComputeResourceId()); - - this.userResourceProfile = getAppCatalog().getUserResourceProfile() - .getUserResourceProfile(getProcessModel().getUserName(), getGatewayId()); - - this.gatewayResourceProfile = getAppCatalog().getGatewayProfile().getGatewayProfile(getGatewayId()); + TaskContext.TaskContextBuilder taskContextBuilder = new TaskContext.TaskContextBuilder(getProcessId(), getGatewayId(), getTaskId()); + taskContextBuilder.setAppCatalog(getAppCatalog()); + taskContextBuilder.setExperimentCatalog(getExperimentCatalog()); + taskContextBuilder.setProcessModel(getProcessModel()); + taskContextBuilder.setStatusPublisher(getStatusPublisher()); + + taskContextBuilder.setGatewayResourceProfile(appCatalog.getGatewayProfile().getGatewayProfile(gatewayId)); + taskContextBuilder.setGatewayComputeResourcePreference( + appCatalog.getGatewayProfile() + .getComputeResourcePreference(gatewayId, processModel.getComputeResourceId())); + taskContextBuilder.setGatewayStorageResourcePreference( + appCatalog.getGatewayProfile() + .getStoragePreference(gatewayId, 
processModel.getStorageResourceId())); + + this.taskContext = taskContextBuilder.build(); } catch (AppCatalogException e) { e.printStackTrace(); } catch (RegistryException e) { e.printStackTrace(); + } catch (Exception e) { + e.printStackTrace(); } } @@ -125,140 +129,27 @@ public abstract class AiravataTask extends AbstractTask { msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); } - - /////////////////// - - public String getComputeResourceId() { - if (isUseUserCRPref() && - userComputeResourcePreference != null && - isValid(userComputeResourcePreference.getComputeResourceId())) { - return userComputeResourcePreference.getComputeResourceId(); - } else { - return gatewayComputeResourcePreference.getComputeResourceId(); - } - } - - public String getComputeResourceCredentialToken(){ - if (isUseUserCRPref()) { - if (userComputeResourcePreference != null && - isValid(userComputeResourcePreference.getResourceSpecificCredentialStoreToken())) { - return userComputeResourcePreference.getResourceSpecificCredentialStoreToken(); - } else { - return userResourceProfile.getCredentialStoreToken(); - } - } else { - if (isValid(gatewayComputeResourcePreference.getResourceSpecificCredentialStoreToken())) { - return gatewayComputeResourcePreference.getResourceSpecificCredentialStoreToken(); - } else { - return gatewayResourceProfile.getCredentialStoreToken(); - } - } - } - - public String getComputeResourceLoginUserName(){ - if (isUseUserCRPref() && - userComputeResourcePreference != null && - isValid(userComputeResourcePreference.getLoginUserName())) { - return userComputeResourcePreference.getLoginUserName(); - } else if (isValid(getProcessModel().getProcessResourceSchedule().getOverrideLoginUserName())) { - return getProcessModel().getProcessResourceSchedule().getOverrideLoginUserName(); - } else { - return gatewayComputeResourcePreference.getLoginUserName(); - } - } - - public JobSubmissionInterface getPreferredJobSubmissionInterface() throws AppCatalogException { - try 
{ - JobSubmissionProtocol preferredJobSubmissionProtocol = getJobSubmissionProtocol(); - ComputeResourceDescription resourceDescription = getComputeResourceDescription(); - List<JobSubmissionInterface> jobSubmissionInterfaces = resourceDescription.getJobSubmissionInterfaces(); - Map<JobSubmissionProtocol, List<JobSubmissionInterface>> orderedInterfaces = new HashMap<>(); - List<JobSubmissionInterface> interfaces = new ArrayList<>(); - if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()) { - for (JobSubmissionInterface submissionInterface : jobSubmissionInterfaces){ - - if (preferredJobSubmissionProtocol != null){ - if (preferredJobSubmissionProtocol.toString().equals(submissionInterface.getJobSubmissionProtocol().toString())){ - if (orderedInterfaces.containsKey(submissionInterface.getJobSubmissionProtocol())){ - List<JobSubmissionInterface> interfaceList = orderedInterfaces.get(submissionInterface.getJobSubmissionProtocol()); - interfaceList.add(submissionInterface); - }else { - interfaces.add(submissionInterface); - orderedInterfaces.put(submissionInterface.getJobSubmissionProtocol(), interfaces); - } - } - }else { - Collections.sort(jobSubmissionInterfaces, new Comparator<JobSubmissionInterface>() { - @Override - public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) { - return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder(); - } - }); - } - } - interfaces = orderedInterfaces.get(preferredJobSubmissionProtocol); - Collections.sort(interfaces, new Comparator<JobSubmissionInterface>() { - @Override - public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) { - return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder(); - } - }); - } else { - throw new AppCatalogException("Compute resource should have at least one job submission interface defined..."); - } 
- return interfaces.get(0); - } catch (AppCatalogException e) { - throw new AppCatalogException("Error occurred while retrieving data from app catalog", e); - } - } - ////////////////////////// - - protected boolean isValid(String str) { - return str != null && !str.trim().isEmpty(); - } - - public boolean isUseUserCRPref() { - return getProcessModel().isUseUserCRPref(); - } - - public JobSubmissionProtocol getJobSubmissionProtocol() { - return getGatewayComputeResourcePreference().getPreferredJobSubmissionProtocol(); - } - - public ComputeResourcePreference getGatewayComputeResourcePreference() { - return gatewayComputeResourcePreference; - } - - public ComputeResourceDescription getComputeResourceDescription() { return computeResourceDescription; } //////////////////////// - - public void setAppCatalog(AppCatalog appCatalog) { - this.appCatalog = appCatalog; + + public TaskContext getTaskContext() { + return taskContext; } public ExperimentCatalog getExperimentCatalog() { return experimentCatalog; } - public void setExperimentCatalog(ExperimentCatalog experimentCatalog) { - this.experimentCatalog = experimentCatalog; - } - public Publisher getStatusPublisher() { return statusPublisher; } - public void setStatusPublisher(Publisher statusPublisher) { - this.statusPublisher = statusPublisher; - } - public String getProcessId() { return processId; } @@ -287,7 +178,4 @@ public abstract class AiravataTask extends AbstractTask { return processModel; } - public void setProcessModel(ProcessModel processModel) { - this.processModel = processModel; - } } diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java index 1cab0e2..f079b9f 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java @@ 
-23,8 +23,10 @@ public class EnvSetupTask extends AiravataTask { public TaskResult onRun(TaskHelper taskHelper) { try { publishTaskState(TaskState.EXECUTING); - AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(getComputeResourceId(), - getJobSubmissionProtocol().name(), getComputeResourceCredentialToken()); + AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor( + getTaskContext().getComputeResourceId(), + getTaskContext().getJobSubmissionProtocol().name(), + getTaskContext().getComputeResourceCredentialToken()); adaptor.createDirectory(workingDirectory); publishTaskState(TaskState.COMPLETED); diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java new file mode 100644 index 0000000..7de738e --- /dev/null +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java @@ -0,0 +1,802 @@ +package org.apache.airavata.helix.impl.task; + +import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.common.utils.ThriftUtils; +import org.apache.airavata.messaging.core.Publisher; +import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; +import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription; +import org.apache.airavata.model.appcatalog.computeresource.*; +import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference; +import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile; +import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference; +import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription; +import org.apache.airavata.model.appcatalog.userresourceprofile.UserComputeResourcePreference; +import org.apache.airavata.model.appcatalog.userresourceprofile.UserResourceProfile; +import 
org.apache.airavata.model.appcatalog.userresourceprofile.UserStoragePreference; +import org.apache.airavata.model.data.movement.DataMovementProtocol; +import org.apache.airavata.model.job.JobModel; +import org.apache.airavata.model.process.ProcessModel; +import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel; +import org.apache.airavata.model.status.ProcessState; +import org.apache.airavata.model.status.ProcessStatus; +import org.apache.airavata.model.task.TaskModel; +import org.apache.airavata.registry.cpi.AppCatalog; +import org.apache.airavata.registry.cpi.AppCatalogException; +import org.apache.airavata.registry.cpi.ExperimentCatalog; +import org.apache.curator.framework.CuratorFramework; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +public class TaskContext { + + private static final Logger log = LoggerFactory.getLogger(TaskContext.class); + // process model + private ExperimentCatalog experimentCatalog; + private AppCatalog appCatalog; + private Publisher statusPublisher; + private final String processId; + private final String gatewayId; + //private final String tokenId; + private ProcessModel processModel; + private String workingDir; + private String scratchLocation; + private String inputDir; + private String outputDir; + private String localWorkingDir; + private GatewayResourceProfile gatewayResourceProfile; + private ComputeResourcePreference gatewayComputeResourcePreference; + private StoragePreference gatewayStorageResourcePreference; + private UserResourceProfile userResourceProfile; + private UserComputeResourcePreference userComputeResourcePreference; + private UserStoragePreference userStoragePreference; + private ComputeResourceDescription computeResourceDescription; + private ApplicationDeploymentDescription applicationDeploymentDescription; + private ApplicationInterfaceDescription applicationInterfaceDescription; + private Map<String, String> 
sshProperties; + private String stdoutLocation; + private String stderrLocation; + private JobSubmissionProtocol jobSubmissionProtocol; + private DataMovementProtocol dataMovementProtocol; + private JobModel jobModel; + private StorageResourceDescription storageResource; + private MonitorMode monitorMode; + private ResourceJobManager resourceJobManager; + private boolean handOver; + private boolean cancel; + private List<String> taskExecutionOrder; + private List<TaskModel> taskList; + private Map<String, TaskModel> taskMap; + private boolean pauseTaskExecution = false; // Task can pause task execution by setting this value + private boolean complete = false; // all tasks executed? + private boolean recovery = false; // is process in recovery mode? + private TaskModel currentExecutingTaskModel; // current executing task model; if we pause process execution, we need this to continue process execution again + private boolean acknowledge; + private boolean recoveryWithCancel = false; + private String usageReportingGatewayId; + private List<String> queueSpecificMacros; + private String taskId; + private Object subTaskModel = null; + + + /** + * Note: process context properties use a lazy loading approach. At runtime you will see some properties as null + * unless you have accessed them previously. Once a property is accessed using the API, it will be set to the correct value.
+ */ + private TaskContext(String taskId, String processId, String gatewayId) { + this.processId = processId; + this.gatewayId = gatewayId; + this.taskId = taskId; + } + + public ExperimentCatalog getExperimentCatalog() { + return experimentCatalog; + } + + public void setExperimentCatalog(ExperimentCatalog experimentCatalog) { + this.experimentCatalog = experimentCatalog; + } + + public AppCatalog getAppCatalog() { + return appCatalog; + } + + public void setAppCatalog(AppCatalog appCatalog) { + this.appCatalog = appCatalog; + } + + public String getGatewayId() { + return gatewayId; + } + + public String getProcessId() { + return processId; + } + + public Publisher getStatusPublisher() { + return statusPublisher; + } + + public void setStatusPublisher(Publisher statusPublisher) { + this.statusPublisher = statusPublisher; + } + + public ProcessModel getProcessModel() { + return processModel; + } + + public void setProcessModel(ProcessModel processModel) { + this.processModel = processModel; + } + + public String getWorkingDir() { + if (workingDir == null) { + if (processModel.getProcessResourceSchedule().getStaticWorkingDir() != null){ + workingDir = processModel.getProcessResourceSchedule().getStaticWorkingDir(); + }else { + String scratchLocation = getScratchLocation(); + workingDir = (scratchLocation.endsWith("/") ? 
scratchLocation + processId : scratchLocation + "/" + + processId); + } + } + return workingDir; + } + + public String getScratchLocation() { + if (scratchLocation == null) { + if (isUseUserCRPref() && + userComputeResourcePreference != null && + isValid(userComputeResourcePreference.getScratchLocation())) { + scratchLocation = userComputeResourcePreference.getScratchLocation(); + } else if (isValid(processModel.getProcessResourceSchedule().getOverrideScratchLocation())) { + scratchLocation = processModel.getProcessResourceSchedule().getOverrideScratchLocation(); + }else { + scratchLocation = gatewayComputeResourcePreference.getScratchLocation(); + } + } + return scratchLocation; + } + + public void setWorkingDir(String workingDir) { + this.workingDir = workingDir; + } + + public GatewayResourceProfile getGatewayResourceProfile() { + return gatewayResourceProfile; + } + + public void setGatewayResourceProfile(GatewayResourceProfile gatewayResourceProfile) { + this.gatewayResourceProfile = gatewayResourceProfile; + } + + public UserResourceProfile getUserResourceProfile() { + return userResourceProfile; + } + + public void setUserResourceProfile(UserResourceProfile userResourceProfile) { + this.userResourceProfile = userResourceProfile; + } + + private UserComputeResourcePreference getUserComputeResourcePreference() { + return userComputeResourcePreference; + } + + public void setUserComputeResourcePreference(UserComputeResourcePreference userComputeResourcePreference) { + this.userComputeResourcePreference = userComputeResourcePreference; + } + + public UserStoragePreference getUserStoragePreference() { + return userStoragePreference; + } + + public void setUserStoragePreference(UserStoragePreference userStoragePreference) { + this.userStoragePreference = userStoragePreference; + } + + public StoragePreference getGatewayStorageResourcePreference() { + return gatewayStorageResourcePreference; + } + + public void setGatewayStorageResourcePreference(StoragePreference 
gatewayStorageResourcePreference) { + this.gatewayStorageResourcePreference = gatewayStorageResourcePreference; + } + + + public Map<String, String> getSshProperties() { + return sshProperties; + } + + public void setSshProperties(Map<String, String> sshProperties) { + this.sshProperties = sshProperties; + } + + public ComputeResourceDescription getComputeResourceDescription() { + return computeResourceDescription; + } + + public void setComputeResourceDescription(ComputeResourceDescription computeResourceDescription) { + this.computeResourceDescription = computeResourceDescription; + } + + public ApplicationDeploymentDescription getApplicationDeploymentDescription() { + return applicationDeploymentDescription; + } + + public void setApplicationDeploymentDescription(ApplicationDeploymentDescription + applicationDeploymentDescription) { + this.applicationDeploymentDescription = applicationDeploymentDescription; + } + + public ApplicationInterfaceDescription getApplicationInterfaceDescription() { + return applicationInterfaceDescription; + } + + public void setApplicationInterfaceDescription(ApplicationInterfaceDescription applicationInterfaceDescription) { + this.applicationInterfaceDescription = applicationInterfaceDescription; + } + + public String getStdoutLocation() { + return stdoutLocation; + } + + public void setStdoutLocation(String stdoutLocation) { + this.stdoutLocation = stdoutLocation; + } + + public String getStderrLocation() { + return stderrLocation; + } + + public void setStderrLocation(String stderrLocation) { + this.stderrLocation = stderrLocation; + } + + public void setOutputDir(String outputDir) { + this.outputDir = outputDir; + } + + public String getOutputDir() { + if (outputDir == null) { + outputDir = getWorkingDir(); + } + return outputDir; + } + + public String getInputDir() { + if (inputDir == null) { + inputDir = getWorkingDir(); + } + return inputDir; + } + + public void setInputDir(String inputDir) { + this.inputDir = inputDir; + } + + 
public JobSubmissionProtocol getJobSubmissionProtocol() { + if (jobSubmissionProtocol == null) { + jobSubmissionProtocol = gatewayComputeResourcePreference.getPreferredJobSubmissionProtocol(); + } + return jobSubmissionProtocol; + } + + public void setJobSubmissionProtocol(JobSubmissionProtocol jobSubmissionProtocol) { + this.jobSubmissionProtocol = jobSubmissionProtocol; + } + + public DataMovementProtocol getDataMovementProtocol() { + if (dataMovementProtocol == null) { + dataMovementProtocol = gatewayComputeResourcePreference.getPreferredDataMovementProtocol(); + } + return dataMovementProtocol; + } + + public void setDataMovementProtocol(DataMovementProtocol dataMovementProtocol) { + this.dataMovementProtocol = dataMovementProtocol; + } + + public String getTaskDag() { + return getProcessModel().getTaskDag(); + } + + public List<TaskModel> getTaskList() { + if (taskList == null) { + synchronized (TaskModel.class){ + if (taskList == null) { + taskList = getProcessModel().getTasks(); + } + } + } + return taskList; + } + + + public List<String> getTaskExecutionOrder() { + return taskExecutionOrder; + } + + public void setTaskExecutionOrder(List<String> taskExecutionOrder) { + this.taskExecutionOrder = taskExecutionOrder; + } + + public Map<String, TaskModel> getTaskMap() { + if (taskMap == null) { + synchronized (TaskModel.class) { + if (taskMap == null) { + taskMap = new HashMap<>(); + for (TaskModel taskModel : getTaskList()) { + taskMap.put(taskModel.getTaskId(), taskModel); + } + } + } + } + return taskMap; + } + + public JobModel getJobModel() { + if (jobModel == null) { + jobModel = new JobModel(); + jobModel.setProcessId(processId); + jobModel.setWorkingDir(getWorkingDir()); + jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime()); + } + return jobModel; + } + + public void setJobModel(JobModel jobModel) { + this.jobModel = jobModel; + } + + public ComputeResourcePreference getGatewayComputeResourcePreference() { + return 
gatewayComputeResourcePreference; + } + + public void setGatewayComputeResourcePreference(ComputeResourcePreference gatewayComputeResourcePreference) { + this.gatewayComputeResourcePreference = gatewayComputeResourcePreference; + } + + public ProcessState getProcessState() { + if(processModel.getProcessStatuses() != null && processModel.getProcessStatuses().size() > 0) + return processModel.getProcessStatuses().get(0).getState(); + else + return null; + } + + public void setProcessStatus(ProcessStatus status) { + if (status != null) { + log.info("expId: {}, processId: {} :- Process status changed {} -> {}", getExperimentId(), processId, + getProcessState().name(), status.getState().name()); + List<ProcessStatus> processStatuses = new ArrayList<>(); + processStatuses.add(status); + processModel.setProcessStatuses(processStatuses); + } + } + + public ProcessStatus getProcessStatus(){ + if(processModel.getProcessStatuses() != null) + return processModel.getProcessStatuses().get(0); + else + return null; + } + + public String getComputeResourceId() { + if (isUseUserCRPref() && + userComputeResourcePreference != null && + isValid(userComputeResourcePreference.getComputeResourceId())) { + return userComputeResourcePreference.getComputeResourceId(); + } else { + return gatewayComputeResourcePreference.getComputeResourceId(); + } + } + + public String getComputeResourceCredentialToken(){ + if (isUseUserCRPref()) { + if (userComputeResourcePreference != null && + isValid(userComputeResourcePreference.getResourceSpecificCredentialStoreToken())) { + return userComputeResourcePreference.getResourceSpecificCredentialStoreToken(); + } else { + return userResourceProfile.getCredentialStoreToken(); + } + } else { + if (isValid(gatewayComputeResourcePreference.getResourceSpecificCredentialStoreToken())) { + return gatewayComputeResourcePreference.getResourceSpecificCredentialStoreToken(); + } else { + return gatewayResourceProfile.getCredentialStoreToken(); + } + } + } + + public 
String getStorageResourceCredentialToken(){ + if (isValid(gatewayStorageResourcePreference.getResourceSpecificCredentialStoreToken())) { + return gatewayStorageResourcePreference.getResourceSpecificCredentialStoreToken(); + } else { + return gatewayResourceProfile.getCredentialStoreToken(); + } + } + + public JobSubmissionProtocol getPreferredJobSubmissionProtocol(){ + return gatewayComputeResourcePreference.getPreferredJobSubmissionProtocol(); + } + + public DataMovementProtocol getPreferredDataMovementProtocol() { + return gatewayComputeResourcePreference.getPreferredDataMovementProtocol(); + } + + public void setMonitorMode(MonitorMode monitorMode) { + this.monitorMode = monitorMode; + } + + public MonitorMode getMonitorMode() { + return monitorMode; + } + + public void setResourceJobManager(ResourceJobManager resourceJobManager) { + this.resourceJobManager = resourceJobManager; + } + + public ResourceJobManager getResourceJobManager() { + return resourceJobManager; + } + + public String getLocalWorkingDir() { + return localWorkingDir; + } + + public void setLocalWorkingDir(String localWorkingDir) { + this.localWorkingDir = localWorkingDir; + } + + public String getExperimentId() { + return processModel.getExperimentId(); + } + + public boolean isHandOver() { + return handOver; + } + + public void setHandOver(boolean handOver) { + this.handOver = handOver; + } + + public boolean isCancel() { + return cancel; + } + + public void setCancel(boolean cancel) { + this.cancel = cancel; + } + + public boolean isInterrupted(){ + return this.cancel || this.handOver; + } + + public String getCurrentExecutingTaskId() { + if (currentExecutingTaskModel != null) { + return currentExecutingTaskModel.getTaskId(); + } + return null; + } + + public boolean isPauseTaskExecution() { + return pauseTaskExecution; + } + + public void setPauseTaskExecution(boolean pauseTaskExecution) { + this.pauseTaskExecution = pauseTaskExecution; + } + + public boolean isComplete() { + return 
complete; + } + + public void setComplete(boolean complete) { + this.complete = complete; + } + + public boolean isRecovery() { + return recovery; + } + + public void setRecovery(boolean recovery) { + this.recovery = recovery; + } + + public TaskModel getCurrentExecutingTaskModel() { + return currentExecutingTaskModel; + } + + public void setCurrentExecutingTaskModel(TaskModel currentExecutingTaskModel) { + this.currentExecutingTaskModel = currentExecutingTaskModel; + } + + public StorageResourceDescription getStorageResource() { + return storageResource; + } + + public void setStorageResource(StorageResourceDescription storageResource) { + this.storageResource = storageResource; + } + + public void setAcknowledge(boolean acknowledge) { + this.acknowledge = acknowledge; + } + + public boolean isAcknowledge() { + return acknowledge; + } + + public boolean isRecoveryWithCancel() { + return recoveryWithCancel; + } + + public void setRecoveryWithCancel(boolean recoveryWithCancel) { + this.recoveryWithCancel = recoveryWithCancel; + } + + public boolean isUseUserCRPref() { + return getProcessModel().isUseUserCRPref(); + } + + public String getComputeResourceLoginUserName(){ + if (isUseUserCRPref() && + userComputeResourcePreference != null && + isValid(userComputeResourcePreference.getLoginUserName())) { + return userComputeResourcePreference.getLoginUserName(); + } else if (isValid(processModel.getProcessResourceSchedule().getOverrideLoginUserName())) { + return processModel.getProcessResourceSchedule().getOverrideLoginUserName(); + } else { + return gatewayComputeResourcePreference.getLoginUserName(); + } + } + + public String getStorageResourceLoginUserName(){ + return gatewayStorageResourcePreference.getLoginUserName(); + } + + public String getStorageFileSystemRootLocation(){ + return gatewayStorageResourcePreference.getFileSystemRootLocation(); + } + + public String getStorageResourceId() { + return gatewayStorageResourcePreference.getStorageResourceId(); + } + + 
private ComputationalResourceSchedulingModel getProcessCRSchedule() { + if (getProcessModel() != null) { + return getProcessModel().getProcessResourceSchedule(); + } else { + return null; + } + } + + private boolean isValid(String str) { + return str != null && !str.trim().isEmpty(); + } + + public String getUsageReportingGatewayId() { + return gatewayComputeResourcePreference.getUsageReportingGatewayId(); + } + + public String getAllocationProjectNumber() { + return gatewayComputeResourcePreference.getAllocationProjectNumber(); + } + + public String getReservation() { + long start = 0, end = 0; + String reservation = null; + if (isUseUserCRPref() && + userComputeResourcePreference != null && + isValid(userComputeResourcePreference.getReservation())) { + reservation = userComputeResourcePreference.getReservation(); + start = userComputeResourcePreference.getReservationStartTime(); + end = userComputeResourcePreference.getReservationEndTime(); + } else { + reservation = gatewayComputeResourcePreference.getReservation(); + start = gatewayComputeResourcePreference.getReservationStartTime(); + end = gatewayComputeResourcePreference.getReservationEndTime(); + } + if (reservation != null && start > 0 && start < end) { + long now = Calendar.getInstance().getTimeInMillis(); + if (now > start && now < end) { + return reservation; + } + } + return null; + } + + public String getQualityOfService() { + if (isUseUserCRPref() && + userComputeResourcePreference != null && + isValid(userComputeResourcePreference.getQualityOfService())) { + return userComputeResourcePreference.getQualityOfService(); + } else { + return gatewayComputeResourcePreference.getQualityOfService(); + } + } + + + public String getQueueName() { + if (isUseUserCRPref() && + userComputeResourcePreference != null && + isValid(userComputeResourcePreference.getPreferredBatchQueue())) { + return userComputeResourcePreference.getPreferredBatchQueue(); + } else if 
(isValid(processModel.getProcessResourceSchedule().getQueueName())) { + return processModel.getProcessResourceSchedule().getQueueName(); + } else { + return gatewayComputeResourcePreference.getPreferredBatchQueue(); + } + } + + public List<String> getQueueSpecificMacros() { + String queueName = getProcessCRSchedule().getQueueName(); + Optional<BatchQueue> queue = getComputeResourceDescription().getBatchQueues().stream() + .filter(x->x.getQueueName().equals(queueName)).findFirst(); + if(queue.isPresent()){ + if(queue.get().getQueueSpecificMacros() != null && !queue.get().getQueueSpecificMacros().equals("")){ + return Arrays.asList(queue.get().getQueueSpecificMacros().split(",")); + } + } + return null; + } + + public JobSubmissionInterface getPreferredJobSubmissionInterface() throws AppCatalogException { + try { + JobSubmissionProtocol preferredJobSubmissionProtocol = getJobSubmissionProtocol(); + ComputeResourceDescription resourceDescription = getComputeResourceDescription(); + List<JobSubmissionInterface> jobSubmissionInterfaces = resourceDescription.getJobSubmissionInterfaces(); + Map<JobSubmissionProtocol, List<JobSubmissionInterface>> orderedInterfaces = new HashMap<>(); + List<JobSubmissionInterface> interfaces = new ArrayList<>(); + if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()) { + for (JobSubmissionInterface submissionInterface : jobSubmissionInterfaces){ + + if (preferredJobSubmissionProtocol != null){ + if (preferredJobSubmissionProtocol.toString().equals(submissionInterface.getJobSubmissionProtocol().toString())){ + if (orderedInterfaces.containsKey(submissionInterface.getJobSubmissionProtocol())){ + List<JobSubmissionInterface> interfaceList = orderedInterfaces.get(submissionInterface.getJobSubmissionProtocol()); + interfaceList.add(submissionInterface); + }else { + interfaces.add(submissionInterface); + orderedInterfaces.put(submissionInterface.getJobSubmissionProtocol(), interfaces); + } + } + }else { + 
Collections.sort(jobSubmissionInterfaces, new Comparator<JobSubmissionInterface>() { + @Override + public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) { + return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder(); + } + }); + } + } + interfaces = orderedInterfaces.get(preferredJobSubmissionProtocol); + Collections.sort(interfaces, new Comparator<JobSubmissionInterface>() { + @Override + public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) { + return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder(); + } + }); + } else { + throw new AppCatalogException("Compute resource should have at least one job submission interface defined..."); + } + return interfaces.get(0); + } catch (AppCatalogException e) { + throw new AppCatalogException("Error occurred while retrieving data from app catalog", e); + } + } + + public TaskModel getCurrentTaskModel() { + return getTaskMap().get(taskId); + } + + public Object getSubTaskModel() throws TException { + if (subTaskModel == null) { + subTaskModel = ThriftUtils.getSubTaskModel(getCurrentTaskModel()); + } + return subTaskModel; + } + + public static class TaskContextBuilder { + private final String processId; + private final String gatewayId; + private final String taskId; + private ExperimentCatalog experimentCatalog; + private AppCatalog appCatalog; + private Publisher statusPublisher; + private GatewayResourceProfile gatewayResourceProfile; + private ComputeResourcePreference gatewayComputeResourcePreference; + private StoragePreference gatewayStorageResourcePreference; + private ProcessModel processModel; + + public TaskContextBuilder(String processId, String gatewayId, String taskId) throws Exception { + if (notValid(processId) || notValid(gatewayId) || notValid(taskId)) { + throwError("Process Id, Gateway Id and Task Id must be not 
null"); + } + this.processId = processId; + this.gatewayId = gatewayId; + this.taskId = taskId; + } + + public TaskContextBuilder setGatewayResourceProfile(GatewayResourceProfile gatewayResourceProfile) { + this.gatewayResourceProfile = gatewayResourceProfile; + return this; + } + + public TaskContextBuilder setGatewayComputeResourcePreference(ComputeResourcePreference gatewayComputeResourcePreference) { + this.gatewayComputeResourcePreference = gatewayComputeResourcePreference; + return this; + } + + public TaskContextBuilder setGatewayStorageResourcePreference(StoragePreference gatewayStorageResourcePreference) { + this.gatewayStorageResourcePreference = gatewayStorageResourcePreference; + return this; + } + + public TaskContextBuilder setProcessModel(ProcessModel processModel) { + this.processModel = processModel; + return this; + } + + public TaskContextBuilder setExperimentCatalog(ExperimentCatalog experimentCatalog) { + this.experimentCatalog = experimentCatalog; + return this; + } + + public TaskContextBuilder setAppCatalog(AppCatalog appCatalog) { + this.appCatalog = appCatalog; + return this; + } + + public TaskContextBuilder setStatusPublisher(Publisher statusPublisher) { + this.statusPublisher = statusPublisher; + return this; + } + + public TaskContext build() throws Exception { + if (notValid(gatewayResourceProfile)) { + throwError("Invalid GatewayResourceProfile"); + } + if (notValid(gatewayComputeResourcePreference)) { + throwError("Invalid Gateway ComputeResourcePreference"); + } + if (notValid(gatewayStorageResourcePreference)) { + throwError("Invalid Gateway StoragePreference"); + } + if (notValid(processModel)) { + throwError("Invalid Process Model"); + } + if (notValid(appCatalog)) { + throwError("Invalid AppCatalog"); + } + if (notValid(experimentCatalog)) { + throwError("Invalid Experiment catalog"); + } + //if (notValid(statusPublisher)) { + // throwError("Invalid Status Publisher"); + //} + + TaskContext ctx = new TaskContext(processId, 
gatewayId, taskId); + ctx.setAppCatalog(appCatalog); + ctx.setExperimentCatalog(experimentCatalog); + ctx.setStatusPublisher(statusPublisher); + ctx.setProcessModel(processModel); + ctx.setGatewayResourceProfile(gatewayResourceProfile); + ctx.setGatewayComputeResourcePreference(gatewayComputeResourcePreference); + ctx.setGatewayStorageResourcePreference(gatewayStorageResourcePreference); + + return ctx; + } + + private boolean notValid(Object value) { + return value == null; + } + + private void throwError(String msg) throws Exception { + throw new Exception(msg); + } + + } +} + + diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java new file mode 100644 index 0000000..0b92922 --- /dev/null +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java @@ -0,0 +1,335 @@ +package org.apache.airavata.helix.impl.task.submission; + +import groovy.text.GStringTemplateEngine; +import groovy.text.TemplateEngine; +import org.apache.airavata.helix.impl.task.TaskContext; +import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; +import org.apache.airavata.model.appcatalog.appdeployment.CommandObject; +import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths; +import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType; +import org.apache.airavata.model.application.io.DataType; +import org.apache.airavata.model.application.io.InputDataObjectType; +import org.apache.airavata.model.application.io.OutputDataObjectType; +import org.apache.airavata.model.parallelism.ApplicationParallelismType; +import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel; +import org.apache.airavata.model.task.JobSubmissionTaskModel; +import org.apache.log4j.LogManager; +import 
org.apache.log4j.Logger; +import org.apache.thrift.TException; + +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public class GroovyMapBuilder { + + private static final Logger logger = LogManager.getLogger(GroovyMapBuilder.class); + + public static final String MULTIPLE_INPUTS_SPLITTER = ","; + + private TaskContext taskContext; + + public GroovyMapBuilder(TaskContext taskContext) { + this.taskContext = taskContext; + } + + public GroovyMapData build() throws Exception { + GroovyMapData mapData = new GroovyMapData(); + mapData.setInputDir(taskContext.getInputDir()); + mapData.setOutputDir(taskContext.getOutputDir()); + mapData.setExecutablePath(taskContext.getApplicationDeploymentDescription().getExecutablePath()); + mapData.setStdoutFile(taskContext.getStdoutLocation()); + mapData.setStderrFile(taskContext.getStderrLocation()); + mapData.setScratchLocation(taskContext.getScratchLocation()); + mapData.setGatewayId(taskContext.getGatewayId()); + mapData.setGatewayUserName(taskContext.getProcessModel().getUserName()); + mapData.setApplicationName(taskContext.getApplicationInterfaceDescription().getApplicationName()); + mapData.setQueueSpecificMacros(taskContext.getQueueSpecificMacros()); + mapData.setAccountString(taskContext.getAllocationProjectNumber()); + mapData.setReservation(taskContext.getReservation()); + mapData.setJobName("A" + String.valueOf(generateJobName())); + + List<String> inputValues = getProcessInputValues(taskContext.getProcessModel().getProcessInputs(), true); + inputValues.addAll(getProcessOutputValues(taskContext.getProcessModel().getProcessOutputs(), true)); + mapData.setInputs(inputValues); + + List<String> inputValuesAll = getProcessInputValues(taskContext.getProcessModel().getProcessInputs(), false); + inputValues.addAll(getProcessOutputValues(taskContext.getProcessModel().getProcessOutputs(), false)); + 
mapData.setInputsAll(inputValuesAll); + + //mapData.setUserName(taskContext.geJo) + + mapData.setShellName("/bin/bash"); + + if (taskContext != null) { + try { + JobSubmissionTaskModel jobSubmissionTaskModel = ((JobSubmissionTaskModel) taskContext.getSubTaskModel()); + if (jobSubmissionTaskModel.getWallTime() > 0) { + mapData.setMaxWallTime(maxWallTimeCalculator(jobSubmissionTaskModel.getWallTime())); + // TODO fix this + /*if (resourceJobManager != null) { + if (resourceJobManager.getResourceJobManagerType().equals(ResourceJobManagerType.LSF)) { + groovyMap.add(Script.MAX_WALL_TIME, + GFacUtils.maxWallTimeCalculatorForLSF(jobSubmissionTaskModel.getWallTime())); + } + }*/ + } + } catch (TException e) { + logger.error("Error while getting job submission sub task model", e); + } + } + + // NOTE: Give precedence to data comes with experiment + // qos per queue + String qoS = getQoS(taskContext.getQualityOfService(), taskContext.getQueueName()); + if (qoS != null) { + mapData.setQualityOfService(qoS); + } + ComputationalResourceSchedulingModel scheduling = taskContext.getProcessModel().getProcessResourceSchedule(); + if (scheduling != null) { + int totalNodeCount = scheduling.getNodeCount(); + int totalCPUCount = scheduling.getTotalCPUCount(); + + if (isValid(scheduling.getQueueName())) { + mapData.setQueueName(scheduling.getQueueName()); + } + if (totalNodeCount > 0) { + mapData.setNodes(totalCPUCount); + } + if (totalCPUCount > 0) { + int ppn = totalCPUCount / totalNodeCount; + mapData.setProcessPerNode(ppn); + mapData.setCpuCount(totalCPUCount); + } + // max wall time may be set before this level if jobsubmission task has wall time configured to this job, + // if so we ignore scheduling configuration. 
+ if (scheduling.getWallTimeLimit() > 0 && mapData.getMaxWallTime() == null) { + mapData.setMaxWallTime(maxWallTimeCalculator(scheduling.getWallTimeLimit())); + + // TODO fix this + /* + if (resourceJobManager != null) { + if (resourceJobManager.getResourceJobManagerType().equals(ResourceJobManagerType.LSF)) { + mapData.setMaxWallTime(maxWallTimeCalculatorForLSF(scheduling.getWallTimeLimit())); + } + } + */ + } + if (scheduling.getTotalPhysicalMemory() > 0) { + mapData.setUsedMem(scheduling.getTotalPhysicalMemory()); + } + if (isValid(scheduling.getOverrideLoginUserName())) { + mapData.setUserName(scheduling.getOverrideLoginUserName()); + } + if (isValid(scheduling.getOverrideAllocationProjectNumber())) { + mapData.setAccountString(scheduling.getOverrideAllocationProjectNumber()); + } + if (isValid(scheduling.getStaticWorkingDir())) { + mapData.setWorkingDirectory(scheduling.getStaticWorkingDir()); + } + } else { + logger.error("Task scheduling cannot be null at this point.."); + } + + ApplicationDeploymentDescription appDepDescription = taskContext.getApplicationDeploymentDescription(); + + List<SetEnvPaths> exportCommands = appDepDescription.getSetEnvironment(); + if (exportCommands != null) { + List<String> exportCommandList = exportCommands.stream() + .sorted((e1, e2) -> e1.getEnvPathOrder() - e2.getEnvPathOrder()) + .map(map -> map.getName() + "=" + map.getValue()) + .collect(Collectors.toList()); + mapData.setExports(exportCommandList); + } + + List<CommandObject> moduleCmds = appDepDescription.getModuleLoadCmds(); + if (moduleCmds != null) { + List<String> modulesCmdCollect = moduleCmds.stream() + .sorted((e1, e2) -> e1.getCommandOrder() - e2.getCommandOrder()) + .map(map -> map.getCommand()) + .collect(Collectors.toList()); + mapData.setModuleCommands(modulesCmdCollect); + } + + List<CommandObject> preJobCommands = appDepDescription.getPreJobCommands(); + if (preJobCommands != null) { + List<String> preJobCmdCollect = preJobCommands.stream() + .sorted((e1, 
e2) -> e1.getCommandOrder() - e2.getCommandOrder()) + .map(map -> parseCommands(map.getCommand(), mapData)) + .collect(Collectors.toList()); + mapData.setPreJobCommands(preJobCmdCollect); + } + + List<CommandObject> postJobCommands = appDepDescription.getPostJobCommands(); + if (postJobCommands != null) { + List<String> postJobCmdCollect = postJobCommands.stream() + .sorted((e1, e2) -> e1.getCommandOrder() - e2.getCommandOrder()) + .map(map -> parseCommands(map.getCommand(), mapData)) + .collect(Collectors.toList()); + mapData.setPostJobCommands(postJobCmdCollect); + } + + ApplicationParallelismType parallelism = appDepDescription.getParallelism(); + if (parallelism != null) { + if (parallelism != ApplicationParallelismType.SERIAL) { + Map<ApplicationParallelismType, String> parallelismPrefix = taskContext.getResourceJobManager().getParallelismPrefix(); + if (parallelismPrefix != null){ + String parallelismCommand = parallelismPrefix.get(parallelism); + if (parallelismCommand != null){ + mapData.setJobSubmitterCommand(parallelismCommand); + }else { + throw new Exception("Parallelism prefix is not defined for given parallelism type " + parallelism + ".. 
Please define the parallelism prefix at App Catalog"); + } + } + } + } + + return mapData; + } + + public static int generateJobName() { + Random random = new Random(); + int i = random.nextInt(Integer.MAX_VALUE); + i = i + 99999999; + if (i < 0) { + i = i * (-1); + } + return i; + } + + private static List<String> getProcessInputValues(List<InputDataObjectType> processInputs, boolean commandLineOnly) { + List<String> inputValues = new ArrayList<String>(); + if (processInputs != null) { + + // sort the inputs first and then build the command ListR + Comparator<InputDataObjectType> inputOrderComparator = new Comparator<InputDataObjectType>() { + @Override + public int compare(InputDataObjectType inputDataObjectType, InputDataObjectType t1) { + return inputDataObjectType.getInputOrder() - t1.getInputOrder(); + } + }; + Set<InputDataObjectType> sortedInputSet = new TreeSet<InputDataObjectType>(inputOrderComparator); + for (InputDataObjectType input : processInputs) { + sortedInputSet.add(input); + } + for (InputDataObjectType inputDataObjectType : sortedInputSet) { + if (commandLineOnly && !inputDataObjectType.isRequiredToAddedToCommandLine()) { + continue; + } + if (inputDataObjectType.getApplicationArgument() != null + && !inputDataObjectType.getApplicationArgument().equals("")) { + inputValues.add(inputDataObjectType.getApplicationArgument()); + } + + if (inputDataObjectType.getValue() != null + && !inputDataObjectType.getValue().equals("")) { + if (inputDataObjectType.getType() == DataType.URI) { + // set only the relative path + String filePath = inputDataObjectType.getValue(); + filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length()); + inputValues.add(filePath); + } else if (inputDataObjectType.getType() == DataType.URI_COLLECTION) { + String filePaths = inputDataObjectType.getValue(); + String[] paths = filePaths.split(MULTIPLE_INPUTS_SPLITTER); + String filePath; + String inputs = ""; + int i = 0; + for (; i < 
paths.length - 1; i++) { + filePath = paths[i]; + filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length()); + // File names separate by a space + inputs += filePath + " "; + } + inputs += paths[i]; + inputValues.add(inputs); + } else { + inputValues.add(inputDataObjectType.getValue()); + } + + } + } + } + return inputValues; + } + + private static List<String> getProcessOutputValues(List<OutputDataObjectType> processOutputs, boolean commandLineOnly) { + List<String> inputValues = new ArrayList<>(); + if (processOutputs != null) { + for (OutputDataObjectType output : processOutputs) { + if (output.getApplicationArgument() != null + && !output.getApplicationArgument().equals("")) { + inputValues.add(output.getApplicationArgument()); + } + if(commandLineOnly){ + if (output.getValue() != null && !output.getValue().equals("") && output.isRequiredToAddedToCommandLine()) { + if (output.getType() == DataType.URI) { + String filePath = output.getValue(); + filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length()); + inputValues.add(filePath); + } + } + }else{ + if (output.getValue() != null && !output.getValue().equals("")) { + if (output.getType() == DataType.URI) { + String filePath = output.getValue(); + filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length()); + inputValues.add(filePath); + } + } + } + + } + } + return inputValues; + } + + static String getQoS(String qualityOfService, String preferredBatchQueue) { + if(preferredBatchQueue == null || preferredBatchQueue.isEmpty() + || qualityOfService == null || qualityOfService.isEmpty()) return null; + final String qos = "qos"; + Pattern pattern = Pattern.compile(preferredBatchQueue + "=(?<" + qos + ">[^,]*)"); + Matcher matcher = pattern.matcher(qualityOfService); + if (matcher.find()) { + return matcher.group(qos); + } + return null; + } + + public static String maxWallTimeCalculator(int maxWalltime) { + 
if (maxWalltime < 60) { + return "00:" + maxWalltime + ":00"; + } else { + int minutes = maxWalltime % 60; + int hours = maxWalltime / 60; + return hours + ":" + minutes + ":00"; + } + } + + public static String maxWallTimeCalculatorForLSF(int maxWalltime) { + if (maxWalltime < 60) { + return "00:" + maxWalltime; + } else { + int minutes = maxWalltime % 60; + int hours = maxWalltime / 60; + return hours + ":" + minutes; + } + } + + private static boolean isValid(String str) { + return str != null && !str.isEmpty(); + } + + static String parseCommands(String value, GroovyMapData bindMap) { + TemplateEngine templateEngine = new GStringTemplateEngine(); + try { + return templateEngine.createTemplate(value).make(bindMap.toImmutableMap()).toString(); + } catch (ClassNotFoundException | IOException e) { + throw new IllegalArgumentException("Error while parsing command " + value + + " , Invalid command or incomplete bind map"); + } + } + +} diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapData.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapData.java index ec75fb7..995f772 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapData.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapData.java @@ -1,6 +1,14 @@ package org.apache.airavata.helix.impl.task.submission; +import com.google.common.collect.ImmutableMap; +import groovy.lang.Writable; +import groovy.text.GStringTemplateEngine; +import groovy.text.TemplateEngine; +import org.apache.airavata.common.utils.ApplicationSettings; + +import java.io.File; import java.lang.reflect.Field; +import java.net.URL; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -35,7 +43,7 @@ public class GroovyMapData { private String applicationName; @ScriptTag(name = "queueSpecificMacros") - private String 
queueSpecificMacros; + private List<String> queueSpecificMacros; @ScriptTag(name = "accountString") private String accountString; @@ -206,13 +214,12 @@ public class GroovyMapData { return this; } - public String getQueueSpecificMacros() { + public List<String> getQueueSpecificMacros() { return queueSpecificMacros; } - public GroovyMapData setQueueSpecificMacros(String queueSpecificMacros) { + public void setQueueSpecificMacros(List<String> queueSpecificMacros) { this.queueSpecificMacros = queueSpecificMacros; - return this; } public String getAccountString() { @@ -412,4 +419,40 @@ public class GroovyMapData { this.chassisName = chassisName; return this; } + + public Map toImmutableMap() { + + Map<String, Object> dataMap = new HashMap<>(); + Field[] declaredFields = this.getClass().getDeclaredFields(); + for (Field field : declaredFields) { + field.setAccessible(true); + if (field.getAnnotation(ScriptTag.class) != null) { + try { + dataMap.put(field.getAnnotation(ScriptTag.class).name(), field.get(this)); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } + } + } + + return dataMap; + } + + public String getAsString(String templateName) throws Exception { + URL templateUrl = ApplicationSettings.loadFile(templateName); + if (templateUrl == null) { + String error = "Template file '" + templateName + "' not found"; + throw new Exception(error); + } + File template = new File(templateUrl.getPath()); + TemplateEngine engine = new GStringTemplateEngine(); + Writable make; + try { + + make = engine.createTemplate(template).make(toImmutableMap()); + } catch (Exception e) { + throw new Exception("Error while generating script using groovy map"); + } + return make.toString(); + } } diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java index fb9917f..fab4747 100644 
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java @@ -3,6 +3,7 @@ package org.apache.airavata.helix.impl.task.submission.task; import org.apache.airavata.agents.api.AgentAdaptor; import org.apache.airavata.agents.api.JobSubmissionOutput; import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.helix.impl.task.submission.GroovyMapBuilder; import org.apache.airavata.helix.impl.task.submission.GroovyMapData; import org.apache.airavata.helix.impl.task.submission.SubmissionUtil; import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo; @@ -36,25 +37,24 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask { @Override public TaskResult onRun(TaskHelper taskHelper) { try { - GroovyMapData groovyMapData = new GroovyMapData(); + GroovyMapData mapData = new GroovyMapBuilder(getTaskContext()).build(); JobModel jobModel = new JobModel(); jobModel.setProcessId(getProcessId()); - jobModel.setWorkingDir(groovyMapData.getWorkingDirectory()); + jobModel.setWorkingDir(mapData.getWorkingDirectory()); jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime()); jobModel.setTaskId(getTaskId()); - jobModel.setJobName(groovyMapData.getJobName()); + jobModel.setJobName(mapData.getJobName()); - File jobFile = SubmissionUtil.createJobFile(groovyMapData); + if (mapData != null) { + //jobModel.setJobDescription(FileUtils.readFileToString(jobFile)); + AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor( + getTaskContext().getComputeResourceId(), + getTaskContext().getJobSubmissionProtocol().name(), + getTaskContext().getComputeResourceCredentialToken()); - - if (jobFile != null && jobFile.exists()) { - jobModel.setJobDescription(FileUtils.readFileToString(jobFile)); - AgentAdaptor adaptor = 
taskHelper.getAdaptorSupport().fetchAdaptor(getComputeResourceId(), - getJobSubmissionProtocol().name(), getComputeResourceCredentialToken()); - - JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, jobFile, groovyMapData.getWorkingDirectory()); + JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, mapData, mapData.getWorkingDirectory()); jobModel.setExitCode(submissionOutput.getExitCode()); jobModel.setStdErr(submissionOutput.getStdErr()); @@ -137,7 +137,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask { String loadCommand = getComputeResourceDescription().getGatewayUsageModuleLoadCommand(); String usageExecutable = getComputeResourceDescription().getGatewayUsageExecutable(); ExperimentModel experiment = (ExperimentModel)getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, getExperimentId()); - String username = experiment.getUserName() + "@" + getGatewayComputeResourcePreference().getUsageReportingGatewayId(); + String username = experiment.getUserName() + "@" + getTaskContext().getGatewayComputeResourcePreference().getUsageReportingGatewayId(); RawCommandInfo rawCommandInfo = new RawCommandInfo(loadCommand + " && " + usageExecutable + " -gateway_user " + username + " -submit_time \"`date '+%F %T %:z'`\" -jobid " + jobId ); adaptor.executeCommand(rawCommandInfo.getRawCommand(), null); @@ -150,7 +150,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask { } else { int verificationTryCount = 0; while (verificationTryCount++ < 3) { - String verifyJobId = verifyJobSubmission(adaptor, jobModel.getJobName(), getComputeResourceLoginUserName()); + String verifyJobId = verifyJobSubmission(adaptor, jobModel.getJobName(), getTaskContext().getComputeResourceLoginUserName()); if (verifyJobId != null && !verifyJobId.isEmpty()) { // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED jobId = verifyJobId; @@ -194,17 +194,12 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask 
{ } } else { + return onFail("Job data is null", true, null); + // taskStatus.setReason("JobFile is null"); //taskStatus.setState(TaskState.FAILED); - if (jobFile == null) { - return onFail("Job file is null", true, null); - // taskStatus.setReason("JobFile is null"); - } else { - //taskStatus.setReason("Job file doesn't exist"); - return onFail("Job file doesn't exist", true, null); - } } } catch (Exception e) { - return onFail("Task failed due to unexpected issue", false, null); + return onFail("Task failed due to unexpected issue", false, e); } // TODO get rid of this return onFail("Task moved to an unknown state", false, null); diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java index da04365..58b70ef 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java @@ -3,6 +3,7 @@ package org.apache.airavata.helix.impl.task.submission.task; import org.apache.airavata.agents.api.AgentAdaptor; import org.apache.airavata.agents.api.JobSubmissionOutput; import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.helix.impl.task.submission.GroovyMapBuilder; import org.apache.airavata.helix.impl.task.submission.GroovyMapData; import org.apache.airavata.helix.impl.task.submission.SubmissionUtil; import org.apache.airavata.helix.task.api.TaskHelper; @@ -23,23 +24,23 @@ public class ForkJobSubmissionTask extends JobSubmissionTask { public TaskResult onRun(TaskHelper taskHelper) { try { - GroovyMapData groovyMapData = new GroovyMapData(); + GroovyMapData mapData = new GroovyMapBuilder(getTaskContext()).build(); JobModel jobModel = new JobModel(); 
jobModel.setProcessId(getProcessId()); - jobModel.setWorkingDir(groovyMapData.getWorkingDirectory()); + jobModel.setWorkingDir(mapData.getWorkingDirectory()); jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime()); jobModel.setTaskId(getTaskId()); - jobModel.setJobName(groovyMapData.getJobName()); + jobModel.setJobName(mapData.getJobName()); - File jobFile = SubmissionUtil.createJobFile(groovyMapData); + if (mapData != null) { + //jobModel.setJobDescription(FileUtils.readFileToString(jobFile)); + AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor( + getTaskContext().getComputeResourceId(), + getTaskContext().getJobSubmissionProtocol().name(), + getTaskContext().getComputeResourceCredentialToken()); - if (jobFile != null && jobFile.exists()) { - jobModel.setJobDescription(FileUtils.readFileToString(jobFile)); - AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(getComputeResourceId(), - getJobSubmissionProtocol().name(), getComputeResourceCredentialToken()); - - JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, jobFile, groovyMapData.getWorkingDirectory()); + JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, mapData, mapData.getWorkingDirectory()); jobModel.setExitCode(submissionOutput.getExitCode()); jobModel.setStdErr(submissionOutput.getStdErr()); diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java index fe5a3dc..11e59eb 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java @@ -4,11 +4,14 @@ import org.apache.airavata.agents.api.AgentAdaptor; import org.apache.airavata.agents.api.CommandOutput; import 
org.apache.airavata.agents.api.JobSubmissionOutput; import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.helix.impl.task.AiravataTask; +import org.apache.airavata.helix.impl.task.submission.GroovyMapData; import org.apache.airavata.helix.impl.task.submission.config.JobFactory; import org.apache.airavata.helix.impl.task.submission.config.JobManagerConfiguration; import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo; import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface; import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; @@ -23,9 +26,11 @@ import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; import org.apache.airavata.model.messaging.event.MessageType; import org.apache.airavata.model.status.JobStatus; import org.apache.airavata.registry.cpi.*; +import org.apache.commons.io.FileUtils; import org.apache.helix.HelixManager; import java.io.File; +import java.security.SecureRandom; import java.util.*; public abstract class JobSubmissionTask extends AiravataTask { @@ -38,10 +43,19 @@ public abstract class JobSubmissionTask extends AiravataTask { } ////////////////////// - protected JobSubmissionOutput submitBatchJob(AgentAdaptor agentAdaptor, File jobFile, String workingDirectory) throws Exception { + protected JobSubmissionOutput submitBatchJob(AgentAdaptor agentAdaptor, GroovyMapData groovyMapData, String workingDirectory) throws Exception { JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(JobFactory.getResourceJobManager( - getAppCatalog(), getJobSubmissionProtocol(), getPreferredJobSubmissionInterface())); - 
RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory, jobFile.getPath()); + getAppCatalog(), getTaskContext().getJobSubmissionProtocol(), getTaskContext().getPreferredJobSubmissionInterface())); + + String scriptAsString = groovyMapData.getAsString(jobManagerConfiguration.getJobDescriptionTemplateName()); + + int number = new SecureRandom().nextInt(); + number = (number < 0 ? -number : number); + File tempJobFile = new File(getLocalDataDir(), "job_" + Integer.toString(number) + jobManagerConfiguration.getScriptExtension()); + FileUtils.writeStringToFile(tempJobFile, scriptAsString); + + // TODO transfer file + RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory, tempJobFile.getPath()); CommandOutput commandOutput = agentAdaptor.executeCommand(submitCommand.getRawCommand(), workingDirectory); JobSubmissionOutput jsoutput = new JobSubmissionOutput(); @@ -63,12 +77,17 @@ public abstract class JobSubmissionTask extends AiravataTask { jsoutput.setStdOut(commandOutput.getStdOut()); jsoutput.setStdErr(commandOutput.getStdError()); return jsoutput; + } + public File getLocalDataDir() { + String outputPath = ServerSettings.getLocalDataLocation(); + outputPath = (outputPath.endsWith(File.separator) ? 
outputPath : outputPath + File.separator); + return new File(outputPath + getProcessId()); } public JobStatus getJobStatus(AgentAdaptor agentAdaptor, String jobID) throws Exception { JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(JobFactory.getResourceJobManager( - getAppCatalog(), getJobSubmissionProtocol(), getPreferredJobSubmissionInterface())); + getAppCatalog(), getTaskContext().getJobSubmissionProtocol(), getTaskContext().getPreferredJobSubmissionInterface())); CommandOutput commandOutput = agentAdaptor.executeCommand(jobManagerConfiguration.getMonitorCommand(jobID).getRawCommand(), null); return jobManagerConfiguration.getParser().parseJobStatus(jobID, commandOutput.getStdOut()); @@ -77,7 +96,7 @@ public abstract class JobSubmissionTask extends AiravataTask { public String getJobIdByJobName(AgentAdaptor agentAdaptor, String jobName, String userName) throws Exception { JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(JobFactory.getResourceJobManager( - getAppCatalog(), getJobSubmissionProtocol(), getPreferredJobSubmissionInterface())); + getAppCatalog(), getTaskContext().getJobSubmissionProtocol(), getTaskContext().getPreferredJobSubmissionInterface())); RawCommandInfo jobIdMonitorCommand = jobManagerConfiguration.getJobIdMonitorCommand(jobName, userName); CommandOutput commandOutput = agentAdaptor.executeCommand(jobIdMonitorCommand.getRawCommand(), null); @@ -159,44 +178,4 @@ public abstract class JobSubmissionTask extends AiravataTask { ///////////// required for groovy map - private String workingDir; - private String scratchLocation; - private UserComputeResourcePreference userComputeResourcePreference; - - public String getWorkingDir() { - if (workingDir == null) { - if (getProcessModel().getProcessResourceSchedule().getStaticWorkingDir() != null){ - workingDir = getProcessModel().getProcessResourceSchedule().getStaticWorkingDir(); - }else { - String scratchLocation = 
getScratchLocation(); - workingDir = (scratchLocation.endsWith("/") ? scratchLocation + getProcessId() : scratchLocation + "/" + - getProcessId()); - } - } - return workingDir; - } - - public String getScratchLocation() { - if (scratchLocation == null) { - if (isUseUserCRPref() && - userComputeResourcePreference != null && - isValid(userComputeResourcePreference.getScratchLocation())) { - scratchLocation = userComputeResourcePreference.getScratchLocation(); - } else if (isValid(processModel.getProcessResourceSchedule().getOverrideScratchLocation())) { - scratchLocation = processModel.getProcessResourceSchedule().getOverrideScratchLocation(); - }else { - scratchLocation = gatewayComputeResourcePreference.getScratchLocation(); - } - } - return scratchLocation; - } - - protected UserComputeResourcePreference userComputeResourcePreference() throws AppCatalogException { - UserComputeResourcePreference userComputeResourcePreference = - getAppCatalog().getUserResourceProfile().getUserComputeResourcePreference( - getProcessModel().getUserName(), - getGatewayId(), - getProcessModel().getComputeResourceId()); - } - } diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java index 5a3ca31..67ad0db 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java @@ -3,6 +3,7 @@ package org.apache.airavata.helix.impl.task.submission.task; import org.apache.airavata.agents.api.AgentAdaptor; import org.apache.airavata.agents.api.JobSubmissionOutput; import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.helix.impl.task.submission.GroovyMapBuilder; import 
org.apache.airavata.helix.impl.task.submission.GroovyMapData; import org.apache.airavata.helix.impl.task.submission.SubmissionUtil; import org.apache.airavata.helix.impl.task.submission.task.JobSubmissionTask; @@ -41,10 +42,13 @@ public class LocalJobSubmissionTask extends JobSubmissionTask { jobModel.setJobDescription(FileUtils.readFileToString(jobFile)); saveJobModel(jobModel); - AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(getComputeResourceId(), - getJobSubmissionProtocol().name(), getComputeResourceCredentialToken()); + AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor( + getTaskContext().getComputeResourceId(), + getTaskContext().getJobSubmissionProtocol().name(), + getTaskContext().getComputeResourceCredentialToken()); - JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, jobFile, groovyMapData.getWorkingDirectory()); + GroovyMapData mapData = new GroovyMapBuilder(getTaskContext()).build(); + JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, mapData, groovyMapData.getWorkingDirectory()); JobStatus jobStatus = new JobStatus(); jobStatus.setJobState(JobState.SUBMITTED); diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java index 51feff4..397ff45 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java @@ -20,7 +20,7 @@ public class SimpleWorkflow { defaultJobSubmissionTask.setGatewayId("default"); defaultJobSubmissionTask.setExperimentId("Clone_of_Mothur-Test1_0c9f627e-2c32-403e-a28a-2a8b10c21c1a"); defaultJobSubmissionTask.setProcessId("PROCESS_438a87cc-2dec-4edc-bfeb-31128df91bb6"); - defaultJobSubmissionTask.setTaskId(UUID.randomUUID().toString()); + 
defaultJobSubmissionTask.setTaskId("TASK_612844a4-aedb-41a5-824f-9b20c76867f7"); List<AbstractTask> tasks = new ArrayList<>(); tasks.add(defaultJobSubmissionTask); diff --git a/modules/helix-spectator/src/main/resources/application.properties b/modules/helix-spectator/src/main/resources/application.properties index 41c5e5f..a9b0969 100644 --- a/modules/helix-spectator/src/main/resources/application.properties +++ b/modules/helix-spectator/src/main/resources/application.properties @@ -1,3 +1,3 @@ zookeeper.connection.url=localhost:2199 helix.cluster.name=AiravataDemoCluster -participant.name=all-p1 \ No newline at end of file +participant.name=all-p2 \ No newline at end of file -- To stop receiving notification emails like this one, please contact [email protected].
