Repository: airavata Updated Branches: refs/heads/feature-workload-mgmt 9a3617604 -> 9f0e45b25
Add implementation for ForkJobSubmission Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/9f0e45b2 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/9f0e45b2 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/9f0e45b2 Branch: refs/heads/feature-workload-mgmt Commit: 9f0e45b255a246981c0f9d1ad13e292b47b3a4a1 Parents: 9a36176 Author: Gourav Shenoy <[email protected]> Authored: Tue May 2 12:52:22 2017 -0400 Committer: Gourav Shenoy <[email protected]> Committed: Tue May 2 12:52:22 2017 -0400 ---------------------------------------------------------------------- .../impl/ForkJobSubmissionTask.java | 42 +++++++++++--------- .../jobsubmission/utils/JobSubmissionUtils.java | 2 +- 2 files changed, 25 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/9f0e45b2/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/ForkJobSubmissionTask.java ---------------------------------------------------------------------- diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/ForkJobSubmissionTask.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/ForkJobSubmissionTask.java index 9712de8..c200dfe 100644 --- a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/ForkJobSubmissionTask.java +++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/ForkJobSubmissionTask.java @@ -23,13 +23,6 @@ package org.apache.airavata.worker.task.jobsubmission.impl; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.AiravataUtils; -import org.apache.airavata.gfac.core.cluster.JobSubmissionOutput; -import org.apache.airavata.gfac.core.cluster.RemoteCluster; -import org.apache.airavata.gfac.core.context.ProcessContext; -import org.apache.airavata.gfac.core.context.TaskContext; -import org.apache.airavata.gfac.core.task.JobSubmissionTask; -import org.apache.airavata.gfac.core.task.TaskException; -import org.apache.airavata.gfac.impl.Factory; import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager; import org.apache.airavata.model.commons.ErrorModel; import org.apache.airavata.model.job.JobModel; @@ -39,6 +32,19 @@ import org.apache.airavata.model.status.TaskState; import org.apache.airavata.model.status.TaskStatus; import org.apache.airavata.model.task.TaskTypes; import org.apache.airavata.registry.cpi.AppCatalogException; +import org.apache.airavata.worker.core.cluster.JobSubmissionOutput; +import org.apache.airavata.worker.core.cluster.RemoteCluster; +import org.apache.airavata.worker.core.context.ProcessContext; +import org.apache.airavata.worker.core.context.TaskContext; +import org.apache.airavata.worker.core.exceptions.WorkerException; +import org.apache.airavata.worker.core.task.TaskException; +import org.apache.airavata.worker.core.utils.JobManagerConfiguration; +import org.apache.airavata.worker.core.utils.WorkerUtils; +import org.apache.airavata.worker.task.jobsubmission.JobSubmissionTask; +import org.apache.airavata.worker.task.jobsubmission.utils.GroovyMap; +import org.apache.airavata.worker.task.jobsubmission.utils.JobSubmissionFactory; +import org.apache.airavata.worker.task.jobsubmission.utils.JobSubmissionUtils; +import org.apache.airavata.worker.task.jobsubmission.utils.Script; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,15 +69,15 @@ public class ForkJobSubmissionTask implements JobSubmissionTask { JobModel jobModel = processContext.getJobModel(); jobModel.setTaskId(taskContext.getTaskId()); RemoteCluster remoteCluster = processContext.getJobSubmissionRemoteCluster(); - GroovyMap groovyMap = GFacUtils.createGroovyMap(processContext, taskContext); + GroovyMap groovyMap = JobSubmissionUtils.createGroovyMap(processContext, taskContext); jobModel.setJobName(groovyMap.get(Script.JOB_NAME).toString()); - ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext); + ResourceJobManager resourceJobManager = JobSubmissionUtils.getResourceJobManager(processContext); JobManagerConfiguration jConfig = null; if (resourceJobManager != null) { - jConfig = Factory.getJobManagerConfiguration(resourceJobManager); + jConfig = JobSubmissionFactory.getJobManagerConfiguration(resourceJobManager); } JobStatus jobStatus = new JobStatus(); - File jobFile = GFacUtils.createJobFile(groovyMap, taskContext, jConfig); + File jobFile = JobSubmissionUtils.createJobFile(groovyMap, taskContext, jConfig); if (jobFile != null && jobFile.exists()) { jobModel.setJobDescription(FileUtils.readFileToString(jobFile)); JobSubmissionOutput jobSubmissionOutput = remoteCluster.submitBatchJob(jobFile.getPath(), @@ -82,13 +88,13 @@ public class ForkJobSubmissionTask implements JobSubmissionTask { String jobId = jobSubmissionOutput.getJobId(); if (jobId != null && !jobId.isEmpty()) { jobModel.setJobId(jobId); - GFacUtils.saveJobModel(processContext, jobModel); + JobSubmissionUtils.saveJobModel(processContext, jobModel); jobStatus.setJobState(JobState.SUBMITTED); jobStatus.setReason("Successfully Submitted to " + taskContext.getParentProcessContext() .getComputeResourceDescription().getHostName()); jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); jobModel.setJobStatuses(Arrays.asList(jobStatus)); - GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel); + WorkerUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel); taskStatus = new TaskStatus(TaskState.COMPLETED); taskStatus.setReason("Submitted job to compute resource"); } @@ -100,13 +106,13 @@ public class ForkJobSubmissionTask implements JobSubmissionTask { ErrorModel errorModel = new ErrorModel(); errorModel.setActualErrorMessage(msg); errorModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime()); - GFacUtils.saveExperimentError(processContext, errorModel); - GFacUtils.saveProcessError(processContext, errorModel); - GFacUtils.saveTaskError(taskContext, errorModel); + WorkerUtils.saveExperimentError(processContext, errorModel); + WorkerUtils.saveProcessError(processContext, errorModel); + WorkerUtils.saveTaskError(taskContext, errorModel); taskStatus.setState(TaskState.FAILED); taskStatus.setReason("Couldn't find job id in both submitted and verified steps"); }else { - GFacUtils.saveJobModel(processContext, jobModel); + JobSubmissionUtils.saveJobModel(processContext, jobModel); } } else { taskStatus.setState(TaskState.FAILED); @@ -134,7 +140,7 @@ public class ForkJobSubmissionTask implements JobSubmissionTask { errorModel.setActualErrorMessage(e.getMessage()); errorModel.setUserFriendlyMessage(msg); taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel)); - } catch (GFacException e) { + } catch (WorkerException e) { String msg = "Error occurred while submitting the job"; log.error(msg, e); taskStatus.setState(TaskState.FAILED); http://git-wip-us.apache.org/repos/asf/airavata/blob/9f0e45b2/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java ---------------------------------------------------------------------- diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java index 68229d6..b4ebe1b 100644 --- a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java +++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java @@ -281,7 +281,7 @@ public class JobSubmissionUtils { } } - private static JobSubmissionInterface getPreferredJobSubmissionInterface(ProcessContext processContext) throws AppCatalogException { + public static JobSubmissionInterface getPreferredJobSubmissionInterface(ProcessContext processContext) throws AppCatalogException { try { String resourceHostId = processContext.getComputeResourceDescription().getComputeResourceId(); JobSubmissionProtocol preferredJobSubmissionProtocol = processContext.getPreferredJobSubmissionProtocol();
