Repository: airavata Updated Branches: refs/heads/develop aefd913a8 -> 4d9c56e9a
Renamed AuroraJobSubmission -> AuroraJobSubmissionTask Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4d9c56e9 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4d9c56e9 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4d9c56e9 Branch: refs/heads/develop Commit: 4d9c56e9a9f1431a493ca11e82eae21c82d1ebdf Parents: aefd913 Author: Shameera Rathnayaka <[email protected]> Authored: Mon Nov 7 15:35:45 2016 -0500 Committer: Shameera Rathnayaka <[email protected]> Committed: Mon Nov 7 15:35:45 2016 -0500 ---------------------------------------------------------------------- .../server/src/main/resources/gfac-config.yaml | 2 + .../gfac/impl/task/AuroraJobSubmission.java | 147 ------------------- .../gfac/impl/task/AuroraJobSubmissionTask.java | 147 +++++++++++++++++++ 3 files changed, 149 insertions(+), 147 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/4d9c56e9/modules/configuration/server/src/main/resources/gfac-config.yaml ---------------------------------------------------------------------- diff --git a/modules/configuration/server/src/main/resources/gfac-config.yaml b/modules/configuration/server/src/main/resources/gfac-config.yaml index 802d1e5..7d580ab 100644 --- a/modules/configuration/server/src/main/resources/gfac-config.yaml +++ b/modules/configuration/server/src/main/resources/gfac-config.yaml @@ -34,6 +34,8 @@ jobSubmitters: - submissionProtocol: LOCAL taskClass: org.apache.airavata.gfac.impl.task.LocalJobSubmissionTask + - submissionProtocol: CLOUD + taskClass: org.apache.airavata.gfac.impl.task.AuroraJobSubmissionTask # Following job submitters are not yet implemented. 
http://git-wip-us.apache.org/repos/asf/airavata/blob/4d9c56e9/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmission.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmission.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmission.java deleted file mode 100644 index 6a3d898..0000000 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmission.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ -package org.apache.airavata.gfac.impl.task; - -import java.util.Arrays; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; - -import org.apache.airavata.cloud.aurora.client.AuroraThriftClient; -import org.apache.airavata.cloud.aurora.client.bean.IdentityBean; -import org.apache.airavata.cloud.aurora.client.bean.JobConfigBean; -import org.apache.airavata.cloud.aurora.client.bean.JobKeyBean; -import org.apache.airavata.cloud.aurora.client.bean.ProcessBean; -import org.apache.airavata.cloud.aurora.client.bean.ResourceBean; -import org.apache.airavata.cloud.aurora.client.bean.ResponseBean; -import org.apache.airavata.cloud.aurora.client.bean.TaskConfigBean; -import org.apache.airavata.cloud.aurora.util.AuroraThriftClientUtil; -import org.apache.airavata.common.utils.AiravataUtils; -import org.apache.airavata.gfac.core.GFacException; -import org.apache.airavata.gfac.core.GFacUtils; -import org.apache.airavata.gfac.core.GroovyMap; -import org.apache.airavata.gfac.core.Script; -import org.apache.airavata.gfac.core.context.ProcessContext; -import org.apache.airavata.gfac.core.context.TaskContext; -import org.apache.airavata.gfac.core.task.JobSubmissionTask; -import org.apache.airavata.gfac.core.task.TaskException; -import org.apache.airavata.gfac.impl.AuroraUtils; -import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType; -import org.apache.airavata.model.commons.ErrorModel; -import org.apache.airavata.model.job.JobModel; -import org.apache.airavata.model.status.JobState; -import org.apache.airavata.model.status.JobStatus; -import org.apache.airavata.model.status.TaskState; -import org.apache.airavata.model.status.TaskStatus; -import org.apache.airavata.model.task.TaskTypes; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AuroraJobSubmission implements JobSubmissionTask{ - - private static final Logger log = LoggerFactory.getLogger(AuroraJobSubmission.class); - - @Override - public 
JobStatus cancel(TaskContext taskcontext) throws TaskException { - JobStatus jobStatus = new JobStatus(); - jobStatus.setJobState(JobState.ACTIVE); - return jobStatus; - } - - @Override - public void init(Map<String, String> propertyMap) throws TaskException { - - } - - @Override - public TaskStatus execute(TaskContext taskContext) { - TaskStatus taskStatus = new TaskStatus(TaskState.COMPLETED); // set to completed. - ProcessContext processContext = taskContext.getParentProcessContext(); - JobModel jobModel = processContext.getJobModel(); - jobModel.setTaskId(taskContext.getTaskId()); - String jobIdAndName = "A" + GFacUtils.generateJobName(); - jobModel.setJobName(jobIdAndName); - JobStatus jobStatus = new JobStatus(); - jobStatus.setJobState(JobState.SUBMITTED); - - try { - JobKeyBean jobKey = new JobKeyBean(AuroraUtils.ENVIRONMENT, AuroraUtils.ROLE, jobIdAndName); - IdentityBean owner = new IdentityBean(AuroraUtils.ROLE); - GroovyMap groovyMap = GFacUtils.createGroovyMap(processContext, taskContext); - groovyMap.add(Script.JOB_SUBMITTER_COMMAND, "sh"); - String templateFileName = GFacUtils.getTemplateFileName(ResourceJobManagerType.CLOUD); - String script = GFacUtils.generateScript(groovyMap, templateFileName); - ProcessBean process_1 = new ProcessBean("process_1", script, false); - - Set<ProcessBean> processes = new LinkedHashSet<>(); - processes.add(process_1); - ResourceBean resources = new ResourceBean(1.5, 512, 512); - TaskConfigBean taskConfig = new TaskConfigBean("Airavata-Aurora-" + jobIdAndName, processes, resources); - JobConfigBean jobConfig = new JobConfigBean(jobKey, owner, taskConfig, AuroraUtils.CLUSTER); - - String executorConfigJson = AuroraThriftClientUtil.getExecutorConfigJson(jobConfig); - log.info("Executor Config for Job {} , {}", jobIdAndName, executorConfigJson); - - AuroraThriftClient client = AuroraThriftClient.getAuroraThriftClient(); - ResponseBean response = client.createJob(jobConfig); - log.info("Response for job {}, {}", 
jobIdAndName, response); - jobModel.setJobDescription(resources.toString()); - - jobModel.setJobId(jobIdAndName); - jobStatus.setReason("Successfully Submitted"); - jobModel.setJobStatuses(Arrays.asList(jobStatus )); - jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - taskContext.getParentProcessContext().setJobModel(jobModel); - - GFacUtils.saveJobModel(processContext, jobModel); - GFacUtils.saveJobStatus(processContext, jobModel); - taskStatus.setReason("Successfully submitted job to Aurora"); - } catch (Throwable e) { - String msg = "Error occurred while submitting Aurora job"; - log.error(msg, e); - taskStatus.setState(TaskState.FAILED); - taskStatus.setReason(msg); - taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - ErrorModel errorModel = new ErrorModel(); - errorModel.setActualErrorMessage(e.getMessage()); - errorModel.setUserFriendlyMessage(msg); - taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel)); - } - - taskContext.setTaskStatus(taskStatus); - try { - GFacUtils.saveAndPublishTaskStatus(taskContext); - } catch (GFacException e) { - log.error("Error while saving task status", e); - } - return taskStatus; - } - - @Override - public TaskStatus recover(TaskContext taskContext) { - return execute(taskContext); - } - - @Override - public TaskTypes getType() { - return TaskTypes.JOB_SUBMISSION; - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/4d9c56e9/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmissionTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmissionTask.java new file mode 100644 index 0000000..a987559 --- /dev/null +++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmissionTask.java @@ -0,0 +1,147 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.airavata.gfac.impl.task; + +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.airavata.cloud.aurora.client.AuroraThriftClient; +import org.apache.airavata.cloud.aurora.client.bean.IdentityBean; +import org.apache.airavata.cloud.aurora.client.bean.JobConfigBean; +import org.apache.airavata.cloud.aurora.client.bean.JobKeyBean; +import org.apache.airavata.cloud.aurora.client.bean.ProcessBean; +import org.apache.airavata.cloud.aurora.client.bean.ResourceBean; +import org.apache.airavata.cloud.aurora.client.bean.ResponseBean; +import org.apache.airavata.cloud.aurora.client.bean.TaskConfigBean; +import org.apache.airavata.cloud.aurora.util.AuroraThriftClientUtil; +import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.gfac.core.GFacException; +import org.apache.airavata.gfac.core.GFacUtils; +import org.apache.airavata.gfac.core.GroovyMap; +import org.apache.airavata.gfac.core.Script; +import 
org.apache.airavata.gfac.core.context.ProcessContext; +import org.apache.airavata.gfac.core.context.TaskContext; +import org.apache.airavata.gfac.core.task.JobSubmissionTask; +import org.apache.airavata.gfac.core.task.TaskException; +import org.apache.airavata.gfac.impl.AuroraUtils; +import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType; +import org.apache.airavata.model.commons.ErrorModel; +import org.apache.airavata.model.job.JobModel; +import org.apache.airavata.model.status.JobState; +import org.apache.airavata.model.status.JobStatus; +import org.apache.airavata.model.status.TaskState; +import org.apache.airavata.model.status.TaskStatus; +import org.apache.airavata.model.task.TaskTypes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AuroraJobSubmissionTask implements JobSubmissionTask{ + + private static final Logger log = LoggerFactory.getLogger(AuroraJobSubmissionTask.class); + + @Override + public JobStatus cancel(TaskContext taskcontext) throws TaskException { + JobStatus jobStatus = new JobStatus(); + jobStatus.setJobState(JobState.ACTIVE); + return jobStatus; + } + + @Override + public void init(Map<String, String> propertyMap) throws TaskException { + + } + + @Override + public TaskStatus execute(TaskContext taskContext) { + TaskStatus taskStatus = new TaskStatus(TaskState.COMPLETED); // set to completed. 
+ ProcessContext processContext = taskContext.getParentProcessContext(); + JobModel jobModel = processContext.getJobModel(); + jobModel.setTaskId(taskContext.getTaskId()); + String jobIdAndName = "A" + GFacUtils.generateJobName(); + jobModel.setJobName(jobIdAndName); + JobStatus jobStatus = new JobStatus(); + jobStatus.setJobState(JobState.SUBMITTED); + + try { + JobKeyBean jobKey = new JobKeyBean(AuroraUtils.ENVIRONMENT, AuroraUtils.ROLE, jobIdAndName); + IdentityBean owner = new IdentityBean(AuroraUtils.ROLE); + GroovyMap groovyMap = GFacUtils.createGroovyMap(processContext, taskContext); + groovyMap.add(Script.JOB_SUBMITTER_COMMAND, "sh"); + String templateFileName = GFacUtils.getTemplateFileName(ResourceJobManagerType.CLOUD); + String script = GFacUtils.generateScript(groovyMap, templateFileName); + ProcessBean process_1 = new ProcessBean("process_1", script, false); + + Set<ProcessBean> processes = new LinkedHashSet<>(); + processes.add(process_1); + ResourceBean resources = new ResourceBean(1.5, 512, 512); + TaskConfigBean taskConfig = new TaskConfigBean("Airavata-Aurora-" + jobIdAndName, processes, resources); + JobConfigBean jobConfig = new JobConfigBean(jobKey, owner, taskConfig, AuroraUtils.CLUSTER); + + String executorConfigJson = AuroraThriftClientUtil.getExecutorConfigJson(jobConfig); + log.info("Executor Config for Job {} , {}", jobIdAndName, executorConfigJson); + + AuroraThriftClient client = AuroraThriftClient.getAuroraThriftClient(); + ResponseBean response = client.createJob(jobConfig); + log.info("Response for job {}, {}", jobIdAndName, response); + jobModel.setJobDescription(resources.toString()); + + jobModel.setJobId(jobIdAndName); + jobStatus.setReason("Successfully Submitted"); + jobModel.setJobStatuses(Arrays.asList(jobStatus )); + jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + taskContext.getParentProcessContext().setJobModel(jobModel); + + GFacUtils.saveJobModel(processContext, jobModel); + 
GFacUtils.saveJobStatus(processContext, jobModel); + taskStatus.setReason("Successfully submitted job to Aurora"); + } catch (Throwable e) { + String msg = "Error occurred while submitting Aurora job"; + log.error(msg, e); + taskStatus.setState(TaskState.FAILED); + taskStatus.setReason(msg); + taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel)); + } + + taskContext.setTaskStatus(taskStatus); + try { + GFacUtils.saveAndPublishTaskStatus(taskContext); + } catch (GFacException e) { + log.error("Error while saving task status", e); + } + return taskStatus; + } + + @Override + public TaskStatus recover(TaskContext taskContext) { + return execute(taskContext); + } + + @Override + public TaskTypes getType() { + return TaskTypes.JOB_SUBMISSION; + } +}
