Repository: airavata Updated Branches: refs/heads/master 4a978d4f1 -> badaa732f
Verify the job on the remote resource after submission; if neither the submit command nor the verification lookup returns a job ID, mark the experiment as failed and exit. Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/badaa732 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/badaa732 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/badaa732 Branch: refs/heads/master Commit: badaa732ff527a0a4012fe824cb53db83954d24b Parents: 4a978d4 Author: shamrath <[email protected]> Authored: Mon May 11 16:34:15 2015 -0400 Committer: shamrath <[email protected]> Committed: Mon May 11 16:34:15 2015 -0400 ---------------------------------------------------------------------- .../airavata/gfac/core/utils/GFacUtils.java | 25 +++++++++++++++ .../gfac/ssh/provider/impl/SSHProvider.java | 32 +++++++++++++++++--- .../apache/airavata/gsi/ssh/api/Cluster.java | 8 +++++ .../ssh/api/job/JobManagerConfiguration.java | 2 ++ .../gsi/ssh/api/job/LSFJobConfiguration.java | 5 +++ .../gsi/ssh/api/job/LSFOutputParser.java | 20 ++++++++++++ .../airavata/gsi/ssh/api/job/OutputParser.java | 9 ++++++ .../gsi/ssh/api/job/PBSJobConfiguration.java | 6 ++++ .../gsi/ssh/api/job/PBSOutputParser.java | 20 ++++++++++++ .../gsi/ssh/api/job/SGEOutputParser.java | 6 ++++ .../gsi/ssh/api/job/SlurmJobConfiguration.java | 5 +++ .../gsi/ssh/api/job/SlurmOutputParser.java | 20 ++++++++++++ .../gsi/ssh/impl/GSISSHAbstractCluster.java | 10 ++++++ 13 files changed, 163 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/badaa732/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java index 
1e9e212..5bcd75c 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java @@ -41,9 +41,12 @@ import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; import org.apache.airavata.model.appcatalog.computeresource.*; import org.apache.airavata.model.workspace.experiment.*; +import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; import org.apache.airavata.registry.cpi.ChildDataType; import org.apache.airavata.registry.cpi.CompositeIdentifier; import org.apache.airavata.registry.cpi.Registry; +import org.apache.airavata.registry.cpi.RegistryException; +import org.apache.airavata.registry.cpi.RegistryModelType; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; @@ -1465,4 +1468,26 @@ public class GFacUtils { buffer.flip();//need flip return buffer.getLong(); } + + public static ExperimentState updateExperimentStatus(String experimentId, ExperimentState state) throws RegistryException { + Registry airavataRegistry = RegistryFactory.getDefaultRegistry(); + Experiment details = (Experiment) airavataRegistry.get(RegistryModelType.EXPERIMENT, experimentId); + if (details == null) { + details = new Experiment(); + details.setExperimentID(experimentId); + } + org.apache.airavata.model.workspace.experiment.ExperimentStatus status = new org.apache.airavata.model.workspace.experiment.ExperimentStatus(); + status.setExperimentState(state); + status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); + if (!ExperimentState.CANCELED.equals(details.getExperimentStatus().getExperimentState()) && + !ExperimentState.CANCELING.equals(details.getExperimentStatus().getExperimentState())) { + status.setExperimentState(state); + } else { + 
status.setExperimentState(details.getExperimentStatus().getExperimentState()); + } + details.setExperimentStatus(status); + log.info("Updating the experiment status of experiment: " + experimentId + " to " + status.getExperimentState().toString()); + airavataRegistry.update(RegistryModelType.EXPERIMENT_STATUS, status, experimentId); + return details.getExperimentStatus().getExperimentState(); + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/badaa732/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java index 0f88327..2a3287b 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java @@ -56,6 +56,7 @@ import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerTy import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; import org.apache.airavata.model.workspace.experiment.CorrectiveAction; import org.apache.airavata.model.workspace.experiment.ErrorCategory; +import org.apache.airavata.model.workspace.experiment.ExperimentState; import org.apache.airavata.model.workspace.experiment.JobDetails; import org.apache.airavata.model.workspace.experiment.JobState; import org.apache.xmlbeans.XmlException; @@ -167,14 +168,24 @@ public class SSHProvider extends AbstractProvider { jobDetails.setJobDescription(jobDescriptor.toXML()); String jobID = cluster.submitBatchJob(jobDescriptor); + if (jobID != null) { + GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED); + } jobExecutionContext.setJobDetails(jobDetails); + String verifyJobId = 
verifyJobSubmission(cluster, jobDetails); + if (verifyJobId != null) { + // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED + GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.QUEUED); + if (jobID == null) { + jobID = verifyJobId; + } + } if (jobID == null) { - jobDetails.setJobID("none"); - GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED); - } else { - jobDetails.setJobID(jobID); - GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED); + log.error("Couldn't find remote jobId for JobName:" + jobDetails.getJobName() + ", ExperimentId:" + jobExecutionContext.getExperimentID()); + GFacUtils.updateExperimentStatus(jobExecutionContext.getExperimentID(), ExperimentState.FAILED); + return; } + jobDetails.setJobID(jobID); data.append("jobDesc=").append(jobDescriptor.toXML()); data.append(",jobId=").append(jobDetails.getJobID()); delegateToMonitorHandlers(jobExecutionContext); @@ -203,6 +214,17 @@ public class SSHProvider extends AbstractProvider { } } + private String verifyJobSubmission(Cluster cluster, JobDetails jobDetails) { + String jobName = jobDetails.getJobName(); + String jobId = null; + try { + jobId = cluster.getJobIdByJobName(jobName, cluster.getServerInfo().getUserName()); + } catch (SSHApiException e) { + log.error("Error while verifying JobId from JobName"); + } + return jobId; + } + public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException { } http://git-wip-us.apache.org/repos/asf/airavata/blob/badaa732/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java ---------------------------------------------------------------------- diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java index ed4b3b4..34e3b94 100644 --- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java +++ 
b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java @@ -113,6 +113,14 @@ public interface Cluster { * @throws SSHApiException throws exception during error */ public JobStatus getJobStatus(String jobID) throws SSHApiException; + /** + * This will get the job status of the the job associated with this jobId + * + * @param jobName jobName of the job user want to get the status + * @return jobId of the given jobName + * @throws SSHApiException throws exception during error + */ + public String getJobIdByJobName(String jobName, String userName) throws SSHApiException; /** * This method can be used to poll the jobstatuses based on the given http://git-wip-us.apache.org/repos/asf/airavata/blob/badaa732/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobManagerConfiguration.java ---------------------------------------------------------------------- diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobManagerConfiguration.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobManagerConfiguration.java index 85a843e..d9b6b1c 100644 --- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobManagerConfiguration.java +++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobManagerConfiguration.java @@ -32,6 +32,8 @@ public interface JobManagerConfiguration { public RawCommandInfo getUserBasedMonitorCommand(String userName); + public RawCommandInfo getJobIdMonitorCommand(String jobName , String userName); + public String getScriptExtension(); public RawCommandInfo getSubmitCommand(String workingDirectory, String pbsFilePath); http://git-wip-us.apache.org/repos/asf/airavata/blob/badaa732/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/LSFJobConfiguration.java ---------------------------------------------------------------------- diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/LSFJobConfiguration.java 
b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/LSFJobConfiguration.java index 46fe9ad..740c9ac 100644 --- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/LSFJobConfiguration.java +++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/LSFJobConfiguration.java @@ -74,6 +74,11 @@ public class LSFJobConfiguration implements JobManagerConfiguration { } @Override + public RawCommandInfo getJobIdMonitorCommand(String jobName, String userName) { + return new RawCommandInfo(this.installedPath + "bjobs -J " + jobName); + } + + @Override public String getScriptExtension() { return scriptExtension; } http://git-wip-us.apache.org/repos/asf/airavata/blob/badaa732/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/LSFOutputParser.java ---------------------------------------------------------------------- diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/LSFOutputParser.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/LSFOutputParser.java index 71c3339..a621ae0 100644 --- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/LSFOutputParser.java +++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/LSFOutputParser.java @@ -29,6 +29,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; public class LSFOutputParser implements OutputParser { private final static Logger logger = LoggerFactory.getLogger(LSFOutputParser.class); @@ -90,6 +92,24 @@ public class LSFOutputParser implements OutputParser { } } + @Override + public String parseJobId(String jobName, String rawOutput) throws SSHApiException { + String regJobId = "jobId"; + Pattern pattern = Pattern.compile("(?=(?<" + regJobId + ">\\d+)\\s+\\w+\\s+" + jobName + ")"); // regex - look ahead and match + if (rawOutput != null) { + Matcher matcher = pattern.matcher(rawOutput); + if 
(matcher.find()) { + return matcher.group(regJobId); + } else { + logger.error("No match is found for JobName"); + return null; + } + } else { + logger.error("Error: RawOutput shouldn't be null"); + return null; + } + } + public static void main(String[] args) { String test = "Job <2477982> is submitted to queue <short>."; System.out.println(test.substring(test.indexOf("<")+1, test.indexOf(">"))); http://git-wip-us.apache.org/repos/asf/airavata/blob/badaa732/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/OutputParser.java ---------------------------------------------------------------------- diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/OutputParser.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/OutputParser.java index b6f5f0a..fd37b6a 100644 --- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/OutputParser.java +++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/OutputParser.java @@ -56,4 +56,13 @@ public interface OutputParser { * @param rawOutput */ public void parseJobStatuses(String userName, Map<String, JobStatus> statusMap, String rawOutput)throws SSHApiException; + + /** + * filter the jobId value of given JobName from rawOutput + * @param jobName + * @param rawOutput + * @return + * @throws SSHApiException + */ + public String parseJobId(String jobName, String rawOutput) throws SSHApiException; } http://git-wip-us.apache.org/repos/asf/airavata/blob/badaa732/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSJobConfiguration.java ---------------------------------------------------------------------- diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSJobConfiguration.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSJobConfiguration.java index e935dfb..b658b16 100644 --- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSJobConfiguration.java +++ 
b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSJobConfiguration.java @@ -97,6 +97,12 @@ public class PBSJobConfiguration implements JobManagerConfiguration { } @Override + public RawCommandInfo getJobIdMonitorCommand(String jobName, String userName) { + // For PBS there is no option to get jobDetails by JobName, so we search with userName + return new RawCommandInfo(this.installedPath + "qstat -u " + userName); + } + + @Override public String getBaseCancelCommand() { return "qdel"; } http://git-wip-us.apache.org/repos/asf/airavata/blob/badaa732/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSOutputParser.java ---------------------------------------------------------------------- diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSOutputParser.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSOutputParser.java index 8f2a606..3304465 100644 --- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSOutputParser.java +++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSOutputParser.java @@ -30,6 +30,8 @@ import javax.validation.constraints.Null; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; public class PBSOutputParser implements OutputParser { private static final Logger log = LoggerFactory.getLogger(PBSOutputParser.class); @@ -190,5 +192,23 @@ public class PBSOutputParser implements OutputParser { } } + @Override + public String parseJobId(String jobName, String rawOutput) throws SSHApiException { + String regJobId = "jobId"; + Pattern pattern = Pattern.compile("(?=(?<" + regJobId + ">\\d+)\\s+\\w+\\s+" + jobName + ")"); // regex - look ahead and match + if (rawOutput != null) { + Matcher matcher = pattern.matcher(rawOutput); + if (matcher.find()) { + return matcher.group(regJobId); + } else { + log.error("No match is found for JobName"); + return null; + 
} + } else { + log.error("Error: RawOutput shouldn't be null"); + return null; + } + } + } http://git-wip-us.apache.org/repos/asf/airavata/blob/badaa732/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SGEOutputParser.java ---------------------------------------------------------------------- diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SGEOutputParser.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SGEOutputParser.java index 3fb5874..884b7f3 100644 --- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SGEOutputParser.java +++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SGEOutputParser.java @@ -20,6 +20,7 @@ */ package org.apache.airavata.gsi.ssh.api.job; +import org.apache.airavata.gsi.ssh.api.SSHApiException; import org.apache.airavata.gsi.ssh.impl.JobStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -163,5 +164,10 @@ public class SGEOutputParser implements OutputParser{ } } + @Override + public String parseJobId(String jobName, String rawOutput) throws SSHApiException { + return null; // TODO: Implement the parse logic ( with regex if possible ). 
+ } + } http://git-wip-us.apache.org/repos/asf/airavata/blob/badaa732/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmJobConfiguration.java ---------------------------------------------------------------------- diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmJobConfiguration.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmJobConfiguration.java index 807ac42..a74b35d 100644 --- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmJobConfiguration.java +++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmJobConfiguration.java @@ -96,6 +96,11 @@ public class SlurmJobConfiguration implements JobManagerConfiguration{ } @Override + public RawCommandInfo getJobIdMonitorCommand(String jobName, String userName) { + return new RawCommandInfo(this.installedPath + "squeue -n " + jobName); + } + + @Override public String getBaseCancelCommand() { return "scancel"; } http://git-wip-us.apache.org/repos/asf/airavata/blob/badaa732/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java ---------------------------------------------------------------------- diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java index 44a1068..64691dc 100644 --- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java +++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java @@ -29,6 +29,8 @@ import javax.print.attribute.standard.JobState; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; public class SlurmOutputParser implements OutputParser { private static final Logger log = LoggerFactory.getLogger(SlurmOutputParser.class); @@ -187,4 +189,22 @@ public class 
SlurmOutputParser implements OutputParser { } } } + + @Override + public String parseJobId(String jobName, String rawOutput) throws SSHApiException { + String regJobId = "jobId"; + Pattern pattern = Pattern.compile("(?=(?<" + regJobId + ">\\d+)\\s+\\w+\\s+" + jobName + ")"); // regex - look ahead and match + if (rawOutput != null) { + Matcher matcher = pattern.matcher(rawOutput); + if (matcher.find()) { + return matcher.group(regJobId); + } else { + log.error("No match is found for JobName"); + return null; + } + } else { + log.error("Error: RawOutput shouldn't be null"); + return null; + } + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/badaa732/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java ---------------------------------------------------------------------- diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java index 3d18013..9357706 100644 --- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java +++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java @@ -425,6 +425,16 @@ public class GSISSHAbstractCluster implements Cluster { return jobManagerConfiguration.getParser().parseJobStatus(jobID, result); } + @Override + public String getJobIdByJobName(String jobName, String userName) throws SSHApiException { + RawCommandInfo rawCommandInfo = jobManagerConfiguration.getJobIdMonitorCommand(jobName, userName); + StandardOutReader stdOutReader = new StandardOutReader(); + CommandExecutor.executeCommand(rawCommandInfo, this.getSession(), stdOutReader); + String result = getOutputifAvailable(stdOutReader, "Error getting job information from the resource !", + jobManagerConfiguration.getJobIdMonitorCommand(jobName,userName).getCommand()); + return jobManagerConfiguration.getParser().parseJobId(jobName, result); + } + 
private static void logDebug(String message) { if (log.isDebugEnabled()) { log.debug(message);
