http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java new file mode 100644 index 0000000..5162e36 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java @@ -0,0 +1,467 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.gfac.ssh.provider.impl; + +import org.airavata.appcatalog.cpi.AppCatalogException; +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.MonitorPublisher; +import org.apache.airavata.gfac.Constants; +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.context.MessageContext; +import org.apache.airavata.gfac.core.handler.GFacHandlerException; +import org.apache.airavata.gfac.core.monitor.MonitorID; +import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest; +import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent; +import org.apache.airavata.gfac.core.provider.AbstractProvider; +import org.apache.airavata.gfac.core.provider.GFacProviderException; +import org.apache.airavata.gfac.core.states.GfacExperimentState; +import org.apache.airavata.gfac.core.GFacUtils; +import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor; +import org.apache.airavata.gfac.monitor.email.EmailMonitorFactory; +import org.apache.airavata.gfac.ssh.security.SSHSecurityContext; +import org.apache.airavata.gfac.ssh.util.GFACSSHUtils; +import org.apache.airavata.gfac.ssh.api.Cluster; +import org.apache.airavata.gfac.ssh.api.CommandExecutor; +import org.apache.airavata.gfac.ssh.api.SSHApiException; +import org.apache.airavata.gfac.ssh.api.job.JobDescriptor; +import org.apache.airavata.gfac.ssh.impl.JobStatus; +import org.apache.airavata.gfac.ssh.impl.RawCommandInfo; +import org.apache.airavata.gfac.ssh.impl.StandardOutReader; +import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths; +import org.apache.airavata.model.appcatalog.appinterface.DataType; +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; +import org.apache.airavata.model.appcatalog.computeresource.MonitorMode; +import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager; +import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType; +import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; +import org.apache.airavata.model.workspace.experiment.CorrectiveAction; +import org.apache.airavata.model.workspace.experiment.ErrorCategory; +import org.apache.airavata.model.workspace.experiment.ExperimentState; +import org.apache.airavata.model.workspace.experiment.JobDetails; +import org.apache.airavata.model.workspace.experiment.JobState; +import org.apache.xmlbeans.XmlException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +import java.io.*; +import java.util.*; + +/** + * Execute application using remote SSH + */ +public class SSHProvider extends AbstractProvider { + private static final Logger log = LoggerFactory.getLogger(SSHProvider.class); + private Cluster cluster; + private String jobID = null; + private String taskID = null; + // we keep gsisshprovider to support qsub submission incase of hpc scenario with ssh + private boolean hpcType = false; + + public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { + try { + super.initialize(jobExecutionContext); + String hostAddress = jobExecutionContext.getHostName(); + ResourceJobManager resourceJobManager = jobExecutionContext.getResourceJobManager(); + ResourceJobManagerType resourceJobManagerType = resourceJobManager.getResourceJobManagerType(); + if (jobExecutionContext.getSecurityContext(hostAddress) == null) { + GFACSSHUtils.addSecurityContext(jobExecutionContext); + } + taskID = jobExecutionContext.getTaskData().getTaskID(); + + JobSubmissionProtocol preferredJobSubmissionProtocol = jobExecutionContext.getPreferredJobSubmissionProtocol(); + if (preferredJobSubmissionProtocol == JobSubmissionProtocol.SSH && resourceJobManagerType == ResourceJobManagerType.FORK) { + jobID = "SSH_" + jobExecutionContext.getHostName() + "_" + Calendar.getInstance().getTimeInMillis(); + cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster(); + + String remoteFile = jobExecutionContext.getWorkingDir() + File.separatorChar + Constants.EXECUTABLE_NAME; + details.setJobID(taskID); + details.setJobDescription(remoteFile); + jobExecutionContext.setJobDetails(details); + // FIXME : Why cluster is passed as null + JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, cluster); + details.setJobDescription(jobDescriptor.toXML()); + + GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.SETUP); + log.info(remoteFile); + File runscript = createShellScript(jobExecutionContext); + cluster.scpTo(remoteFile, runscript.getAbsolutePath()); + } else { + hpcType = true; + } + } catch (ApplicationSettingsException e) { + log.error(e.getMessage()); + throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); + } catch (Exception e) { + throw new GFacProviderException(e.getLocalizedMessage(), e); + } + } + + + public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException { + if (!hpcType) { + try { + /* + * Execute + */ + String executable = jobExecutionContext.getWorkingDir() + File.separatorChar + Constants.EXECUTABLE_NAME; + details.setJobDescription(executable); + RawCommandInfo rawCommandInfo = new RawCommandInfo("/bin/chmod 755 " + executable + "; " + executable); + StandardOutReader jobIDReaderCommandOutput = new StandardOutReader(); + log.info("Executing RawCommand : " + rawCommandInfo.getCommand()); + CommandExecutor.executeCommand(rawCommandInfo, cluster.getSession(), jobIDReaderCommandOutput); + String stdOutputString = getOutputifAvailable(jobIDReaderCommandOutput, "Error submitting job to resource"); + log.info("stdout=" + stdOutputString); + } catch (Exception e) { + throw new GFacProviderException(e.getMessage(), e); + } + } else { + try { + StringBuffer data = new StringBuffer(); + jobExecutionContext.getNotifier().publish(new StartExecutionEvent()); + JobDetails jobDetails = new JobDetails(); + String hostAddress = jobExecutionContext.getHostName(); + MonitorPublisher monitorPublisher = jobExecutionContext.getMonitorPublisher(); + try { + Cluster cluster = null; + if (jobExecutionContext.getSecurityContext(hostAddress) == null) { + GFACSSHUtils.addSecurityContext(jobExecutionContext); + } + cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster(); + if (cluster == null) { + throw new GFacProviderException("Security context is not set properly"); + } else { + log.info("Successfully retrieved the Security Context"); + } + // This installed path is a mandetory field, because this could change based on the computing resource + JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, cluster); + jobDetails.setJobName(jobDescriptor.getJobName()); + log.info(jobDescriptor.toXML()); + jobDetails.setJobDescription(jobDescriptor.toXML()); + String jobID = cluster.submitBatchJob(jobDescriptor); + if (jobID != null && !jobID.isEmpty()) { + jobDetails.setJobID(jobID); + GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED); + monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) + , GfacExperimentState.JOBSUBMITTED)); + jobExecutionContext.setJobDetails(jobDetails); + if (verifyJobSubmissionByJobId(cluster, jobID)) { + monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) + , GfacExperimentState.JOBSUBMITTED)); + GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.QUEUED); + } + } else { + jobExecutionContext.setJobDetails(jobDetails); + String verifyJobId = verifyJobSubmission(cluster, jobDetails); + if (verifyJobId != null && !verifyJobId.isEmpty()) { + // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED + jobID = verifyJobId; + jobDetails.setJobID(jobID); + monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) + , GfacExperimentState.JOBSUBMITTED)); + GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.QUEUED); + } + } + + if (jobID == null || jobID.isEmpty()) { + log.error("Couldn't find remote jobId for JobName:" + jobDetails.getJobName() + ", ExperimentId:" + jobExecutionContext.getExperimentID()); + GFacUtils.updateExperimentStatus(jobExecutionContext.getExperimentID(), ExperimentState.FAILED); + return; + } + data.append("jobDesc=").append(jobDescriptor.toXML()); + data.append(",jobId=").append(jobDetails.getJobID()); + monitor(jobExecutionContext); + } catch (SSHApiException e) { + String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage(); + log.error(error); + jobDetails.setJobID("none"); + GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED); + GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); + throw new GFacProviderException(error, e); + } catch (Exception e) { + String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage(); + log.error(error); + jobDetails.setJobID("none"); + GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED); + GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); + throw new GFacProviderException(error, e); + } finally { + log.info("Saving data for future recovery: "); + log.info(data.toString()); + GFacUtils.saveHandlerData(jobExecutionContext, data, this.getClass().getName()); + } + } catch (GFacException e) { + throw new GFacProviderException(e.getMessage(), e); + } + } + } + + private boolean verifyJobSubmissionByJobId(Cluster cluster, String jobID) throws SSHApiException { + JobStatus status = cluster.getJobStatus(jobID); + return status != null && status != JobStatus.U; + } + + private String verifyJobSubmission(Cluster cluster, JobDetails jobDetails) { + String jobName = jobDetails.getJobName(); + String jobId = null; + try { + jobId = cluster.getJobIdByJobName(jobName, cluster.getServerInfo().getUserName()); + } catch (SSHApiException e) { + log.error("Error while verifying JobId from JobName"); + } + return jobId; + } + + public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException { + + } + + public boolean cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { + JobDetails jobDetails = jobExecutionContext.getJobDetails(); + StringBuffer data = new StringBuffer(); + String hostAddress = jobExecutionContext.getHostName(); + if (!hpcType) { + throw new NotImplementedException(); + } else { + Cluster cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster(); + if (cluster == null) { + throw new GFacProviderException("Security context is not set properly"); + } else { + log.info("Successfully retrieved the Security Context"); + } + // This installed path is a mandetory field, because this could change based on the computing resource + if (jobDetails == null) { + log.error("There is not JobDetails, Cancel request can't be performed !!!"); + return false; + } + try { + if (jobDetails.getJobID() != null) { + if (cluster.cancelJob(jobDetails.getJobID()) != null) { + // if this operation success without any exceptions, we can assume cancel operation succeeded. + GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED); + return true; + } else { + log.info("Job Cancel operation failed"); + } + } else { + log.error("No Job Id is set, so cannot perform the cancel operation !!!"); + throw new GFacProviderException("Cancel request failed to cancel job as JobId is null in Job Execution Context"); + } + } catch (SSHApiException e) { + String error = "Cancel request failed " + jobExecutionContext.getHostName() + " message: " + e.getMessage(); + log.error(error); + StringWriter errors = new StringWriter(); + e.printStackTrace(new PrintWriter(errors)); + GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); +// throw new GFacProviderException(error, e); + } catch (Exception e) { + String error = "Cancel request failed " + jobExecutionContext.getHostName() + " message: " + e.getMessage(); + log.error(error); + StringWriter errors = new StringWriter(); + e.printStackTrace(new PrintWriter(errors)); + GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); +// throw new GFacProviderException(error, e); + } + return false; + } + } + + private File createShellScript(JobExecutionContext context) throws IOException { + String uniqueDir = jobExecutionContext.getApplicationName() + System.currentTimeMillis() + + new Random().nextLong(); + + File shellScript = File.createTempFile(uniqueDir, "sh"); + OutputStream out = new FileOutputStream(shellScript); + + out.write("#!/bin/bash\n".getBytes()); + out.write(("cd " + jobExecutionContext.getWorkingDir() + "\n").getBytes()); + out.write(("export " + Constants.INPUT_DATA_DIR_VAR_NAME + "=" + jobExecutionContext.getInputDir() + "\n").getBytes()); + out.write(("export " + Constants.OUTPUT_DATA_DIR_VAR_NAME + "=" + jobExecutionContext.getOutputDir() + "\n") + .getBytes()); + // get the env of the host and the application + List<SetEnvPaths> envPathList = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getSetEnvironment(); + for (SetEnvPaths setEnvPaths : envPathList) { + log.debug("Env[" + setEnvPaths.getName() + "] = " + setEnvPaths.getValue()); + out.write(("export " + setEnvPaths.getName() + "=" + setEnvPaths.getValue() + "\n").getBytes()); + } + + // prepare the command + final String SPACE = " "; + StringBuffer cmd = new StringBuffer(); + cmd.append(jobExecutionContext.getExecutablePath()); + cmd.append(SPACE); + + MessageContext input = context.getInMessageContext(); + Map<String, Object> inputs = input.getParameters(); + Set<String> keys = inputs.keySet(); + for (String paramName : keys) { + InputDataObjectType inputParamType = (InputDataObjectType) input.getParameters().get(paramName); + //if ("URIArray".equals(actualParameter.getType().getType().toString())) { + if (inputParamType.getType() == DataType.URI) { + String value = inputParamType.getValue(); + cmd.append(value); + cmd.append(SPACE); + } else { + String paramValue = inputParamType.getValue(); + cmd.append(paramValue); + cmd.append(SPACE); + } + } + // We redirect the error and stdout to remote files, they will be read + // in later + cmd.append(SPACE); + cmd.append("1>"); + cmd.append(SPACE); + cmd.append(jobExecutionContext.getStandardOutput()); + cmd.append(SPACE); + cmd.append("2>"); + cmd.append(SPACE); + cmd.append(jobExecutionContext.getStandardError()); + + String cmdStr = cmd.toString(); + log.info("Command = " + cmdStr); + out.write((cmdStr + "\n").getBytes()); + String message = "\"execuationSuceeded\""; + out.write(("echo " + message + "\n").getBytes()); + out.close(); + + return shellScript; + } + + public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException { + + } + + /** + * This method will read standard output and if there's any it will be parsed + * + * @param jobIDReaderCommandOutput + * @param errorMsg + * @return + * @throws SSHApiException + */ + private String getOutputifAvailable(StandardOutReader jobIDReaderCommandOutput, String errorMsg) throws SSHApiException { + String stdOutputString = jobIDReaderCommandOutput.getStdOutputString(); + String stdErrorString = jobIDReaderCommandOutput.getStdErrorString(); + + if (stdOutputString == null || stdOutputString.isEmpty() || (stdErrorString != null && !stdErrorString.isEmpty())) { + log.error("Standard Error output : " + stdErrorString); + throw new SSHApiException(errorMsg + stdErrorString); + } + return stdOutputString; + } + + public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { + // have to implement the logic to recover a gfac failure + initialize(jobExecutionContext); + if(hpcType) { + log.info("Invoking Recovering for the Experiment: " + jobExecutionContext.getExperimentID()); + String hostName = jobExecutionContext.getHostName(); + String jobId = ""; + String jobDesc = ""; + String jobName = ""; + try { + String pluginData = GFacUtils.getHandlerData(jobExecutionContext, this.getClass().getName()); + String[] split = pluginData.split(","); + if (split.length < 2) { + this.execute(jobExecutionContext); + return; + } + jobDesc = split[0].substring(8); + jobId = split[1].substring(6); + try { + JobDescriptor jobDescriptor = JobDescriptor.fromXML(jobDesc); + jobName = jobDescriptor.getJobName(); + } catch (XmlException e) { + log.error(e.getMessage(), e); + log.error("Cannot parse plugin data stored, but trying to recover"); + + } + log.info("Following data have recovered: "); + log.info("Job Description: " + jobDesc); + log.info("Job Id: " + jobId); + if (jobName.isEmpty() || jobId.isEmpty() || "none".equals(jobId) || + "".equals(jobId)) { + log.info("Cannot recover data so submitting the job again !!!"); + this.execute(jobExecutionContext); + return; + } + } catch (Exception e) { + log.error("Error while recovering provider", e); + } + try { + // Now we are we have enough data to recover + JobDetails jobDetails = new JobDetails(); + jobDetails.setJobDescription(jobDesc); + jobDetails.setJobID(jobId); + jobDetails.setJobName(jobName); + jobExecutionContext.setJobDetails(jobDetails); + if (jobExecutionContext.getSecurityContext(hostName) == null) { + try { + GFACSSHUtils.addSecurityContext(jobExecutionContext); + } catch (ApplicationSettingsException e) { + log.error(e.getMessage()); + throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); + } + } + monitor(jobExecutionContext); + } catch (Exception e) { + log.error("Error while recover the job", e); + throw new GFacProviderException("Error delegating already ran job to Monitoring", e); + } + }else{ + log.info("We do not handle non hpc recovery so we simply run the Job directly"); + this.execute(jobExecutionContext); + } + } + + @Override + public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { + if (jobExecutionContext.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.SSH) { + String jobSubmissionInterfaceId = jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId(); + SSHJobSubmission sshJobSubmission = null; + try { + sshJobSubmission = jobExecutionContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId); + } catch (AppCatalogException e) { + throw new GFacException("Error while reading compute resource", e); + } + MonitorMode monitorMode = sshJobSubmission.getMonitorMode(); + if (monitorMode != null && monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) { + try { + EmailBasedMonitor emailBasedMonitor = EmailMonitorFactory.getEmailBasedMonitor( + sshJobSubmission.getResourceJobManager().getResourceJobManagerType()); + emailBasedMonitor.addToJobMonitorMap(jobExecutionContext); + } catch (AiravataException e) { + throw new GFacHandlerException("Error while activating email job monitoring ", e); + } + return; + } + } else { + throw new IllegalArgumentException("Monitoring is implemented only for SSH, " + + jobExecutionContext.getPreferredJobSubmissionProtocol().name() + " is not yet implemented"); + } + + } +}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/security/SSHSecurityContext.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/security/SSHSecurityContext.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/security/SSHSecurityContext.java new file mode 100644 index 0000000..c6cac79 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/security/SSHSecurityContext.java @@ -0,0 +1,118 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.airavata.gfac.ssh.security; + +import java.io.IOException; + +import net.schmizz.sshj.SSHClient; +import net.schmizz.sshj.connection.channel.direct.Session; +import net.schmizz.sshj.userauth.keyprovider.KeyProvider; + +import org.apache.airavata.gfac.SecurityContext; +import org.apache.airavata.gfac.ssh.api.Cluster; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handle SSH security + */ +public class SSHSecurityContext implements SecurityContext { + private static final Logger log = LoggerFactory.getLogger(SSHSecurityContext.class); + + private String username; + private String privateKeyLoc; + private String keyPass; + private SSHClient sshClient; + private Session session; + + private Cluster pbsCluster; + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPrivateKeyLoc() { + return privateKeyLoc; + } + + public void setPrivateKeyLoc(String privateKeyLoc) { + this.privateKeyLoc = privateKeyLoc; + } + + public String getKeyPass() { + return keyPass; + } + + public void setKeyPass(String keyPass) { + this.keyPass = keyPass; + } + + public void closeSession(Session session) { + if (session != null) { + try { + session.close(); + } catch (Exception e) { + log.warn("Cannot Close SSH Session"); + } + } + } + + public Session getSession(String hostAddress) throws IOException { + try { + if (sshClient == null) { + sshClient = new SSHClient(); + } + if (getSSHClient().isConnected()) + return getSSHClient().startSession(); + + KeyProvider pkey = getSSHClient().loadKeys(getPrivateKeyLoc(), getKeyPass()); + + getSSHClient().loadKnownHosts(); + + getSSHClient().connect(hostAddress); + getSSHClient().authPublickey(getUsername(), pkey); + session = getSSHClient().startSession(); + return session; + + } catch (NullPointerException ne) { + throw new SecurityException("Cannot load security context for SSH", ne); + } + } + + public SSHClient getSSHClient() { + if (sshClient == null) { + sshClient = new SSHClient(); + } + return sshClient; + } + + public void setPbsCluster(Cluster pbsCluster) { + this.pbsCluster = pbsCluster; + } + + public Cluster getPbsCluster() { + return this.pbsCluster; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/security/TokenizedSSHAuthInfo.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/security/TokenizedSSHAuthInfo.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/security/TokenizedSSHAuthInfo.java new file mode 100644 index 0000000..3b90b40 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/security/TokenizedSSHAuthInfo.java @@ -0,0 +1,184 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.ssh.security; + +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.IOUtil; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.credential.store.credential.Credential; +import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential; +import org.apache.airavata.credential.store.store.CredentialReader; +import org.apache.airavata.gfac.Constants; +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.RequestData; +import org.apache.airavata.gfac.core.GFacUtils; +import org.apache.airavata.gfac.core.authentication.SSHPublicKeyFileAuthentication; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.Properties; + +public class TokenizedSSHAuthInfo implements SSHPublicKeyFileAuthentication { + protected static final Logger log = LoggerFactory.getLogger(TokenizedSSHAuthInfo.class); + + private String publicKeyFile; + + private String privateKeyFile; + + private String passPhrase = null; + + private SSHCredential gssCredentials = null; + + private CredentialReader credentialReader; + + private RequestData requestData; + + public TokenizedSSHAuthInfo(CredentialReader credentialReader, RequestData requestData) { + this.credentialReader = credentialReader; + this.requestData = requestData; + } + + public TokenizedSSHAuthInfo(RequestData requestData) { + this.requestData = requestData; + } + + public String getPublicKeyFile(String userName, String hostName) { + return publicKeyFile; + } + + public String getPrivateKeyFile(String userName, String hostName) { + return privateKeyFile; + } + + public String getPassPhrase() { + return passPhrase; + } + + public void bannerMessage(String message) { + + } + + public SSHCredential getCredentials() throws SecurityException { + + if (gssCredentials == null) { + + try { + gssCredentials = getCredentialsFromStore(); + } catch (Exception e) { + log.error("An exception occurred while retrieving credentials from the credential store. " + + "Will continue with my proxy user name and password. Provided TokenId:" + requestData.getTokenId() + e.getMessage(), e); + } + + if (gssCredentials == null) { + System.out.println("Authenticating with provided token failed, so falling back to authenticate with defaultCredentials"); + try { + gssCredentials = getDefaultCredentials(); + } catch (Exception e) { + throw new SecurityException("Error retrieving my proxy using username password",e.getCause()); + } + } + // if still null, throw an exception + if (gssCredentials == null) { + throw new SecurityException("Unable to retrieve my proxy credentials to continue operation."); + } + } + + return gssCredentials; + } + + + /** + * Reads the credentials from credential store. + * + * @return If token is found in the credential store, will return a valid credential. Else returns null. + * @throws Exception If an error occurred while retrieving credentials. + */ + public SSHCredential getCredentialsFromStore() throws Exception { + + if (getCredentialReader() == null) { + credentialReader = GFacUtils.getCredentialReader(); + if(credentialReader == null){ + return null; + } + } + + Credential credential = getCredentialReader().getCredential(getRequestData().getGatewayId(), + getRequestData().getTokenId()); + + if (credential instanceof SSHCredential) { + SSHCredential credential1 = (SSHCredential) credential; + this.publicKeyFile = writeFileToDisk(credential1.getPublicKey()); + this.privateKeyFile = writeFileToDisk(credential1.getPrivateKey()); + this.passPhrase = credential1.getPassphrase(); + System.out.println(this.publicKeyFile); + System.out.println(this.privateKeyFile); + System.out.println(this.passPhrase); + this.getRequestData().setRequestUser(credential1.getPortalUserName()); + return credential1; + } else { + log.info("Could not find SSH credentials for token - " + getRequestData().getTokenId() + " and " + + "gateway id - " + getRequestData().getGatewayId()); + } + + return null; + } + + /** + * Gets the default proxy certificate. + * + * @return Default my proxy credentials. + * @throws org.apache.airavata.gfac.GFacException If an error occurred while retrieving credentials. + * @throws org.apache.airavata.common.exception.ApplicationSettingsException + */ + public SSHCredential getDefaultCredentials() throws GFacException, ApplicationSettingsException, IOException { + Properties configurationProperties = ServerSettings.getProperties(); + String sshUserName = configurationProperties.getProperty(Constants.SSH_USER_NAME); + this.getRequestData().setRequestUser(sshUserName); + this.privateKeyFile = configurationProperties.getProperty(Constants.SSH_PRIVATE_KEY); + this.publicKeyFile = configurationProperties.getProperty(Constants.SSH_PUBLIC_KEY); + this.passPhrase = configurationProperties.getProperty(Constants.SSH_PRIVATE_KEY_PASS); + this.getRequestData().setRequestUser(sshUserName); + return new SSHCredential(IOUtil.readToByteArray(new File(this.privateKeyFile)), IOUtil.readToByteArray(new File(this.publicKeyFile)), this.passPhrase, requestData.getGatewayId(), sshUserName); + } + + public CredentialReader getCredentialReader() { + return credentialReader; + } + + public RequestData getRequestData() { + return requestData; + } + + private String writeFileToDisk(byte[] data) { + File temp = null; + try { + temp = File.createTempFile("id_rsa", ""); + //write it + FileOutputStream bw = new FileOutputStream(temp); + bw.write(data); + bw.close(); + } catch (IOException e) { + log.error(e.getMessage(), e); + } + return temp.getAbsolutePath(); + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java new file mode 100644 index 0000000..f2afedc --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java @@ -0,0 +1,561 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.ssh.util; + +import org.airavata.appcatalog.cpi.AppCatalog; +import org.airavata.appcatalog.cpi.AppCatalogException; +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential; +import org.apache.airavata.gfac.Constants; +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.RequestData; +import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.context.MessageContext; +import org.apache.airavata.gfac.core.handler.GFacHandlerException; +import org.apache.airavata.gfac.core.GFacUtils; +import org.apache.airavata.gfac.ssh.context.SSHAuthWrapper; +import org.apache.airavata.gfac.ssh.security.SSHSecurityContext; +import org.apache.airavata.gfac.ssh.security.TokenizedSSHAuthInfo; +import org.apache.airavata.gfac.ssh.api.Cluster; +import org.apache.airavata.gfac.ssh.api.ServerInfo; +import org.apache.airavata.gfac.core.authentication.AuthenticationInfo; +import org.apache.airavata.gfac.ssh.api.job.JobDescriptor; +import org.apache.airavata.gfac.ssh.api.job.JobManagerConfiguration; +import org.apache.airavata.gfac.ssh.impl.GSISSHAbstractCluster; +import org.apache.airavata.gfac.ssh.impl.PBSCluster; +import org.apache.airavata.gfac.ssh.impl.authentication.DefaultPasswordAuthenticationInfo; +import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; +import org.apache.airavata.model.appcatalog.appdeployment.ApplicationParallelismType; +import org.apache.airavata.model.appcatalog.appinterface.DataType; +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.airavata.model.appcatalog.computeresource.*; +import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference; +import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling; +import org.apache.airavata.model.workspace.experiment.CorrectiveAction; +import org.apache.airavata.model.workspace.experiment.ErrorCategory; +import org.apache.airavata.model.workspace.experiment.TaskDetails; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.*; + +public class GFACSSHUtils { + private final static Logger logger = LoggerFactory.getLogger(GFACSSHUtils.class); + + public static Map<String, List<Cluster>> clusters = new HashMap<String, List<Cluster>>(); + + public static final String PBS_JOB_MANAGER = "pbs"; + public static final String SLURM_JOB_MANAGER = "slurm"; + public static final String SUN_GRID_ENGINE_JOB_MANAGER = "UGE"; + public static final String LSF_JOB_MANAGER = "LSF"; + + public static int maxClusterCount = 5; + + /** + * This method is to add computing resource specific authentication, if its a third party machine, use the other addSecurityContext + * @param jobExecutionContext + * @throws GFacException + * @throws ApplicationSettingsException + */ + public static void addSecurityContext(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException { + JobSubmissionProtocol preferredJobSubmissionProtocol = jobExecutionContext.getPreferredJobSubmissionProtocol(); + JobSubmissionInterface preferredJobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface(); + if (preferredJobSubmissionProtocol == JobSubmissionProtocol.GLOBUS || preferredJobSubmissionProtocol == JobSubmissionProtocol.UNICORE) { + logger.error("This is a wrong method to invoke to non ssh host types,please check your gfac-config.xml"); + } else if (preferredJobSubmissionProtocol == JobSubmissionProtocol.SSH) { + try { + AppCatalog appCatalog = jobExecutionContext.getAppCatalog(); + SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(preferredJobSubmissionInterface.getJobSubmissionInterfaceId()); + SecurityProtocol securityProtocol = sshJobSubmission.getSecurityProtocol(); + if (securityProtocol == SecurityProtocol.GSI || securityProtocol == SecurityProtocol.SSH_KEYS) { + SSHSecurityContext sshSecurityContext = new SSHSecurityContext(); + String credentialStoreToken = jobExecutionContext.getCredentialStoreToken(); // this is set by the framework + RequestData requestData = new RequestData(jobExecutionContext.getGatewayID()); + requestData.setTokenId(credentialStoreToken); + + ServerInfo serverInfo = new ServerInfo(null, jobExecutionContext.getHostName()); + + Cluster pbsCluster = null; + try { + AuthenticationInfo tokenizedSSHAuthInfo = new TokenizedSSHAuthInfo(requestData); + String installedParentPath = jobExecutionContext.getResourceJobManager().getJobManagerBinPath(); + if (installedParentPath == null) { + installedParentPath = "/"; + } + + SSHCredential credentials =((TokenizedSSHAuthInfo)tokenizedSSHAuthInfo).getCredentials();// this is just a call to get and set credentials in to this object,data will be used + if(credentials.getPrivateKey()==null || credentials.getPublicKey()==null){ + // now we fall back to username password authentication + Properties configurationProperties = ServerSettings.getProperties(); + tokenizedSSHAuthInfo = new DefaultPasswordAuthenticationInfo(configurationProperties.getProperty(Constants.SSH_PASSWORD)); + } + // This should be the login user name from compute resource preference + String loginUser = jobExecutionContext.getLoginUserName(); + if (loginUser == null) { + loginUser = credentials.getPortalUserName(); + } + serverInfo.setUserName(loginUser); + jobExecutionContext.getExperiment().setUserName(loginUser); + + + // inside the pbsCluser object + + String key = loginUser + jobExecutionContext.getHostName() + serverInfo.getPort(); + boolean recreate = false; + synchronized (clusters) { + if (clusters.containsKey(key) && clusters.get(key).size() < maxClusterCount) { + recreate = true; + } else if (clusters.containsKey(key)) { + int i = new Random().nextInt(Integer.MAX_VALUE) % maxClusterCount; + if (clusters.get(key).get(i).getSession().isConnected()) { + pbsCluster = clusters.get(key).get(i); + } else { + clusters.get(key).remove(i); + recreate = true; + } + if (!recreate) { + try { + pbsCluster.listDirectory("~/"); // its hard to trust isConnected method, so we try to connect if it works we are good,else we recreate + } catch (Exception e) { + clusters.get(key).remove(i); + logger.info("Connection found the connection map is expired, so we create from the scratch"); + maxClusterCount++; + recreate = true; // we make the pbsCluster to create again if there is any exception druing connection + } + } + logger.info("Re-using the same connection used with the connection string:" + key); + } else { + recreate = true; + } + if (recreate) { + JobManagerConfiguration jConfig = null; + String jobManager = sshJobSubmission.getResourceJobManager().getResourceJobManagerType().toString(); + if (jobManager == null) { + logger.error("No Job Manager is configured, so we are picking pbs as the default job manager"); + jConfig = CommonUtils.getPBSJobManager(installedParentPath); + } else { + if (PBS_JOB_MANAGER.equalsIgnoreCase(jobManager)) { + jConfig = CommonUtils.getPBSJobManager(installedParentPath); + } else if (SLURM_JOB_MANAGER.equalsIgnoreCase(jobManager)) { + jConfig = CommonUtils.getSLURMJobManager(installedParentPath); + } else if (SUN_GRID_ENGINE_JOB_MANAGER.equalsIgnoreCase(jobManager)) { + jConfig = CommonUtils.getUGEJobManager(installedParentPath); + } else if (LSF_JOB_MANAGER.equalsIgnoreCase(jobManager)) { + jConfig = CommonUtils.getLSFJobManager(installedParentPath); + } + } + + pbsCluster = new PBSCluster(serverInfo, tokenizedSSHAuthInfo,jConfig); + List<Cluster> pbsClusters = null; + if (!(clusters.containsKey(key))) { + pbsClusters = new ArrayList<Cluster>(); + } else { + pbsClusters = clusters.get(key); + } + pbsClusters.add(pbsCluster); + clusters.put(key, pbsClusters); + } + } + } catch (Exception e) { + throw new GFacException("Error occurred...", e); + } + sshSecurityContext.setPbsCluster(pbsCluster); + jobExecutionContext.addSecurityContext(jobExecutionContext.getHostName(), sshSecurityContext); + } + } catch (AppCatalogException e) { + throw new GFacException("Error while getting SSH Submission object from app catalog", e); + } + } + } + + /** + * This method can be used to add third party resource security contexts + * @param jobExecutionContext + * @param sshAuth + * @throws GFacException + * @throws ApplicationSettingsException + */ + public static void addSecurityContext(JobExecutionContext jobExecutionContext,SSHAuthWrapper sshAuth) throws GFacException, ApplicationSettingsException { + try { + if(sshAuth== null) { + throw new GFacException("Error adding security Context, because sshAuthWrapper is null"); + } + SSHSecurityContext sshSecurityContext = new SSHSecurityContext(); + AppCatalog appCatalog = jobExecutionContext.getAppCatalog(); + JobSubmissionInterface preferredJobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface(); + SSHJobSubmission sshJobSubmission = null; + try { + sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(preferredJobSubmissionInterface.getJobSubmissionInterfaceId()); + } catch (Exception e1) { + logger.error("Not able to get SSHJobSubmission from registry"); + } + + Cluster pbsCluster = null; + String key=sshAuth.getKey(); + boolean recreate = false; + synchronized (clusters) { + if (clusters.containsKey(key) && clusters.get(key).size() < maxClusterCount) { + recreate = true; + } else if (clusters.containsKey(key)) { + int i = new Random().nextInt(Integer.MAX_VALUE) % maxClusterCount; + if (clusters.get(key).get(i).getSession().isConnected()) { + pbsCluster = clusters.get(key).get(i); + } else { + clusters.get(key).remove(i); + recreate = true; + } + if (!recreate) { + try { + pbsCluster.listDirectory("~/"); // its hard to trust isConnected method, so we try to connect if it works we are good,else we recreate + } catch (Exception e) { + clusters.get(key).remove(i); + logger.info("Connection found the connection map is expired, so we create from the scratch"); + maxClusterCount++; + recreate = true; // we make the pbsCluster to create again if there is any exception druing connection + } + } + logger.info("Re-using the same connection used with the connection string:" + key); + } else { + recreate = true; + } + if (recreate) { + JobManagerConfiguration jConfig = null; + String installedParentPath = null; + if(jobExecutionContext.getResourceJobManager()!= null){ + installedParentPath = jobExecutionContext.getResourceJobManager().getJobManagerBinPath(); + } + if (installedParentPath == null) { + installedParentPath = "/"; + } + if (sshJobSubmission != null) { + String jobManager = sshJobSubmission.getResourceJobManager().getResourceJobManagerType().toString(); + if (jobManager == null) { + logger.error("No Job Manager is configured, so we are picking pbs as the default job manager"); + jConfig = CommonUtils.getPBSJobManager(installedParentPath); + } else { + if (PBS_JOB_MANAGER.equalsIgnoreCase(jobManager)) { + jConfig = CommonUtils.getPBSJobManager(installedParentPath); + } else if (SLURM_JOB_MANAGER.equalsIgnoreCase(jobManager)) { + jConfig = CommonUtils.getSLURMJobManager(installedParentPath); + } else if (SUN_GRID_ENGINE_JOB_MANAGER.equalsIgnoreCase(jobManager)) { + jConfig = CommonUtils.getUGEJobManager(installedParentPath); + } else if (LSF_JOB_MANAGER.equals(jobManager)) { + jConfig = CommonUtils.getLSFJobManager(installedParentPath); + } + } + } + pbsCluster = new PBSCluster(sshAuth.getServerInfo(), sshAuth.getAuthenticationInfo(),jConfig); + key = sshAuth.getKey(); + List<Cluster> pbsClusters = null; + if (!(clusters.containsKey(key))) { + pbsClusters = new ArrayList<Cluster>(); + } else { + pbsClusters = clusters.get(key); + } + pbsClusters.add(pbsCluster); + clusters.put(key, pbsClusters); + } + } + sshSecurityContext.setPbsCluster(pbsCluster); + jobExecutionContext.addSecurityContext(key, sshSecurityContext); + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new GFacException("Error adding security Context", e); + } + } + + + public static JobDescriptor createJobDescriptor(JobExecutionContext jobExecutionContext, Cluster cluster) throws AppCatalogException, ApplicationSettingsException { + JobDescriptor jobDescriptor = new JobDescriptor(); + TaskDetails taskData = jobExecutionContext.getTaskData(); + + + // set email based job monitoring email address if monitor mode is JOB_EMAIL_NOTIFICATION_MONITOR + boolean addJobNotifMail = isEmailBasedJobMonitor(jobExecutionContext); + String emailIds = null; + if (addJobNotifMail) { + emailIds = ServerSettings.getEmailBasedMonitorAddress(); + } + // add all configured job notification email addresses. + if (ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_ENABLE).equalsIgnoreCase("true")) { + String flags = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_FLAGS); + if (flags != null && jobExecutionContext.getApplicationContext().getComputeResourceDescription().getHostName().equals("stampede.tacc.xsede.org")) { + flags = "ALL"; + } + jobDescriptor.setMailOptions(flags); + + String userJobNotifEmailIds = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_EMAILIDS); + if (userJobNotifEmailIds != null && !userJobNotifEmailIds.isEmpty()) { + if (emailIds != null && !emailIds.isEmpty()) { + emailIds += ("," + userJobNotifEmailIds); + } else { + emailIds = userJobNotifEmailIds; + } + } + + if (taskData.isEnableEmailNotification()) { + List<String> emailList = jobExecutionContext.getTaskData().getEmailAddresses(); + String elist = GFacUtils.listToCsv(emailList, ','); + if (elist != null && !elist.isEmpty()) { + if (emailIds != null && !emailIds.isEmpty()) { + emailIds = emailIds + "," + elist; + } else { + emailIds = elist; + } + } + } + } + if (emailIds != null && !emailIds.isEmpty()) { + logger.info("Email list: " + emailIds); + jobDescriptor.setMailAddress(emailIds); + } + // this is common for any application descriptor + + jobDescriptor.setCallBackIp(ServerSettings.getIp()); + jobDescriptor.setCallBackPort(ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.GFAC_SERVER_PORT, "8950")); + jobDescriptor.setInputDirectory(jobExecutionContext.getInputDir()); + jobDescriptor.setOutputDirectory(jobExecutionContext.getOutputDir()); + jobDescriptor.setExecutablePath(jobExecutionContext.getApplicationContext() + .getApplicationDeploymentDescription().getExecutablePath()); + jobDescriptor.setStandardOutFile(jobExecutionContext.getStandardOutput()); + jobDescriptor.setStandardErrorFile(jobExecutionContext.getStandardError()); + String computationalProjectAccount = taskData.getTaskScheduling().getComputationalProjectAccount(); + if (computationalProjectAccount == null){ + ComputeResourcePreference computeResourcePreference = jobExecutionContext.getApplicationContext().getComputeResourcePreference(); + if (computeResourcePreference != null) { + computationalProjectAccount = computeResourcePreference.getAllocationProjectNumber(); + } + } + if (computationalProjectAccount != null) { + jobDescriptor.setAcountString(computationalProjectAccount); + } + // To make job name alpha numeric + jobDescriptor.setJobName("A" + String.valueOf(generateJobName())); + jobDescriptor.setWorkingDirectory(jobExecutionContext.getWorkingDir()); + + List<String> inputValues = new ArrayList<String>(); + MessageContext input = jobExecutionContext.getInMessageContext(); + + // sort the inputs first and then build the command ListR + Comparator<InputDataObjectType> inputOrderComparator = new Comparator<InputDataObjectType>() { + @Override + public int compare(InputDataObjectType inputDataObjectType, InputDataObjectType t1) { + return inputDataObjectType.getInputOrder() - t1.getInputOrder(); + } + }; + Set<InputDataObjectType> sortedInputSet = new TreeSet<InputDataObjectType>(inputOrderComparator); + for (Object object : input.getParameters().values()) { + if (object instanceof InputDataObjectType) { + InputDataObjectType inputDOT = (InputDataObjectType) object; + sortedInputSet.add(inputDOT); + } + } + for (InputDataObjectType inputDataObjectType : sortedInputSet) { + if (!inputDataObjectType.isRequiredToAddedToCommandLine()) { + continue; + } + if (inputDataObjectType.getApplicationArgument() != null + && !inputDataObjectType.getApplicationArgument().equals("")) { + inputValues.add(inputDataObjectType.getApplicationArgument()); + } + + if (inputDataObjectType.getValue() != null + && !inputDataObjectType.getValue().equals("")) { + if (inputDataObjectType.getType() == DataType.URI) { + // set only the relative path + String filePath = inputDataObjectType.getValue(); + filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length()); + inputValues.add(filePath); + }else { + inputValues.add(inputDataObjectType.getValue()); + } + + } + } + Map<String, Object> outputParams = jobExecutionContext.getOutMessageContext().getParameters(); + for (Object outputParam : outputParams.values()) { + if (outputParam instanceof OutputDataObjectType) { + OutputDataObjectType output = (OutputDataObjectType) outputParam; + if (output.getApplicationArgument() != null + && !output.getApplicationArgument().equals("")) { + inputValues.add(output.getApplicationArgument()); + } + if (output.getValue() != null && !output.getValue().equals("") && output.isRequiredToAddedToCommandLine()) { + if (output.getType() == DataType.URI){ + String filePath = output.getValue(); + filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length()); + inputValues.add(filePath); + } + } + } + } + + jobDescriptor.setInputValues(inputValues); + jobDescriptor.setUserName(((GSISSHAbstractCluster) cluster).getServerInfo().getUserName()); + jobDescriptor.setShellName("/bin/bash"); + jobDescriptor.setAllEnvExport(true); + jobDescriptor.setOwner(((PBSCluster) cluster).getServerInfo().getUserName()); + + ResourceJobManager resourceJobManager = jobExecutionContext.getResourceJobManager(); + + + ComputationalResourceScheduling taskScheduling = taskData.getTaskScheduling(); + if (taskScheduling != null) { + int totalNodeCount = taskScheduling.getNodeCount(); + int totalCPUCount = taskScheduling.getTotalCPUCount(); + + + if (taskScheduling.getComputationalProjectAccount() != null) { + jobDescriptor.setAcountString(taskScheduling.getComputationalProjectAccount()); + } + if (taskScheduling.getQueueName() != null) { + jobDescriptor.setQueueName(taskScheduling.getQueueName()); + } + + if (totalNodeCount > 0) { + jobDescriptor.setNodes(totalNodeCount); + } + if (taskScheduling.getComputationalProjectAccount() != null) { + jobDescriptor.setAcountString(taskScheduling.getComputationalProjectAccount()); + } + if (taskScheduling.getQueueName() != null) { + jobDescriptor.setQueueName(taskScheduling.getQueueName()); + } + if (totalCPUCount > 0) { + int ppn = totalCPUCount / totalNodeCount; + jobDescriptor.setProcessesPerNode(ppn); + jobDescriptor.setCPUCount(totalCPUCount); + } + if (taskScheduling.getWallTimeLimit() > 0) { + jobDescriptor.setMaxWallTime(String.valueOf(taskScheduling.getWallTimeLimit())); + if(resourceJobManager.getResourceJobManagerType().equals(ResourceJobManagerType.LSF)){ + jobDescriptor.setMaxWallTimeForLSF(String.valueOf(taskScheduling.getWallTimeLimit())); + } + } + if (taskScheduling.getTotalPhysicalMemory() > 0) { + jobDescriptor.setUsedMemory(taskScheduling.getTotalPhysicalMemory() + ""); + } + } else { + logger.error("Task scheduling cannot be null at this point.."); + } + ApplicationDeploymentDescription appDepDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription(); + List<String> moduleCmds = appDepDescription.getModuleLoadCmds(); + if (moduleCmds != null) { + for (String moduleCmd : moduleCmds) { + jobDescriptor.addModuleLoadCommands(moduleCmd); + } + } + List<String> preJobCommands = appDepDescription.getPreJobCommands(); + if (preJobCommands != null) { + for (String preJobCommand : preJobCommands) { + jobDescriptor.addPreJobCommand(parseCommand(preJobCommand, jobExecutionContext)); + } + } + + List<String> postJobCommands = appDepDescription.getPostJobCommands(); + if (postJobCommands != null) { + for (String postJobCommand : postJobCommands) { + jobDescriptor.addPostJobCommand(parseCommand(postJobCommand, jobExecutionContext)); + } + } + + ApplicationParallelismType parallelism = appDepDescription.getParallelism(); + if (parallelism != null){ + if (parallelism == ApplicationParallelismType.MPI || parallelism == ApplicationParallelismType.OPENMP || parallelism == ApplicationParallelismType.OPENMP_MPI){ + Map<JobManagerCommand, String> jobManagerCommands = resourceJobManager.getJobManagerCommands(); + if (jobManagerCommands != null && !jobManagerCommands.isEmpty()) { + for (JobManagerCommand command : jobManagerCommands.keySet()) { + if (command == JobManagerCommand.SUBMISSION) { + String commandVal = jobManagerCommands.get(command); + jobDescriptor.setJobSubmitter(commandVal); + } + } + } + } + } + return jobDescriptor; + } + + public static boolean isEmailBasedJobMonitor(JobExecutionContext jobExecutionContext) throws AppCatalogException { + if (jobExecutionContext.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.SSH) { + String jobSubmissionInterfaceId = jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId(); + SSHJobSubmission sshJobSubmission = jobExecutionContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId); + MonitorMode monitorMode = sshJobSubmission.getMonitorMode(); + return monitorMode != null && monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR; + } else { + return false; + } + } + + private static int generateJobName() { + Random random = new Random(); + int i = random.nextInt(Integer.MAX_VALUE); + i = i + 99999999; + if(i<0) { + i = i * (-1); + } + return i; + } + + private static String parseCommand(String value, JobExecutionContext jobExecutionContext) { + String parsedValue = value.replaceAll("\\$workingDir", jobExecutionContext.getWorkingDir()); + parsedValue = parsedValue.replaceAll("\\$inputDir", jobExecutionContext.getInputDir()); + parsedValue = parsedValue.replaceAll("\\$outputDir", jobExecutionContext.getOutputDir()); + return parsedValue; + } + /** + * This method can be used to set the Security Context if its not set and later use it in other places + * @param jobExecutionContext + * @param authenticationInfo + * @param userName + * @param hostName + * @param port + * @return + * @throws GFacException + */ + public static String prepareSecurityContext(JobExecutionContext jobExecutionContext, AuthenticationInfo authenticationInfo + , String userName, String hostName, int port) throws GFacException { + ServerInfo serverInfo = new ServerInfo(userName, hostName); + String key = userName+hostName+port; + SSHAuthWrapper sshAuthWrapper = new SSHAuthWrapper(serverInfo, authenticationInfo, key); + if (jobExecutionContext.getSecurityContext(key) == null) { + try { + GFACSSHUtils.addSecurityContext(jobExecutionContext, sshAuthWrapper); + } catch (ApplicationSettingsException e) { + logger.error(e.getMessage()); + try { + StringWriter errors = new StringWriter(); + e.printStackTrace(new PrintWriter(errors)); + GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); + } catch (GFacException e1) { + logger.error(e1.getLocalizedMessage()); + } + throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); + } + } + return key; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/HandleOutputs.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/HandleOutputs.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/HandleOutputs.java new file mode 100644 index 0000000..72c032b --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/HandleOutputs.java @@ -0,0 +1,96 @@ +package org.apache.airavata.gfac.ssh.util; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.handler.GFacHandlerException; +import org.apache.airavata.gfac.core.GFacUtils; +import org.apache.airavata.gfac.ssh.api.Cluster; +import org.apache.airavata.model.appcatalog.appinterface.DataType; +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * To handle outputs of different data types + * + */ +public class HandleOutputs { + private static final Logger log = LoggerFactory.getLogger(HandleOutputs.class); + + public static List<OutputDataObjectType> handleOutputs(JobExecutionContext jobExecutionContext, Cluster cluster) throws GFacHandlerException { + List<OutputDataObjectType> outputArray = new ArrayList<OutputDataObjectType>(); + try { + String outputDataDir = File.separator + "tmp" + File.separator + jobExecutionContext.getExperimentID(); + (new File(outputDataDir)).mkdirs(); + + List<OutputDataObjectType> outputs = jobExecutionContext.getTaskData().getApplicationOutputs(); + List<String> outputList = cluster.listDirectory(jobExecutionContext.getWorkingDir()); + boolean missingOutput = false; + + for (OutputDataObjectType output : outputs) { + // FIXME: Validation of outputs based on required and optional and search based on REGEX provided in search. + + if (DataType.URI == output.getType()) { + // for failed jobs outputs are not generated. So we should not download outputs + if (GFacUtils.isFailedJob(jobExecutionContext)){ + continue; + } + String outputFile = output.getValue(); + String fileName = outputFile.substring(outputFile.lastIndexOf(File.separatorChar) + 1, outputFile.length()); + + if (output.getLocation() == null && !outputList.contains(fileName) && output.isIsRequired()) { + missingOutput = true; + } else { + cluster.scpFrom(outputFile, outputDataDir); + String localFile = outputDataDir + File.separator + fileName; + jobExecutionContext.addOutputFile(localFile); + output.setValue(localFile); + outputArray.add(output); + } + + } else if (DataType.STDOUT == output.getType()) { + String downloadFile = jobExecutionContext.getStandardOutput(); + String fileName = downloadFile.substring(downloadFile.lastIndexOf(File.separatorChar) + 1, downloadFile.length()); + cluster.scpFrom(downloadFile, outputDataDir); + String localFile = outputDataDir + File.separator + fileName; + jobExecutionContext.addOutputFile(localFile); + jobExecutionContext.setStandardOutput(localFile); + output.setValue(localFile); + outputArray.add(output); + + } else if (DataType.STDERR == output.getType()) { + String downloadFile = jobExecutionContext.getStandardError(); + String fileName = downloadFile.substring(downloadFile.lastIndexOf(File.separatorChar) + 1, downloadFile.length()); + cluster.scpFrom(downloadFile, outputDataDir); + String localFile = outputDataDir + File.separator + fileName; + jobExecutionContext.addOutputFile(localFile); + jobExecutionContext.setStandardError(localFile); + output.setValue(localFile); + outputArray.add(output); + + } + } + if (outputArray == null || outputArray.isEmpty()) { + log.error("Empty Output returned from the Application, Double check the application and ApplicationDescriptor output Parameter Names"); + if (jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() == null) { + throw new GFacHandlerException("Empty Output returned from the Application, Double check the application" + + "and ApplicationDescriptor output Parameter Names"); + } + } + + if (missingOutput) { + String arrayString = Arrays.deepToString(outputArray.toArray()); + log.error(arrayString); + throw new GFacHandlerException("Required output is missing"); + } + } catch (Exception e) { + throw new GFacHandlerException(e); + } + jobExecutionContext.getTaskData().setApplicationOutputs(outputArray); + return outputArray; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/resources/LSFTemplate.xslt ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/resources/LSFTemplate.xslt b/modules/gfac/gfac-impl/src/main/resources/LSFTemplate.xslt new file mode 100644 index 0000000..c548d8e --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/resources/LSFTemplate.xslt @@ -0,0 +1,93 @@ +<!--Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file + distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under + the Apache License, Version 2.0 (theà "License"); you may not use this file except in compliance with the License. You may + obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to + in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF + ANY ~ KIND, either express or implied. See the License for the specific language governing permissions and limitations under + the License. --> +<xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:ns="http://airavata.apache.org/gsi/ssh/2012/12"> + <xsl:output method="text" /> + <xsl:template match="/ns:JobDescriptor"> + <xsl:param name="quote">"</xsl:param> +#! /bin/bash +# LSF batch job submission script generated by Apache Airavata +# + <xsl:choose> + <xsl:when test="ns:shellName"> +#BSUB -L <xsl:value-of select="ns:shellName"/> + </xsl:when></xsl:choose> + <xsl:choose> + <xsl:when test="ns:queueName"> +#BSUB -q <xsl:value-of select="ns:queueName"/> + </xsl:when> + </xsl:choose> + <xsl:choose> + <xsl:when test="ns:nodes"> +#BSUB -n <xsl:value-of select="ns:nodes"/> + </xsl:when> + </xsl:choose> + <xsl:choose> + <xsl:when test="ns:mailAddress"> +#BSUB -u <xsl:value-of select="ns:mailAddress"/> + </xsl:when> + </xsl:choose> + <xsl:choose> + <xsl:when test="ns:jobName"> +#BSUB -J <xsl:value-of select="ns:jobName"/> + </xsl:when> + </xsl:choose> + <xsl:choose> + <xsl:when test="ns:acountString"> +#BSUB -P <xsl:value-of select="ns:acountString"/> + </xsl:when> + </xsl:choose> + <xsl:choose> + <xsl:when test="ns:maxWallTime"> +#BSUB -W <xsl:value-of select="ns:maxWallTime"/> + </xsl:when> + </xsl:choose> + <xsl:choose> + <xsl:when test="ns:standardOutFile"> +#BSUB -o "<xsl:value-of select="ns:standardOutFile"/>" + </xsl:when> + </xsl:choose> + <xsl:choose> + <xsl:when test="ns:standardOutFile"> +#BSUB -e "<xsl:value-of select="ns:standardErrorFile"/>" + </xsl:when> + </xsl:choose> + <xsl:choose> + <xsl:when test="ns:chassisName"> +#BSUB -m c<xsl:value-of select="ns:chassisName"/> + </xsl:when> + </xsl:choose> + <xsl:choose> + <xsl:when test="ns:usedMem"> +#BSUB -R rusage[mem=<xsl:value-of select="ns:usedMem"/>] + </xsl:when> + </xsl:choose> + + <xsl:text>
</xsl:text> + + <xsl:text>
</xsl:text> + <xsl:for-each select="ns:moduleLoadCommands/ns:command"> + <xsl:text>
</xsl:text> + <xsl:value-of select="."/><xsl:text> </xsl:text> + </xsl:for-each> + <xsl:text>
</xsl:text> +cd <xsl:text> </xsl:text><xsl:value-of select="ns:workingDirectory"/><xsl:text>
</xsl:text> + <xsl:for-each select="ns:preJobCommands/ns:command"> + <xsl:value-of select="."/><xsl:text> </xsl:text> + <xsl:text>
</xsl:text> + </xsl:for-each> + <xsl:text>
</xsl:text> + <xsl:choose><xsl:when test="ns:jobSubmitterCommand != ''"> + <xsl:value-of select="ns:jobSubmitterCommand"/><xsl:text> </xsl:text> + </xsl:when></xsl:choose><xsl:value-of select="ns:executablePath"/><xsl:text> </xsl:text> + <xsl:for-each select="ns:inputs/ns:input"> + <xsl:value-of select="."/><xsl:text> </xsl:text> + </xsl:for-each> + <xsl:text>
</xsl:text> + </xsl:template> + +</xsl:stylesheet> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/resources/PBSTemplate.xslt ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/resources/PBSTemplate.xslt b/modules/gfac/gfac-impl/src/main/resources/PBSTemplate.xslt new file mode 100644 index 0000000..73c5eb6 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/resources/PBSTemplate.xslt @@ -0,0 +1,82 @@ +<!--Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file + distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under + the Apache License, Version 2.0 (theà "License"); you may not use this file except in compliance with the License. You may + obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to + in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF + ANY ~ KIND, either express or implied. See the License for the specific language governing permissions and limitations under + the License. --> +<xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:ns="http://airavata.apache.org/gsi/ssh/2012/12"> +<xsl:output method="text" /> +<xsl:template match="/ns:JobDescriptor"> +#! /bin/sh +# <xsl:choose> + <xsl:when test="ns:shellName"> +##PBS -S <xsl:value-of select="ns:shellName"/> + </xsl:when></xsl:choose> + <xsl:choose> + <xsl:when test="ns:queueName"> +#PBS -q <xsl:value-of select="ns:queueName"/> + </xsl:when> + </xsl:choose> + <xsl:choose> + <xsl:when test="ns:mailOptions"> +#PBS -m <xsl:value-of select="ns:mailOptions"/> + </xsl:when> + </xsl:choose> + <xsl:choose> +<xsl:when test="ns:acountString"> +#PBS -A <xsl:value-of select="ns:acountString"/> + </xsl:when> + </xsl:choose> + <xsl:choose> + <xsl:when test="ns:maxWallTime"> +#PBS -l walltime=<xsl:value-of select="ns:maxWallTime"/> + </xsl:when> + </xsl:choose> + <xsl:choose> + <xsl:when test="ns:jobName"> +#PBS -N <xsl:value-of select="ns:jobName"/> + </xsl:when> + </xsl:choose> + <xsl:choose> + <xsl:when test="ns:standardOutFile"> +#PBS -o <xsl:value-of select="ns:standardOutFile"/> + </xsl:when> + </xsl:choose> + <xsl:choose> + <xsl:when test="ns:standardOutFile"> +#PBS -e <xsl:value-of select="ns:standardErrorFile"/> + </xsl:when> + </xsl:choose> + <xsl:choose> + <xsl:when test="ns:usedMem"> +#PBS -l mem=<xsl:value-of select="ns:usedMem"/> + </xsl:when> + </xsl:choose> + <xsl:choose> + <xsl:when test="(ns:nodes) and (ns:processesPerNode)"> +#PBS -l nodes=<xsl:value-of select="ns:nodes"/>:ppn=<xsl:value-of select="ns:processesPerNode"/> +<xsl:text>
</xsl:text> + </xsl:when> + </xsl:choose> +<xsl:for-each select="ns:exports/ns:name"> +<xsl:value-of select="."/>=<xsl:value-of select="./@value"/><xsl:text>
</xsl:text> +export<xsl:text> </xsl:text><xsl:value-of select="."/> +<xsl:text>
</xsl:text> +</xsl:for-each> +<xsl:for-each select="ns:preJobCommands/ns:command"> + <xsl:value-of select="."/><xsl:text> </xsl:text> + </xsl:for-each> +cd <xsl:text> </xsl:text><xsl:value-of select="ns:workingDirectory"/><xsl:text>
</xsl:text> + <xsl:choose><xsl:when test="ns:jobSubmitterCommand"> +<xsl:value-of select="ns:jobSubmitterCommand"/><xsl:text> </xsl:text></xsl:when></xsl:choose><xsl:value-of select="ns:executablePath"/><xsl:text> </xsl:text> +<xsl:for-each select="ns:inputs/ns:input"> + <xsl:value-of select="."/><xsl:text> </xsl:text> + </xsl:for-each> +<xsl:for-each select="ns:postJobCommands/ns:command"> + <xsl:value-of select="."/><xsl:text> </xsl:text> +</xsl:for-each> + +</xsl:template> + +</xsl:stylesheet> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/resources/SLURMTemplate.xslt ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/resources/SLURMTemplate.xslt b/modules/gfac/gfac-impl/src/main/resources/SLURMTemplate.xslt new file mode 100644 index 0000000..4a62722 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/resources/SLURMTemplate.xslt @@ -0,0 +1,78 @@ +<!--Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file + distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under + the Apache License, Version 2.0 (theà "License"); you may not use this file except in compliance with the License. You may + obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to + in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF + ANY ~ KIND, either express or implied. See the License for the specific language governing permissions and limitations under + the License. --> +<xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:ns="http://airavata.apache.org/gsi/ssh/2012/12"> +<xsl:output method="text" /> +<xsl:template match="/ns:JobDescriptor"> +<xsl:choose> +<xsl:when test="ns:shellName"> +#!<xsl:value-of select="ns:shellName"/> + </xsl:when> + </xsl:choose> +<xsl:choose> + <xsl:when test="ns:queueName"> +#SBATCH -p <xsl:value-of select="ns:queueName"/> + </xsl:when> + </xsl:choose> +<xsl:choose> + <xsl:when test="ns:nodes"> +#SBATCH -N <xsl:value-of select="ns:nodes"/> + </xsl:when> + </xsl:choose> + <xsl:choose> + <xsl:when test="ns:cpuCount"> +#SBATCH -n <xsl:value-of select="ns:cpuCount"/> + </xsl:when> + </xsl:choose> + <xsl:choose> + <xsl:when test="ns:mailAddress"> +#SBATCH -mail-user=<xsl:value-of select="ns:mailAddress"/> + </xsl:when> + </xsl:choose> + <xsl:choose> + <xsl:when test="ns:mailType"> +#SBATCH -mail-type=<xsl:value-of select="ns:mailType"/> + </xsl:when> + </xsl:choose> + <xsl:choose> +<xsl:when test="ns:acountString"> +#SBATCH -A <xsl:value-of select="ns:acountString"/> + </xsl:when> + </xsl:choose> + <xsl:choose> + <xsl:when test="ns:maxWallTime"> +#SBATCH -t <xsl:value-of select="ns:maxWallTime"/> + </xsl:when> + </xsl:choose> + <xsl:choose> + <xsl:when test="ns:jobName"> +#SBATCH -J <xsl:value-of select="ns:jobName"/> + </xsl:when> + </xsl:choose> + <xsl:choose> + <xsl:when test="ns:standardOutFile"> +#SBATCH -o <xsl:value-of select="ns:standardOutFile"/> + </xsl:when> + </xsl:choose> + <xsl:choose> + <xsl:when test="ns:standardOutFile"> +#SBATCH -e <xsl:value-of select="ns:standardErrorFile"/> + </xsl:when> + </xsl:choose> + <xsl:for-each select="ns:preJobCommands/ns:command"> + <xsl:text>
</xsl:text> + <xsl:value-of select="."/><xsl:text> </xsl:text> + </xsl:for-each> +cd <xsl:text> </xsl:text><xsl:value-of select="ns:workingDirectory"/><xsl:text>
</xsl:text> + <xsl:choose><xsl:when test="ns:jobSubmitterCommand"> +<xsl:value-of select="ns:jobSubmitterCommand"/><xsl:text> </xsl:text></xsl:when></xsl:choose><xsl:value-of select="ns:executablePath"/><xsl:text> </xsl:text> +<xsl:for-each select="ns:inputs/ns:input"> + <xsl:value-of select="."/><xsl:text> </xsl:text> + </xsl:for-each> +</xsl:template> + +</xsl:stylesheet> \ No newline at end of file
