Repository: airavata
Updated Branches:
  refs/heads/feature-workload-mgmt 702aa50ae -> 9a3617604
Add implementation for LocalJobSubmission Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/9a361760 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/9a361760 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/9a361760 Branch: refs/heads/feature-workload-mgmt Commit: 9a361760406c1e3c2bf5862d77ad6a0cff139fb3 Parents: 702aa50 Author: Gourav Shenoy <[email protected]> Authored: Tue May 2 12:12:22 2017 -0400 Committer: Gourav Shenoy <[email protected]> Committed: Tue May 2 12:12:22 2017 -0400 ---------------------------------------------------------------------- .../impl/LocalJobSubmissionTask.java | 38 +++++++++++--------- 1 file changed, 22 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/9a361760/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/LocalJobSubmissionTask.java ---------------------------------------------------------------------- diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/LocalJobSubmissionTask.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/LocalJobSubmissionTask.java index 8bd73a4..804fce2 100644 --- a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/LocalJobSubmissionTask.java +++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/LocalJobSubmissionTask.java @@ -23,13 +23,6 @@ package org.apache.airavata.worker.task.jobsubmission.impl; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.AiravataUtils; -import org.apache.airavata.gfac.core.cluster.JobSubmissionOutput; -import 
org.apache.airavata.gfac.core.cluster.RemoteCluster; -import org.apache.airavata.gfac.core.context.ProcessContext; -import org.apache.airavata.gfac.core.context.TaskContext; -import org.apache.airavata.gfac.core.task.JobSubmissionTask; -import org.apache.airavata.gfac.core.task.TaskException; -import org.apache.airavata.gfac.impl.Factory; import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths; import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager; @@ -42,6 +35,19 @@ import org.apache.airavata.model.status.TaskState; import org.apache.airavata.model.status.TaskStatus; import org.apache.airavata.model.task.TaskTypes; import org.apache.airavata.registry.cpi.AppCatalogException; +import org.apache.airavata.worker.core.cluster.JobSubmissionOutput; +import org.apache.airavata.worker.core.cluster.RemoteCluster; +import org.apache.airavata.worker.core.context.ProcessContext; +import org.apache.airavata.worker.core.context.TaskContext; +import org.apache.airavata.worker.core.exceptions.WorkerException; +import org.apache.airavata.worker.core.task.TaskException; +import org.apache.airavata.worker.core.utils.JobManagerConfiguration; +import org.apache.airavata.worker.core.utils.WorkerUtils; +import org.apache.airavata.worker.task.jobsubmission.JobSubmissionTask; +import org.apache.airavata.worker.task.jobsubmission.utils.GroovyMap; +import org.apache.airavata.worker.task.jobsubmission.utils.JobSubmissionFactory; +import org.apache.airavata.worker.task.jobsubmission.utils.JobSubmissionUtils; +import org.apache.airavata.worker.task.jobsubmission.utils.Script; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +56,7 @@ import java.io.File; import java.io.IOException; import java.util.*; -public class LocalJobSubmissionTask implements JobSubmissionTask{ +public class LocalJobSubmissionTask 
implements JobSubmissionTask { private static final Logger log = LoggerFactory.getLogger(LocalJobSubmissionTask.class); private ProcessBuilder builder; @@ -67,25 +73,25 @@ public class LocalJobSubmissionTask implements JobSubmissionTask{ jobModel.setTaskId(taskContext.getTaskId()); RemoteCluster remoteCluster = processContext.getJobSubmissionRemoteCluster(); - GroovyMap groovyMap = GFacUtils.createGroovyMap(processContext,taskContext); + GroovyMap groovyMap = JobSubmissionUtils.createGroovyMap(processContext,taskContext); String jobId = AiravataUtils.getId("JOB_ID_"); jobModel.setJobName(groovyMap.get(Script.JOB_NAME).toString()); jobModel.setJobId(jobId); - ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext); + ResourceJobManager resourceJobManager = JobSubmissionUtils.getResourceJobManager(processContext); JobManagerConfiguration jConfig = null; if (resourceJobManager != null) { - jConfig = Factory.getJobManagerConfiguration(resourceJobManager); + jConfig = JobSubmissionFactory.getJobManagerConfiguration(resourceJobManager); } JobStatus jobStatus = new JobStatus(); - File jobFile = GFacUtils.createJobFile(groovyMap, taskContext, jConfig); + File jobFile = JobSubmissionUtils.createJobFile(groovyMap, taskContext, jConfig); if (jobFile != null && jobFile.exists()) { jobModel.setJobDescription(FileUtils.readFileToString(jobFile)); - GFacUtils.saveJobModel(processContext, jobModel); + JobSubmissionUtils.saveJobModel(processContext, jobModel); JobSubmissionOutput jobSubmissionOutput = remoteCluster.submitBatchJob(jobFile.getPath(), processContext.getWorkingDir()); @@ -96,7 +102,7 @@ public class LocalJobSubmissionTask implements JobSubmissionTask{ jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); jobModel.setJobStatuses(Arrays.asList(jobStatus)); //log job submit status - GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel); + 
WorkerUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel); //for local, job gets completed synchronously //so changing job status to complete @@ -113,7 +119,7 @@ public class LocalJobSubmissionTask implements JobSubmissionTask{ jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); jobModel.setJobStatuses(Arrays.asList(jobStatus)); //log job complete status - GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel); + WorkerUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel); taskStatus = new TaskStatus(TaskState.COMPLETED); @@ -128,7 +134,7 @@ public class LocalJobSubmissionTask implements JobSubmissionTask{ } } - } catch (GFacException | IOException | AppCatalogException | ApplicationSettingsException e) { + } catch (WorkerException | IOException | AppCatalogException | ApplicationSettingsException e) { String msg = "Error occurred while submitting a local job"; log.error(msg, e); taskStatus.setReason(msg);
