Repository: airavata Updated Branches: refs/heads/airavata-0.15-release-branch e22277db1 -> df5b46459
To fix job monitoring for blacklight. AIRAVATA-1716 Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/f4d8fb9a Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/f4d8fb9a Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/f4d8fb9a Branch: refs/heads/airavata-0.15-release-branch Commit: f4d8fb9a1096a2c64741961387ed91f43b1ef058 Parents: 6b0f5da Author: raminder <[email protected]> Authored: Mon Jun 8 13:40:44 2015 -0400 Committer: raminder <[email protected]> Committed: Mon Jun 8 13:40:44 2015 -0400 ---------------------------------------------------------------------- .../gfac/ssh/provider/impl/SSHProvider.java | 52 +++++++++++--------- .../gsi/ssh/api/job/PBSOutputParser.java | 3 -- .../gsi/ssh/impl/GSISSHAbstractCluster.java | 9 ++++ 3 files changed, 39 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/f4d8fb9a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java index a9eaae2..27708c7 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java @@ -21,6 +21,18 @@ package org.apache.airavata.gfac.ssh.provider.impl; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Calendar; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; + import org.airavata.appcatalog.cpi.AppCatalogException; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.exception.ApplicationSettingsException; @@ -60,18 +72,14 @@ import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerTy import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; import org.apache.airavata.model.workspace.experiment.CorrectiveAction; import org.apache.airavata.model.workspace.experiment.ErrorCategory; -import org.apache.airavata.model.workspace.experiment.ExperimentState; import org.apache.airavata.model.workspace.experiment.JobDetails; import org.apache.airavata.model.workspace.experiment.JobState; import org.apache.airavata.model.workspace.experiment.TaskState; import org.apache.xmlbeans.XmlException; -import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import sun.reflect.generics.reflectiveObjects.NotImplementedException; -import java.io.*; -import java.util.*; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; /** * Execute application using remote SSH @@ -170,27 +178,27 @@ public class SSHProvider extends AbstractProvider { monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) , GfacExperimentState.JOBSUBMITTED)); jobExecutionContext.setJobDetails(jobDetails); - if (verifyJobSubmissionByJobId(cluster, jobID)) { + try { + boolean verifyJobSubmissionByJobId = verifyJobSubmissionByJobId(cluster, jobID); + if (verifyJobSubmissionByJobId) { + monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) + , GfacExperimentState.JOBSUBMITTED)); + GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.QUEUED, monitorPublisher); + } + } catch (SSHApiException e) { + log.error("Not able to find job " + jobID , e); + } + } else { + jobExecutionContext.setJobDetails(jobDetails); + String verifyJobId = verifyJobSubmission(cluster, jobDetails); + if (verifyJobId != null && !verifyJobId.isEmpty()) { + // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED + jobID = verifyJobId; + jobDetails.setJobID(jobID); monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) , GfacExperimentState.JOBSUBMITTED)); GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.QUEUED, monitorPublisher); } - } else { - jobExecutionContext.setJobDetails(jobDetails); - int verificationTryCount = 0; - while (verificationTryCount++ < 3) { - String verifyJobId = verifyJobSubmission(cluster, jobDetails); - if (verifyJobId != null && !verifyJobId.isEmpty()) { - // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED - jobID = verifyJobId; - jobDetails.setJobID(jobID); - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) - , GfacExperimentState.JOBSUBMITTED)); - GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.QUEUED, monitorPublisher); - break; - } - Thread.sleep(verificationTryCount*1000); - } } if (jobID == null || jobID.isEmpty()) { http://git-wip-us.apache.org/repos/asf/airavata/blob/f4d8fb9a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSOutputParser.java ---------------------------------------------------------------------- diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSOutputParser.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSOutputParser.java index 6bc5cde..81f241d 100644 --- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSOutputParser.java +++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSOutputParser.java @@ -124,9 +124,6 @@ public class PBSOutputParser implements OutputParser { public String parseJobSubmission(String rawOutput) { log.debug(rawOutput); - if(rawOutput.contains(".")){ - rawOutput = rawOutput.substring(0, rawOutput.indexOf(".")); - } return rawOutput; //In PBS stdout is going to be directly the jobID } http://git-wip-us.apache.org/repos/asf/airavata/blob/f4d8fb9a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java ---------------------------------------------------------------------- diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java index f8b46c8..a858ada 100644 --- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java +++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java @@ -242,6 +242,9 @@ public class GSISSHAbstractCluster implements Cluster { log.info("Validation before cancel is failed, couldn't found job in remote host to cancel. Job may be already completed|failed|canceled"); return null; } + if(jobID.contains(".")){ + jobID = jobID.substring(0, jobID.indexOf(".")); + } RawCommandInfo rawCommandInfo = jobManagerConfiguration.getCancelCommand(jobID); StandardOutReader stdOutReader = new StandardOutReader(); @@ -408,6 +411,9 @@ public class GSISSHAbstractCluster implements Cluster { public synchronized JobDescriptor getJobDescriptorById(String jobID) throws SSHApiException { + if(jobID.contains(".")){ + jobID = jobID.substring(0, jobID.indexOf(".")); + } RawCommandInfo rawCommandInfo = jobManagerConfiguration.getMonitorCommand(jobID); StandardOutReader stdOutReader = new StandardOutReader(); log.info("Executing RawCommand : " + rawCommandInfo.getCommand()); @@ -419,6 +425,9 @@ public class GSISSHAbstractCluster implements Cluster { } public synchronized JobStatus getJobStatus(String jobID) throws SSHApiException { + if(jobID.contains(".")){ + jobID = jobID.substring(0, jobID.indexOf(".")); + } RawCommandInfo rawCommandInfo = jobManagerConfiguration.getMonitorCommand(jobID); StandardOutReader stdOutReader = new StandardOutReader(); log.info("Executing RawCommand : " + rawCommandInfo.getCommand());
