Repository: airavata Updated Branches: refs/heads/master 624dd4149 -> ea93cc1ef
Fix AIRAVATA-1476 for the UNICORE provider and add RabbitMQ messaging. Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/d076fa8b Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/d076fa8b Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/d076fa8b Branch: refs/heads/master Commit: d076fa8bc928d8161a3483cc883108fd8e79f071 Parents: 1b84883 Author: raminder <[email protected]> Authored: Tue May 12 17:09:13 2015 -0400 Committer: raminder <[email protected]> Committed: Tue May 12 17:09:13 2015 -0400 ---------------------------------------------------------------------- .../main/resources/airavata-server.properties | 4 ++ .../gfac/bes/provider/impl/BESProvider.java | 50 +++++++++++--------- .../gfac/bes/utils/DataTransferrer.java | 14 ++++-- .../gfac/core/provider/AbstractProvider.java | 3 -- .../airavata/gfac/ssh/util/GFACSSHUtils.java | 44 ++++++++++------- 5 files changed, 68 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/d076fa8b/modules/configuration/server/src/main/resources/airavata-server.properties ---------------------------------------------------------------------- diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties index c5d6a3f..27e962e 100644 --- a/modules/configuration/server/src/main/resources/airavata-server.properties +++ b/modules/configuration/server/src/main/resources/airavata-server.properties @@ -150,6 +150,10 @@ gfac.passive=true ### Incase of password authentication. 
#ssh.password=Password for ssh connection +################ ---------- BES Properties ------------------- ############### +#bes.ca.cert.path=<location>/certificates/cacert.pem +#bes.ca.key.path=<location>/certificates/cakey.pem +#bes.ca.key.pass=passphrase ########################################################################### http://git-wip-us.apache.org/repos/asf/airavata/blob/d076fa8b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java index 2f6add6..73bf0fc 100644 --- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java +++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java @@ -34,6 +34,7 @@ import org.apache.airavata.gfac.bes.utils.JSDLGenerator; import org.apache.airavata.gfac.bes.utils.SecurityUtils; import org.apache.airavata.gfac.bes.utils.StorageCreator; import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.monitor.MonitorID; import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent; import org.apache.airavata.gfac.core.notification.events.StatusChangeEvent; import org.apache.airavata.gfac.core.notification.events.UnicoreJobIDEvent; @@ -44,6 +45,8 @@ import org.apache.airavata.gfac.core.utils.GFacUtils; import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface; import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission; +import org.apache.airavata.model.messaging.event.JobIdentifier; +import 
org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent; import org.apache.airavata.model.workspace.experiment.JobDetails; import org.apache.airavata.model.workspace.experiment.JobState; import org.apache.xmlbeans.XmlCursor; @@ -147,7 +150,7 @@ public class BESProvider extends AbstractProvider implements GFacProvider, log.info(String.format("Activity Submitting to %s ... \n", factoryUrl)); - jobExecutionContext.getNotifier().publish(new StartExecutionEvent()); + monitorPublisher.publish(new StartExecutionEvent()); CreateActivityResponseDocument response = factory.createActivity(cad); log.info(String.format("Activity Submitted to %s \n", factoryUrl)); @@ -162,22 +165,15 @@ public class BESProvider extends AbstractProvider implements GFacProvider, .toString(); } log.info("JobID: " + jobId); - jobDetails.setJobID(activityEpr.toString()); + jobDetails.setJobID(jobId); jobDetails.setJobDescription(activityEpr.toString()); jobExecutionContext.setJobDetails(jobDetails); + GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED); log.info(formatStatusMessage(activityEpr.getAddress() .getStringValue(), factory.getActivityStatus(activityEpr) .toString())); - - jobExecutionContext.getNotifier().publish(new UnicoreJobIDEvent(jobId)); -// GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.SUBMITTED); - - log.info(formatStatusMessage(activityEpr.getAddress() - .getStringValue(), factory.getActivityStatus(activityEpr) - .toString())); - waitUntilDone(factory, activityEpr, jobDetails); ActivityStatusType activityStatus = null; @@ -196,10 +192,7 @@ public class BESProvider extends AbstractProvider implements GFacProvider, log.info(error); JobState applicationJobStatus = JobState.FAILED; - String jobStatusMessage = "Status of job " + jobId + "is " - + applicationJobStatus; - jobExecutionContext.getNotifier().publish( - new StatusChangeEvent(jobStatusMessage)); + sendNotification(jobExecutionContext,applicationJobStatus); 
GFacUtils.updateJobStatus(jobExecutionContext, jobDetails, applicationJobStatus); try {Thread.sleep(5000);} catch (InterruptedException e) {} @@ -209,16 +202,16 @@ public class BESProvider extends AbstractProvider implements GFacProvider, } else if (activityStatus.getState() == ActivityStateEnumeration.CANCELLED) { JobState applicationJobStatus = JobState.CANCELED; - String jobStatusMessage = "Status of job " + jobId + "is " - + applicationJobStatus; - jobExecutionContext.getNotifier().publish( - new StatusChangeEvent(jobStatusMessage)); + sendNotification(jobExecutionContext,applicationJobStatus); GFacUtils.updateJobStatus(jobExecutionContext, jobDetails, applicationJobStatus); throw new GFacProviderException( jobExecutionContext.getExperimentID() + "Job Canceled"); } else if (activityStatus.getState() == ActivityStateEnumeration.FINISHED) { try { Thread.sleep(5000); + JobState applicationJobStatus = JobState.COMPLETE; + sendNotification(jobExecutionContext,applicationJobStatus); + } catch (InterruptedException e) { } if (activityStatus.getExitCode() == 0) { @@ -432,10 +425,8 @@ public class BESProvider extends AbstractProvider implements GFacProvider, ActivityStatusType activityStatus = getStatus(factory, activityEpr); JobState applicationJobStatus = getApplicationJobStatus(activityStatus); - String jobStatusMessage = "Status of job " + jobId + "is " + applicationJobStatus; -// GFacUtils.updateJobStatus(jobExecutionContext, jobDetails, applicationJobStatus); - - jobExecutionContext.getNotifier().publish(new StatusChangeEvent(jobStatusMessage)); + + sendNotification(jobExecutionContext,applicationJobStatus); // GFacUtils.updateApplicationJobStatus(jobExecutionContext,jobId, // applicationJobStatus); @@ -444,9 +435,24 @@ public class BESProvider extends AbstractProvider implements GFacProvider, } catch (InterruptedException e) {} continue; } + return; } catch(Exception e) { log.error("Error monitoring job status.."); throw e; } } + private void 
sendNotification(JobExecutionContext jobExecutionContext, JobState status) { + JobStatusChangeRequestEvent jobStatus = new JobStatusChangeRequestEvent(); + JobIdentifier jobIdentity = new JobIdentifier(jobExecutionContext.getJobDetails().getJobID(), + jobExecutionContext.getTaskData().getTaskID(), + jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), + jobExecutionContext.getExperimentID(), + jobExecutionContext.getGatewayID()); + jobStatus.setJobIdentity(jobIdentity); + jobStatus.setState(status); + log.debug(jobStatus.getJobIdentity().getJobId(), "Published job status change request, " + + "experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(), + jobStatus.getJobIdentity().getTaskId()); + monitorPublisher.publish(jobStatus); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/d076fa8b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java index d7f3244..453e45a 100644 --- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java +++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java @@ -209,10 +209,14 @@ public class DataTransferrer { private String getDownloadLocation() { TaskDetails taskData = jobContext.getTaskData(); - if (taskData != null && taskData.getAdvancedOutputDataHandling() != null) { - String outputDataDirectory = taskData.getAdvancedOutputDataHandling().getOutputDataDir(); - return outputDataDirectory; - } - return null; + //In case of third party transfer this will not work. 
+// if (taskData != null && taskData.getAdvancedOutputDataHandling() != null) { +// String outputDataDirectory = taskData.getAdvancedOutputDataHandling().getOutputDataDir(); +// return outputDataDirectory; +// } + String outputDataDir = File.separator + "tmp"; + outputDataDir = outputDataDir + File.separator + jobContext.getExperimentID(); + (new File(outputDataDir)).mkdirs(); + return outputDataDir; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/d076fa8b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java index 3282b5a..b650482 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java @@ -45,9 +45,6 @@ public abstract class AbstractProvider implements GFacProvider{ protected AbstractProvider() { //todo this has to be fixed this.monitorPublisher = BetterGfacImpl.getMonitorPublisher(); - if(this.monitorPublisher == null){ - this.monitorPublisher = BetterGfacImpl.getMonitorPublisher(); - } } public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { http://git-wip-us.apache.org/repos/asf/airavata/blob/d076fa8b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java index bb580fb..eb0b811 100644 --- 
a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java @@ -207,7 +207,12 @@ public class GFACSSHUtils { SSHSecurityContext sshSecurityContext = new SSHSecurityContext(); AppCatalog appCatalog = jobExecutionContext.getAppCatalog(); JobSubmissionInterface preferredJobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface(); - SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(preferredJobSubmissionInterface.getJobSubmissionInterfaceId()); + SSHJobSubmission sshJobSubmission = null; + try { + sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(preferredJobSubmissionInterface.getJobSubmissionInterfaceId()); + } catch (Exception e1) { + logger.error("Not able to get SSHJobSubmission from registry"); + } Cluster pbsCluster = null; String key=sshAuth.getKey(); @@ -239,25 +244,30 @@ public class GFACSSHUtils { } if (recreate) { JobManagerConfiguration jConfig = null; - String installedParentPath = jobExecutionContext.getResourceJobManager().getJobManagerBinPath(); + String installedParentPath = null; + if(jobExecutionContext.getResourceJobManager()!= null){ + installedParentPath = jobExecutionContext.getResourceJobManager().getJobManagerBinPath(); + } if (installedParentPath == null) { installedParentPath = "/"; } - String jobManager = sshJobSubmission.getResourceJobManager().getResourceJobManagerType().toString(); - if (jobManager == null) { - logger.error("No Job Manager is configured, so we are picking pbs as the default job manager"); - jConfig = CommonUtils.getPBSJobManager(installedParentPath); - } else { - if (PBS_JOB_MANAGER.equalsIgnoreCase(jobManager)) { - jConfig = CommonUtils.getPBSJobManager(installedParentPath); - } else if (SLURM_JOB_MANAGER.equalsIgnoreCase(jobManager)) { - jConfig = CommonUtils.getSLURMJobManager(installedParentPath); - } else if 
(SUN_GRID_ENGINE_JOB_MANAGER.equalsIgnoreCase(jobManager)) { - jConfig = CommonUtils.getSGEJobManager(installedParentPath); - } else if(LSF_JOB_MANAGER.equals(jobManager)) { - jConfig = CommonUtils.getLSFJobManager(installedParentPath); - } - } + if (sshJobSubmission != null) { + String jobManager = sshJobSubmission.getResourceJobManager().getResourceJobManagerType().toString(); + if (jobManager == null) { + logger.error("No Job Manager is configured, so we are picking pbs as the default job manager"); + jConfig = CommonUtils.getPBSJobManager(installedParentPath); + } else { + if (PBS_JOB_MANAGER.equalsIgnoreCase(jobManager)) { + jConfig = CommonUtils.getPBSJobManager(installedParentPath); + } else if (SLURM_JOB_MANAGER.equalsIgnoreCase(jobManager)) { + jConfig = CommonUtils.getSLURMJobManager(installedParentPath); + } else if (SUN_GRID_ENGINE_JOB_MANAGER.equalsIgnoreCase(jobManager)) { + jConfig = CommonUtils.getSGEJobManager(installedParentPath); + } else if (LSF_JOB_MANAGER.equals(jobManager)) { + jConfig = CommonUtils.getLSFJobManager(installedParentPath); + } + } + } pbsCluster = new PBSCluster(sshAuth.getServerInfo(), sshAuth.getAuthenticationInfo(),jConfig); key = sshAuth.getKey(); List<Cluster> pbsClusters = null;
