Repository: airavata
Updated Branches:
  refs/heads/master 4792eac6e -> 843940fa0


Identify job submission failures using output parsers


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/843940fa
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/843940fa
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/843940fa

Branch: refs/heads/master
Commit: 843940fa0bdde4e5cca397e8acdf38125c92ee16
Parents: 4792eac
Author: Shameera Rathnayaka <[email protected]>
Authored: Fri Jan 8 16:03:26 2016 -0500
Committer: Shameera Rathnayaka <[email protected]>
Committed: Fri Jan 8 16:03:26 2016 -0500

----------------------------------------------------------------------
 .../gfac/core/cluster/JobSubmissionOutput.java  |  18 +++
 .../gfac/core/cluster/OutputParser.java         |   8 ++
 .../airavata/gfac/impl/HPCRemoteCluster.java    |   6 +
 .../gfac/impl/job/ForkOutputParser.java         |   5 +
 .../airavata/gfac/impl/job/LSFOutputParser.java |   5 +
 .../airavata/gfac/impl/job/PBSOutputParser.java |   5 +
 .../gfac/impl/job/SlurmOutputParser.java        |   7 ++
 .../airavata/gfac/impl/job/UGEOutputParser.java |   5 +
 .../impl/task/DefaultJobSubmissionTask.java     | 112 +++++++++++--------
 9 files changed, 123 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobSubmissionOutput.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobSubmissionOutput.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobSubmissionOutput.java
index 6632ab5..d912409 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobSubmissionOutput.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobSubmissionOutput.java
@@ -27,6 +27,8 @@ public class JobSubmissionOutput {
        private String stdErr;
        private String command;
        private String jobId;
+       private boolean isJobSubmissionFailed;
+       private String failureReason;
 
        public int getExitCode() {
                return exitCode;
@@ -67,4 +69,20 @@ public class JobSubmissionOutput {
        public void setJobId(String jobId) {
                this.jobId = jobId;
        }
+
+       public boolean isJobSubmissionFailed() {
+               return isJobSubmissionFailed;
+       }
+
+       public void setJobSubmissionFailed(boolean jobSubmissionFailed) {
+               isJobSubmissionFailed = jobSubmissionFailed;
+       }
+
+       public String getFailureReason() {
+               return failureReason;
+       }
+
+       public void setFailureReason(String failureReason) {
+               this.failureReason = failureReason;
+       }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/OutputParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/OutputParser.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/OutputParser.java
index 521e23f..18de355 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/OutputParser.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/OutputParser.java
@@ -44,6 +44,14 @@ public interface OutputParser {
 
 
     /**
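+     * Parse the raw output returned by the job submission command and identify job submission failures.
+     * @param rawOutput raw (standard) output captured from the job submission command
+     * @return true if the output indicates that the job submission failed, false otherwise.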
+     */
+    public boolean isJobSubmissionFailed(String rawOutput);
+
+
+    /**
      * This can be used to get the job status from the output
      * @param jobID
      * @param rawOutput

http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
index 8c4a4c0..022c8bc 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
@@ -91,6 +91,12 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
                executeCommand(submitCommand, reader);
 //             throwExceptionOnError(reader, submitCommand);
                
jsoutput.setJobId(outputParser.parseJobSubmission(reader.getStdOutputString()));
+               if (jsoutput.getJobId() == null) {
+                       if 
(outputParser.isJobSubmissionFailed(reader.getStdOutputString())) {
+                               jsoutput.setJobSubmissionFailed(true);
+                               
jsoutput.setFailureReason(reader.getStdOutputString());
+                       }
+               }
                jsoutput.setExitCode(reader.getExitCode());
                jsoutput.setStdOut(reader.getStdOutputString());
                jsoutput.setStdErr(reader.getStdErrorString());

http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/ForkOutputParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/ForkOutputParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/ForkOutputParser.java
index 72856e5..b99db30 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/ForkOutputParser.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/ForkOutputParser.java
@@ -44,6 +44,11 @@ public class ForkOutputParser implements OutputParser {
     }
 
     @Override
+    public boolean isJobSubmissionFailed(String rawOutput) {
+        return false;
+    }
+
+    @Override
     public JobStatus parseJobStatus(String jobID, String rawOutput) throws 
SSHApiException {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/LSFOutputParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/LSFOutputParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/LSFOutputParser.java
index b16aa9b..bb0ae46 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/LSFOutputParser.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/LSFOutputParser.java
@@ -52,6 +52,11 @@ public class LSFOutputParser implements OutputParser {
     }
 
     @Override
+    public boolean isJobSubmissionFailed(String rawOutput) {
+        return false;
+    }
+
+    @Override
     public JobStatus parseJobStatus(String jobID, String rawOutput) throws 
SSHApiException {
         boolean jobFount = false;
         logger.debug(rawOutput);

http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/PBSOutputParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/PBSOutputParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/PBSOutputParser.java
index f187724..7f97a68 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/PBSOutputParser.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/PBSOutputParser.java
@@ -135,6 +135,11 @@ public class PBSOutputParser implements OutputParser {
         return jobId;  //In PBS stdout is going to be directly the jobID
     }
 
+    @Override
+    public boolean isJobSubmissionFailed(String rawOutput) {
+        return false;
+    }
+
     public JobStatus parseJobStatus(String jobID, String rawOutput) {
         boolean jobFount = false;
         log.debug(rawOutput);

http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/SlurmOutputParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/SlurmOutputParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/SlurmOutputParser.java
index fecb5e7..dff0a9b 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/SlurmOutputParser.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/SlurmOutputParser.java
@@ -115,6 +115,13 @@ public class SlurmOutputParser implements OutputParser {
            return "";
     }
 
+    @Override
+    public boolean isJobSubmissionFailed(String rawOutput) {
+        Pattern pattern = Pattern.compile("FAILED");
+        Matcher matcher = pattern.matcher(rawOutput);
+        return matcher.find();
+    }
+
     public JobStatus parseJobStatus(String jobID, String rawOutput) throws 
SSHApiException {
         log.info(rawOutput);
         Pattern pattern = Pattern.compile(jobID + 
"(?=\\s+\\S+\\s+\\S+\\s+\\S+\\s+(?<" + STATUS + ">\\w+))");

http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/UGEOutputParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/UGEOutputParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/UGEOutputParser.java
index 0ece2d9..f19d4f7 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/UGEOutputParser.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/job/UGEOutputParser.java
@@ -135,6 +135,11 @@ public class UGEOutputParser implements OutputParser {
                }
        }
 
+    @Override
+    public boolean isJobSubmissionFailed(String rawOutput) {
+        return false;
+    }
+
     public JobStatus parseJobStatus(String jobID, String rawOutput) {
         Pattern pattern = Pattern.compile("job_number:[\\s]+" + jobID);
         Matcher matcher = pattern.matcher(rawOutput);

http://git-wip-us.apache.org/repos/asf/airavata/blob/843940fa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
index 68d3bac..ebdda13 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
@@ -82,40 +82,56 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
                                
jobModel.setStdErr(jobSubmissionOutput.getStdErr());
                                
jobModel.setStdOut(jobSubmissionOutput.getStdOut());
                                String jobId = jobSubmissionOutput.getJobId();
-                               if (exitCode != 0 && jobId == null) {
+                               if (exitCode != 0 || 
jobSubmissionOutput.isJobSubmissionFailed()) {
                                        jobModel.setJobId(DEFAULT_JOB_ID);
-                                       GFacUtils.saveJobModel(processContext, 
jobModel);
-                                       String msg;
-                                       if (exitCode != Integer.MIN_VALUE) {
-                                               msg = "expId:" + 
processContext.getProcessModel().getExperimentId() + ", processId:" +
-                                                               
processContext.getProcessId() + ", taskId: " + taskContext.getTaskId() +
-                                                               " return non 
zero exit code:" + exitCode + "  for JobName:" + jobModel.getJobName() +
-                                                               ", Hence 
changing job state to Failed";
+                                       if 
(jobSubmissionOutput.isJobSubmissionFailed()) {
+                                               jobModel.setJobStatus(new 
JobStatus(JobState.FAILED));
+                                               
jobModel.getJobStatus().setReason(jobSubmissionOutput.getFailureReason());
+                                               log.error("expId: {}, 
processid: {}, taskId: {} :- Job submission failed for job name {}",
+                                                               
taskContext.getExperimentId(), taskContext.getProcessId(), 
taskContext.getTaskId(), jobModel.getJobName());
+                                               ErrorModel errorModel = new 
ErrorModel();
+                                               
errorModel.setUserFriendlyMessage(jobSubmissionOutput.getFailureReason());
+                                               
errorModel.setActualErrorMessage(jobSubmissionOutput.getFailureReason());
+                                               
GFacUtils.saveExperimentError(processContext, errorModel);
+                                               
GFacUtils.saveProcessError(processContext, errorModel);
+                                               
GFacUtils.saveTaskError(taskContext, errorModel);
+                                               
taskStatus.setState(TaskState.FAILED);
+                                               taskStatus.setReason("Job 
submission command exit with non zero exit code");
+                                               
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                                               
taskContext.setTaskStatus(taskStatus);
                                        } else {
-                                               msg = "expId:" + 
processContext.getProcessModel().getExperimentId() + ", processId:" +
-                                                               
processContext.getProcessId() + ", taskId: " + taskContext.getTaskId() +
-                                                               " doesn't  
return valid job submission exit code for JobName:" + jobModel.getJobName() +
-                                                               ", Hence 
changing job state to Failed";
+                                               String msg;
+                                               
GFacUtils.saveJobModel(processContext, jobModel);
+                                               if (exitCode != 
Integer.MIN_VALUE) {
+                                                       msg = "expId:" + 
processContext.getProcessModel().getExperimentId() + ", processId:" +
+                                                                       
processContext.getProcessId() + ", taskId: " + taskContext.getTaskId() +
+                                                                       " 
return non zero exit code:" + exitCode + "  for JobName:" + 
jobModel.getJobName() +
+                                                                       ", 
Hence changing job state to Failed";
+                                               } else {
+                                                       msg = "expId:" + 
processContext.getProcessModel().getExperimentId() + ", processId:" +
+                                                                       
processContext.getProcessId() + ", taskId: " + taskContext.getTaskId() +
+                                                                       " 
doesn't  return valid job submission exit code for JobName:" + 
jobModel.getJobName() +
+                                                                       ", 
Hence changing job state to Failed";
+                                               }
+                                               log.error(msg);
+                                               ErrorModel errorModel = new 
ErrorModel();
+                                               
errorModel.setUserFriendlyMessage(msg);
+                                               
errorModel.setActualErrorMessage(msg);
+                                               
GFacUtils.saveExperimentError(processContext, errorModel);
+                                               
GFacUtils.saveProcessError(processContext, errorModel);
+                                               
GFacUtils.saveTaskError(taskContext, errorModel);
+                                               
taskStatus.setState(TaskState.FAILED);
+                                               taskStatus.setReason("Job 
submission command exit with non zero exit code");
+                                               
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                                               
taskContext.setTaskStatus(taskStatus);
                                        }
-                                       log.error(msg);
-                                       ErrorModel errorModel = new 
ErrorModel();
-                                       errorModel.setUserFriendlyMessage(msg);
-                                       errorModel.setActualErrorMessage(msg);
-                                       
GFacUtils.saveExperimentError(processContext, errorModel);
-                                       
GFacUtils.saveProcessError(processContext, errorModel);
-                                       GFacUtils.saveTaskError(taskContext, 
errorModel);
-                                       taskStatus.setState(TaskState.FAILED);
-                                       taskStatus.setReason("Job submission 
command exit with non zero exit code");
-                                       
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-                                       taskContext.setTaskStatus(taskStatus);
                                        try {
                                                
GFacUtils.saveAndPublishTaskStatus(taskContext);
                                        } catch (GFacException e) {
                                                log.error("Error while saving 
task status", e);
                                        }
                                        return taskStatus;
-                               }
-                           if (jobId != null && !jobId.isEmpty()) {
+                               } else if (jobId != null && !jobId.isEmpty()) {
                                    jobModel.setJobId(jobId);
                                    GFacUtils.saveJobModel(processContext, 
jobModel);
                                    jobStatus.setJobState(JobState.SUBMITTED);
@@ -134,29 +150,29 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
                                    taskStatus = new 
TaskStatus(TaskState.COMPLETED);
                                    taskStatus.setReason("Submitted job to 
compute resource");
                     
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-                           } else {
-                                   int verificationTryCount = 0;
-                                   while (verificationTryCount++ < 3) {
-                                           String verifyJobId = 
verifyJobSubmission(remoteCluster, jobModel);
-                                           if (verifyJobId != null && 
!verifyJobId.isEmpty()) {
-                                                   // JobStatus either changed 
from SUBMITTED to QUEUED or directly to QUEUED
-                                                   jobId = verifyJobId;
-                                                   jobModel.setJobId(jobId);
-                                                   
GFacUtils.saveJobModel(processContext,jobModel);
-                                                   
jobStatus.setJobState(JobState.QUEUED);
-                                                   
jobStatus.setReason("Verification step succeeded");
-                            
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-                                                   
jobModel.setJobStatus(jobStatus);
-                                                   
GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
-                                                   
taskStatus.setState(TaskState.COMPLETED);
-                                                   
taskStatus.setReason("Submitted job to compute resource");
-                            
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-                                                   break;
-                                           }
-                                           log.info("Verify step return 
invalid jobId, retry verification step in {} secs", verificationTryCount * 10);
-                                           Thread.sleep(verificationTryCount * 
10000);
-                                   }
-                           }
+                               } else {
+                                       int verificationTryCount = 0;
+                                       while (verificationTryCount++ < 3) {
+                                               String verifyJobId = 
verifyJobSubmission(remoteCluster, jobModel);
+                                               if (verifyJobId != null && 
!verifyJobId.isEmpty()) {
+                                                       // JobStatus either 
changed from SUBMITTED to QUEUED or directly to QUEUED
+                                                       jobId = verifyJobId;
+                                                       
jobModel.setJobId(jobId);
+                                                       
GFacUtils.saveJobModel(processContext, jobModel);
+                                                       
jobStatus.setJobState(JobState.QUEUED);
+                                                       
jobStatus.setReason("Verification step succeeded");
+                                                       
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                                                       
jobModel.setJobStatus(jobStatus);
+                                                       
GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+                                                       
taskStatus.setState(TaskState.COMPLETED);
+                                                       
taskStatus.setReason("Submitted job to compute resource");
+                                                       
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                                                       break;
+                                               }
+                                               log.info("Verify step return 
invalid jobId, retry verification step in {} secs", verificationTryCount * 10);
+                                               
Thread.sleep(verificationTryCount * 10000);
+                                       }
+                               }
 
                            if (jobId == null || jobId.isEmpty()) {
                                        jobModel.setJobId(DEFAULT_JOB_ID);

Reply via email to