Repository: airavata
Updated Branches:
  refs/heads/feature-workload-mgmt 9a3617604 -> 9f0e45b25


Add implementation for ForkJobSubmission


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/9f0e45b2
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/9f0e45b2
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/9f0e45b2

Branch: refs/heads/feature-workload-mgmt
Commit: 9f0e45b255a246981c0f9d1ad13e292b47b3a4a1
Parents: 9a36176
Author: Gourav Shenoy <[email protected]>
Authored: Tue May 2 12:52:22 2017 -0400
Committer: Gourav Shenoy <[email protected]>
Committed: Tue May 2 12:52:22 2017 -0400

----------------------------------------------------------------------
 .../impl/ForkJobSubmissionTask.java             | 42 +++++++++++---------
 .../jobsubmission/utils/JobSubmissionUtils.java |  2 +-
 2 files changed, 25 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/9f0e45b2/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/ForkJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/ForkJobSubmissionTask.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/ForkJobSubmissionTask.java
index 9712de8..c200dfe 100644
--- a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/ForkJobSubmissionTask.java
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/ForkJobSubmissionTask.java
@@ -23,13 +23,6 @@ package org.apache.airavata.worker.task.jobsubmission.impl;
 
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.gfac.core.cluster.JobSubmissionOutput;
-import org.apache.airavata.gfac.core.cluster.RemoteCluster;
-import org.apache.airavata.gfac.core.context.ProcessContext;
-import org.apache.airavata.gfac.core.context.TaskContext;
-import org.apache.airavata.gfac.core.task.JobSubmissionTask;
-import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.Factory;
 import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
 import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.job.JobModel;
@@ -39,6 +32,19 @@ import org.apache.airavata.model.status.TaskState;
 import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.TaskTypes;
 import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.airavata.worker.core.cluster.JobSubmissionOutput;
+import org.apache.airavata.worker.core.cluster.RemoteCluster;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.apache.airavata.worker.core.context.TaskContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.apache.airavata.worker.core.task.TaskException;
+import org.apache.airavata.worker.core.utils.JobManagerConfiguration;
+import org.apache.airavata.worker.core.utils.WorkerUtils;
+import org.apache.airavata.worker.task.jobsubmission.JobSubmissionTask;
+import org.apache.airavata.worker.task.jobsubmission.utils.GroovyMap;
+import 
org.apache.airavata.worker.task.jobsubmission.utils.JobSubmissionFactory;
+import org.apache.airavata.worker.task.jobsubmission.utils.JobSubmissionUtils;
+import org.apache.airavata.worker.task.jobsubmission.utils.Script;
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,15 +69,15 @@ public class ForkJobSubmissionTask implements 
JobSubmissionTask {
             JobModel jobModel = processContext.getJobModel();
             jobModel.setTaskId(taskContext.getTaskId());
             RemoteCluster remoteCluster = 
processContext.getJobSubmissionRemoteCluster();
-            GroovyMap groovyMap = GFacUtils.createGroovyMap(processContext, 
taskContext);
+            GroovyMap groovyMap = 
JobSubmissionUtils.createGroovyMap(processContext, taskContext);
             jobModel.setJobName(groovyMap.get(Script.JOB_NAME).toString());
-            ResourceJobManager resourceJobManager = 
GFacUtils.getResourceJobManager(processContext);
+            ResourceJobManager resourceJobManager = 
JobSubmissionUtils.getResourceJobManager(processContext);
             JobManagerConfiguration jConfig = null;
             if (resourceJobManager != null) {
-                jConfig = 
Factory.getJobManagerConfiguration(resourceJobManager);
+                jConfig = 
JobSubmissionFactory.getJobManagerConfiguration(resourceJobManager);
             }
             JobStatus jobStatus = new JobStatus();
-               File jobFile = GFacUtils.createJobFile(groovyMap, taskContext, 
jConfig);
+               File jobFile = JobSubmissionUtils.createJobFile(groovyMap, 
taskContext, jConfig);
                if (jobFile != null && jobFile.exists()) {
                 
jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
                    JobSubmissionOutput jobSubmissionOutput = 
remoteCluster.submitBatchJob(jobFile.getPath(),
@@ -82,13 +88,13 @@ public class ForkJobSubmissionTask implements 
JobSubmissionTask {
                    String jobId = jobSubmissionOutput.getJobId();
                    if (jobId != null && !jobId.isEmpty()) {
                     jobModel.setJobId(jobId);
-                    GFacUtils.saveJobModel(processContext, jobModel);
+                    JobSubmissionUtils.saveJobModel(processContext, jobModel);
                     jobStatus.setJobState(JobState.SUBMITTED);
                     jobStatus.setReason("Successfully Submitted to " + 
taskContext.getParentProcessContext()
                             .getComputeResourceDescription().getHostName());
                     
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                     jobModel.setJobStatuses(Arrays.asList(jobStatus));
-                    
GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+                    
WorkerUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
                     taskStatus = new TaskStatus(TaskState.COMPLETED);
                     taskStatus.setReason("Submitted job to compute resource");
                 }
@@ -100,13 +106,13 @@ public class ForkJobSubmissionTask implements 
JobSubmissionTask {
                     ErrorModel errorModel = new ErrorModel();
                     errorModel.setActualErrorMessage(msg);
                     
errorModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
-                    GFacUtils.saveExperimentError(processContext, errorModel);
-                    GFacUtils.saveProcessError(processContext, errorModel);
-                    GFacUtils.saveTaskError(taskContext, errorModel);
+                    WorkerUtils.saveExperimentError(processContext, 
errorModel);
+                    WorkerUtils.saveProcessError(processContext, errorModel);
+                    WorkerUtils.saveTaskError(taskContext, errorModel);
                     taskStatus.setState(TaskState.FAILED);
                     taskStatus.setReason("Couldn't find job id in both 
submitted and verified steps");
                 }else {
-                    GFacUtils.saveJobModel(processContext, jobModel);
+                    JobSubmissionUtils.saveJobModel(processContext, jobModel);
                 }
             } else {
                 taskStatus.setState(TaskState.FAILED);
@@ -134,7 +140,7 @@ public class ForkJobSubmissionTask implements 
JobSubmissionTask {
             errorModel.setActualErrorMessage(e.getMessage());
             errorModel.setUserFriendlyMessage(msg);
             
taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
-        } catch (GFacException e) {
+        } catch (WorkerException e) {
             String msg = "Error occurred while submitting the job";
             log.error(msg, e);
             taskStatus.setState(TaskState.FAILED);

http://git-wip-us.apache.org/repos/asf/airavata/blob/9f0e45b2/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java
index 68229d6..b4ebe1b 100644
--- a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java
@@ -281,7 +281,7 @@ public class JobSubmissionUtils {
         }
     }
 
-    private static JobSubmissionInterface 
getPreferredJobSubmissionInterface(ProcessContext processContext) throws 
AppCatalogException {
+    public static JobSubmissionInterface 
getPreferredJobSubmissionInterface(ProcessContext processContext) throws 
AppCatalogException {
         try {
             String resourceHostId = 
processContext.getComputeResourceDescription().getComputeResourceId();
             JobSubmissionProtocol preferredJobSubmissionProtocol = 
processContext.getPreferredJobSubmissionProtocol();

Reply via email to