Repository: airavata Updated Branches: refs/heads/master b02308493 -> 34a8147e4
fixing monitoring issue with async submission Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/34a8147e Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/34a8147e Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/34a8147e Branch: refs/heads/master Commit: 34a8147e4a6980d6fbbc59f7a8440d6797da9e1a Parents: b023084 Author: lahiru <[email protected]> Authored: Wed Apr 23 16:47:21 2014 -0400 Committer: lahiru <[email protected]> Committed: Wed Apr 23 16:47:21 2014 -0400 ---------------------------------------------------------------------- modules/distribution/server/pom.xml | 5 + .../server/src/main/assembly/bin-assembly.xml | 1 + .../java/org/apache/airavata/gfac/cpi/GFac.java | 5 +- .../org/apache/airavata/gfac/cpi/GFacImpl.java | 34 +-- .../apache/airavata/gfac/monitor/MonitorID.java | 13 +- .../airavata/gfac/handler/SCPOutputHandler.java | 98 +++++--- .../core/impl/EmbeddedGFACJobSubmitter.java | 8 +- .../orchestrator/core/job/JobSubmitter.java | 10 +- .../cpi/impl/SimpleOrchestratorImpl.java | 17 +- .../apache/airavata/job/monitor/MonitorID.java | 221 ------------------- tools/pom.xml | 1 - 11 files changed, 121 insertions(+), 292 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/distribution/server/pom.xml ---------------------------------------------------------------------- diff --git a/modules/distribution/server/pom.xml b/modules/distribution/server/pom.xml index 0ff7f4e..ad6d07c 100644 --- a/modules/distribution/server/pom.xml +++ b/modules/distribution/server/pom.xml @@ -286,6 +286,11 @@ </dependency> <dependency> <groupId>org.apache.airavata</groupId> + <artifactId>airavata-gfac-ssh</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.airavata</groupId> <artifactId>airavata-gfac-core</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/distribution/server/src/main/assembly/bin-assembly.xml ---------------------------------------------------------------------- diff --git a/modules/distribution/server/src/main/assembly/bin-assembly.xml b/modules/distribution/server/src/main/assembly/bin-assembly.xml index 05f4475..4a76cd2 100644 --- a/modules/distribution/server/src/main/assembly/bin-assembly.xml +++ b/modules/distribution/server/src/main/assembly/bin-assembly.xml @@ -196,6 +196,7 @@ <include>org.apache.airavata:airavata-data-models:jar</include> <include>org.apache.airavata:airavata-credential-store:jar</include> <include>org.apache.airavata:airavata-gfac-core:jar</include> + <include>org.apache.airavata:airavata-gfac-ssh:jar</include> <include>org.apache.airavata:airavata-client-api:jar</include> <include>org.apache.airavata:airavata-message-monitor:jar</include> <include>org.apache.airavata:airavata-workflow-model-core:jar</include> http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFac.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFac.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFac.java index d18df07..fc5fd19 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFac.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFac.java @@ -41,10 +41,9 @@ public interface GFac { /** * This method has to be invoked after submitting the job and have to make sure job is properly finished - * @param experimentID - * @param taskID + * @param jobExecutionContext * @throws GFacException */ - public void invokeOutFlowHandlers(String experimentID,String taskID) throws GFacException; + public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException; } http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java index 14ea519..6312292 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java @@ -309,30 +309,20 @@ public class GFacImpl implements GFac, AbstractActivityListener { } } - public void invokeOutFlowHandlers(String experimentID,String taskID) throws GFacException { - JobExecutionContext jobExecutionContext = null; - try { - jobExecutionContext = createJEC(experimentID, taskID); - Scheduler.schedule(jobExecutionContext); - ApplicationDescription applicationDeploymentDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription(); - TaskDetails taskData = (TaskDetails) registry.get(DataType.TASK_DETAIL, taskID); - JobDetails jobDetails = taskData.getJobDetailsList().get(0); - String jobDescription = jobDetails.getJobDescription(); - if(jobDescription != null) { - JobDescriptor jobDescriptor = JobDescriptor.fromXML(jobDescription); - applicationDeploymentDescription.getType().setScratchWorkingDirectory( - jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getWorkingDirectory()); - applicationDeploymentDescription.getType().setInputDataDirectory(jobDescriptor.getInputDirectory()); - applicationDeploymentDescription.getType().setOutputDataDirectory(jobDescriptor.getOutputDirectory()); - applicationDeploymentDescription.getType().setStandardError(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardErrorFile()); - applicationDeploymentDescription.getType().setStandardOutput(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardOutFile()); - } + public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException { + GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration(); + List<GFacHandlerConfig> handlers = null; + if(gFacConfiguration != null){ + handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers(); + }else { + try { + jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID()); } catch (Exception e) { - throw new GFacException(e); + log.error("Error constructing job execution context during outhandler invocation"); + throw new GFacException(e); + } + schedule(jobExecutionContext); } - - List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers(); - for (GFacHandlerConfig handlerClassName : handlers) { Class<? extends GFacHandler> handlerClass; GFacHandler handler; http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java index c022bef..a01dcba 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java @@ -21,6 +21,7 @@ package org.apache.airavata.gfac.monitor; import org.apache.airavata.commons.gfac.type.HostDescription; +import org.apache.airavata.gfac.context.JobExecutionContext; import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo; import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo; import org.apache.airavata.model.workspace.experiment.JobState; @@ -62,6 +63,8 @@ public class MonitorID { private JobState state; + private JobExecutionContext jobExecutionContext; + public MonitorID(HostDescription host, String jobID,String taskID, String workflowNodeID, String experimentID, String userName) { this.host = host; this.jobStartedTime = new Timestamp((new Date()).getTime()); @@ -215,7 +218,15 @@ public class MonitorID { this.workflowNodeID = workflowNodeID; } -// public String getWorkflowNodeID() { + public JobExecutionContext getJobExecutionContext() { + return jobExecutionContext; + } + + public void setJobExecutionContext(JobExecutionContext jobExecutionContext) { + this.jobExecutionContext = jobExecutionContext; + } + + // public String getWorkflowNodeID() { // return workflowNodeID; // } // http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java index 0b6619e..789d188 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java @@ -33,7 +33,9 @@ import net.schmizz.sshj.transport.TransportException; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.commons.gfac.type.ActualParameter; +import org.apache.airavata.commons.gfac.type.ApplicationDescription; import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.Scheduler; import org.apache.airavata.gfac.context.JobExecutionContext; import org.apache.airavata.gfac.context.security.GSISecurityContext; import org.apache.airavata.gfac.context.security.SSHSecurityContext; @@ -42,10 +44,13 @@ import org.apache.airavata.gfac.util.GFACSSHUtils; import org.apache.airavata.gfac.utils.GFacUtils; import org.apache.airavata.gfac.utils.OutputUtils; import org.apache.airavata.gsi.ssh.api.Cluster; +import org.apache.airavata.gsi.ssh.api.job.JobDescriptor; import org.apache.airavata.gsi.ssh.util.SSHUtils; import org.apache.airavata.model.workspace.experiment.*; import org.apache.airavata.persistance.registry.jpa.model.DataTransferDetail; import org.apache.airavata.registry.cpi.ChildDataType; +import org.apache.airavata.registry.cpi.DataType; +import org.apache.airavata.registry.cpi.RegistryException; import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType; import org.apache.airavata.schemas.gfac.URIParameterType; import org.apache.xmlbeans.XmlException; @@ -56,7 +61,36 @@ public class SCPOutputHandler extends AbstractHandler{ private static final Logger log = LoggerFactory.getLogger(SCPOutputHandler.class); public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException, GFacException { - if(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null){ + if(jobExecutionContext.getTaskData().getJobDetailsListSize() == 0) { // this is because we don't have the right jobexecution context + // so attempting to get it from the registry + log.warn("During the out handler chain jobExecution context came null, so trying to handler"); + ApplicationDescription applicationDeploymentDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription(); + TaskDetails taskData = null; + try { + taskData = (TaskDetails) registry.get(DataType.TASK_DETAIL, jobExecutionContext.getTaskData().getTaskID()); + } catch (RegistryException e) { + log.error("Error retrieving job details from Registry"); + throw new GFacHandlerException("Error retrieving job details from Registry", e); + } + JobDetails jobDetails = taskData.getJobDetailsList().get(0); + String jobDescription = jobDetails.getJobDescription(); + if (jobDescription != null) { + JobDescriptor jobDescriptor = null; + try { + jobDescriptor = JobDescriptor.fromXML(jobDescription); + } catch (XmlException e1) { + e1.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + applicationDeploymentDescription.getType().setScratchWorkingDirectory( + jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getWorkingDirectory()); + applicationDeploymentDescription.getType().setInputDataDirectory(jobDescriptor.getInputDirectory()); + applicationDeploymentDescription.getType().setOutputDataDirectory(jobDescriptor.getOutputDirectory()); + applicationDeploymentDescription.getType().setStandardError(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardErrorFile()); + applicationDeploymentDescription.getType().setStandardOutput(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardOutFile()); + } + } + + if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) { try { GFACSSHUtils.addSecurityContext(jobExecutionContext); } catch (ApplicationSettingsException e) { @@ -64,11 +98,11 @@ public class SCPOutputHandler extends AbstractHandler{ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); } } - super.invoke(jobExecutionContext); + super.invoke(jobExecutionContext); DataTransferDetails detail = new DataTransferDetails(); TransferStatus status = new TransferStatus(); - ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext() + ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext() .getApplicationDeploymentDescription().getType(); try { Cluster cluster = null; @@ -94,10 +128,10 @@ public class SCPOutputHandler extends AbstractHandler{ if (taskData.getAdvancedOutputDataHandling() != null) { outputDataDir = taskData.getAdvancedOutputDataHandling().getOutputDataDir(); } - if(outputDataDir == null) { + if (outputDataDir == null) { outputDataDir = File.separator + "tmp"; } - outputDataDir = outputDataDir + File.separator + jobExecutionContext.getExperimentID() + "-" +jobExecutionContext.getTaskData().getTaskID(); + outputDataDir = outputDataDir + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID(); (new File(outputDataDir)).mkdirs(); @@ -114,35 +148,35 @@ public class SCPOutputHandler extends AbstractHandler{ status.setTransferState(TransferState.COMPLETE); detail.setTransferStatus(status); detail.setTransferDescription("STDOUT:" + stdOutStr); - registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID()); + registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); status.setTransferState(TransferState.COMPLETE); detail.setTransferStatus(status); detail.setTransferDescription("STDERR:" + stdErrStr); - registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID()); + registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); Map<String, ActualParameter> stringMap = new HashMap<String, ActualParameter>(); Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters(); Set<String> keys = output.keySet(); for (String paramName : keys) { - ActualParameter actualParameter = (ActualParameter) output.get(paramName); - if ("URI".equals(actualParameter.getType().getType().toString())) { - - List<String> outputList = cluster.listDirectory(app.getOutputDataDirectory()); - if (outputList.size() == 0 || outputList.get(0).isEmpty()) { - stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr); + ActualParameter actualParameter = (ActualParameter) output.get(paramName); + if ("URI".equals(actualParameter.getType().getType().toString())) { + + List<String> outputList = cluster.listDirectory(app.getOutputDataDirectory()); + if (outputList.size() == 0 || outputList.get(0).isEmpty()) { + stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr); + } else { + String valueList = outputList.get(0); + cluster.scpFrom(app.getOutputDataDirectory() + File.separator + valueList, outputDataDir); + jobExecutionContext.addOutputFile(outputDataDir + File.separator + valueList); + ((URIParameterType) actualParameter.getType()).setValue(valueList); + stringMap = new HashMap<String, ActualParameter>(); + stringMap.put(paramName, actualParameter); + } } else { - String valueList = outputList.get(0); - cluster.scpFrom(app.getOutputDataDirectory() + File.separator + valueList, outputDataDir); - jobExecutionContext.addOutputFile(outputDataDir + File.separator + valueList); - ((URIParameterType) actualParameter.getType()).setValue(valueList); - stringMap = new HashMap<String, ActualParameter>(); - stringMap.put(paramName, actualParameter); - } - }else{ - stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr); - } + stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr); + } } if (stringMap == null || stringMap.isEmpty()) { throw new GFacHandlerException( @@ -151,7 +185,7 @@ public class SCPOutputHandler extends AbstractHandler{ } status.setTransferState(TransferState.DOWNLOAD); detail.setTransferStatus(status); - registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID()); + registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); app.setStandardError(localStdErrFile.getAbsolutePath()); app.setStandardOutput(localStdOutFile.getAbsolutePath()); @@ -165,14 +199,14 @@ public class SCPOutputHandler extends AbstractHandler{ } catch (IOException e) { throw new GFacHandlerException(e.getMessage(), e); } catch (Exception e) { - try { - status.setTransferState(TransferState.FAILED); - detail.setTransferStatus(status); - registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID()); - GFacUtils.saveErrorDetails(e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE, jobExecutionContext.getTaskData().getTaskID()); - } catch (Exception e1) { - throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage()); - } + try { + status.setTransferState(TransferState.FAILED); + detail.setTransferStatus(status); + registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); + GFacUtils.saveErrorDetails(e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE, jobExecutionContext.getTaskData().getTaskID()); + } catch (Exception e1) { + throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage()); + } throw new GFacHandlerException("Error in retrieving results", e); } http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java index 3341d9b..53b9206 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java @@ -58,7 +58,7 @@ public class EmbeddedGFACJobSubmitter implements JobSubmitter { } - public String submit(String experimentID, String taskID) throws OrchestratorException { + public JobExecutionContext submit(String experimentID, String taskID) throws OrchestratorException { JobExecutionContext jobExecutionContext; try { jobExecutionContext = gfac.submitJob(experimentID, taskID); @@ -67,7 +67,7 @@ public class EmbeddedGFACJobSubmitter implements JobSubmitter { logger.error(error); throw new OrchestratorException(error); } - return jobExecutionContext.getJobDetails().getJobID(); + return jobExecutionContext; } public GFac getGfac() { @@ -86,9 +86,9 @@ public class EmbeddedGFACJobSubmitter implements JobSubmitter { this.orchestratorContext = orchestratorContext; } - public void runAfterJobTask(String experimentID, String taskID) throws OrchestratorException { + public void runAfterJobTask(JobExecutionContext jobExecutionContext) throws OrchestratorException { try { - gfac.invokeOutFlowHandlers(experimentID,taskID); + gfac.invokeOutFlowHandlers(jobExecutionContext); } catch (GFacException e) { throw new OrchestratorException(e); } http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java index 1c3c934..cf8642f 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java @@ -22,6 +22,7 @@ package org.apache.airavata.orchestrator.core.job; import java.util.List; +import org.apache.airavata.gfac.context.JobExecutionContext; import org.apache.airavata.orchestrator.core.context.OrchestratorContext; import org.apache.airavata.orchestrator.core.exception.OrchestratorException; import org.apache.airavata.orchestrator.core.gfac.GFACInstance; @@ -50,15 +51,14 @@ public interface JobSubmitter { * just get the request data and do the submission * @param experimentID experimentID cannot be null * @param taskID taskID cannot be null - * @return jobID return the jobID from GFac + * @return JobExecutionContext return the jobExecutionContext from GFac */ - String submit(String experimentID, String taskID) throws OrchestratorException; + JobExecutionContext submit(String experimentID, String taskID) throws OrchestratorException; /** * This can be use to handle any after Jobsubmission task - * @param experimentID - * @param taskID + * @param jobExecutionContext * @throws OrchestratorException */ - void runAfterJobTask(String experimentID,String taskID) throws OrchestratorException; + void runAfterJobTask(JobExecutionContext jobExecutionContext) throws OrchestratorException; } http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java index fa3b5f3..ca55169 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java @@ -29,6 +29,7 @@ import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.Constants; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.commons.gfac.type.HostDescription; +import org.apache.airavata.gfac.context.JobExecutionContext; import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo; import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo; import org.apache.airavata.gfac.monitor.AbstractActivityListener; @@ -121,7 +122,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator implements Abst Constants.PUSH.equals(((GsisshHostType) hostDescription).getMonitorMode())) { MonitorID monitorID = new MonitorID(hostDescription, null, taskId, workflowNodeId, experimentId, userName); monitorManager.addAJobToMonitor(monitorID); - jobSubmitter.submit(experimentId, taskId); + JobExecutionContext jobExecutionContext = jobSubmitter.submit(experimentId, taskId); if ("none".equals(jobID)) { logger.error("Job submission Failed, so we remove the job from monitoring"); @@ -132,9 +133,10 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator implements Abst // Launching job for each task // if the monitoring is pull mode then we add the monitorID for each task after submitting // the job with the jobID, otherwise we don't need the jobID - jobSubmitter.submit(experimentId, taskId); + JobExecutionContext jobExecutionContext = jobSubmitter.submit(experimentId, taskId); logger.info("Job Launched to the resource by GFAC and jobID returned : " + jobID); MonitorID monitorID = new MonitorID(hostDescription, jobID, taskId, workflowNodeId, experimentId, userName, authenticationInfo); + monitorID.setJobExecutionContext(jobExecutionContext); if ("none".equals(jobID)) { logger.error("Job submission Failed, so we remove the job from monitoring"); @@ -187,7 +189,16 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator implements Abst public void handlePostExperimentTask(JobStatusChangeRequest status) throws OrchestratorException { if(status.getState() == JobState.COMPLETE){ MonitorID monitorID = status.getMonitorID(); - jobSubmitter.runAfterJobTask(monitorID.getExperimentID(), monitorID.getTaskID()); + if(monitorID.getJobExecutionContext() == null){ + // this code is to handle amqp scenario where monitorID doesn't have + // job execution context, in this case it will be created by the outputhandler + String experimentID = monitorID.getExperimentID(); + String taskID = monitorID.getTaskID(); + JobExecutionContext jobExecutionContext = new JobExecutionContext(null, null); + jobExecutionContext.setExperimentID(experimentID); + jobExecutionContext.setTaskData(new TaskDetails(taskID)); + } + jobSubmitter.runAfterJobTask(monitorID.getJobExecutionContext()); } } public ExecutorService getExecutor() { http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java ---------------------------------------------------------------------- diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java deleted file mode 100644 index ef03fbc..0000000 --- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java +++ /dev/null @@ -1,221 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.airavata.job.monitor; - -import java.sql.Timestamp; -import java.util.Date; -import java.util.Map; - -import org.apache.airavata.commons.gfac.type.HostDescription; -import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo; -import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo; -import org.apache.airavata.model.workspace.experiment.JobState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/* -This is the object which contains the data to identify a particular -Job to start the monitoring -*/ -public class MonitorID { - private final static Logger logger = LoggerFactory.getLogger(MonitorID.class); - - private String userName; - - private Timestamp jobStartedTime; - - private Timestamp lastMonitored; - - private HostDescription host; - - private AuthenticationInfo authenticationInfo = null; - - private Map<String, Object> parameters; - - private String experimentID; - - private String workflowNodeID; - - private String taskID; - - private String jobID; - - private int failedCount = 0; - - private JobState state; - - public MonitorID(HostDescription host, String jobID,String taskID, String workflowNodeID, String experimentID, String userName) { - this(host, jobID, taskID, workflowNodeID, experimentID, userName, null); - } - - public MonitorID(HostDescription host, String jobID,String taskID, String workflowNodeID, String experimentID, String userName,AuthenticationInfo authenticationInfo) { - this.host = host; - this.jobStartedTime = new Timestamp((new Date()).getTime()); - this.authenticationInfo = authenticationInfo; - this.userName = userName; - // if we give myproxyauthenticationInfo, so we try to use myproxy user as the user - if(this.authenticationInfo != null){ - if(this.authenticationInfo instanceof MyProxyAuthenticationInfo){ - this.userName = ((MyProxyAuthenticationInfo)this.authenticationInfo).getUserName(); - } - } - this.jobID = jobID; - this.taskID = taskID; - this.workflowNodeID=workflowNodeID; - this.experimentID = experimentID; - } - public HostDescription getHost() { - return host; - } - - public void setHost(HostDescription host) { - this.host = host; - } - - public Timestamp getLastMonitored() { - return lastMonitored; - } - - public void setLastMonitored(Timestamp lastMonitored) { - this.lastMonitored = lastMonitored; - } - - public String getUserName() { - return userName; - } - - public void setUserName(String userName) { - this.userName = userName; - } - - public String getJobID() { - return jobID; - } - - public void setJobID(String jobID) { - this.jobID = jobID; - } - - public Timestamp getJobStartedTime() { - return jobStartedTime; - } - - public void setJobStartedTime(Timestamp jobStartedTime) { - this.jobStartedTime = jobStartedTime; - } - - public AuthenticationInfo getAuthenticationInfo() { - return authenticationInfo; - } - - public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) { - this.authenticationInfo = authenticationInfo; - } - - public void addParameter(String key,Object value) { - this.parameters.put(key, value); - } - - public Object getParameter(String key) { - return this.parameters.get(key); - } - - public Map<String, Object> getParameters() { - return parameters; - } - - public void setParameters(Map<String, Object> parameters) { - this.parameters = parameters; - } - - public String getExperimentID() { - return experimentID; - } - - public void setExperimentID(String experimentID) { - this.experimentID = experimentID; - } - - public String getTaskID() { - return taskID; - } - - public void setTaskID(String taskID) { - this.taskID = taskID; - } - - public int getFailedCount() { - return failedCount; - } - - public void setFailedCount(int failedCount) { - this.failedCount = failedCount; - } - - public JobState getStatus() { - return state; - } - - public void setStatus(JobState status) { - // this logic is going to be useful for fast finishing jobs - // because in some machines job state vanishes quicckly when the job is done - // during that case job state comes as unknown.so we handle it here. - if (this.state != null && status.equals(JobState.UNKNOWN)) { - if (getFailedCount() > 2) { - switch (this.state) { - case ACTIVE: - this.state = JobState.COMPLETE; - break; - case QUEUED: - this.state = JobState.COMPLETE; - break; - } - } else { - try { - // when state becomes unknown we sleep for a while - Thread.sleep(10000); - } catch (InterruptedException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - setFailedCount(getFailedCount() + 1); - } - } else { - // normal scenario - this.state = status; - } - } - - public String getWorkflowNodeID() { - return workflowNodeID; - } - - public void setWorkflowNodeID(String workflowNodeID) { - this.workflowNodeID = workflowNodeID; - } - -// public String getWorkflowNodeID() { -// return workflowNodeID; -// } -// -// public void setWorkflowNodeID(String workflowNodeID) { -// this.workflowNodeID = workflowNodeID; -// } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/34a8147e/tools/pom.xml ---------------------------------------------------------------------- diff --git a/tools/pom.xml b/tools/pom.xml index 0bdcea2..8f188ad 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -30,7 +30,6 @@ <activeByDefault>true</activeByDefault> </activation> <modules> - <module>phoebus-integration</module> <module>registry-tool</module> <module>gsissh</module> </modules>
