Repository: airavata Updated Branches: refs/heads/master 140d9bd50 -> 08cdad264
http://git-wip-us.apache.org/repos/asf/airavata/blob/08cdad26/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java deleted file mode 100644 index bd8a0bc..0000000 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java +++ /dev/null @@ -1,473 +0,0 @@ -///* -// * -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, -// * software distributed under the License is distributed on an -// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// * KIND, either express or implied. See the License for the -// * specific language governing permissions and limitations -// * under the License. -// * -// */ -// -//package org.apache.airavata.gfac.ssh.provider.impl; -// -//import org.apache.airavata.gfac.core.cluster.RemoteCluster; -//import org.apache.airavata.model.experiment.TaskState; -//import org.apache.airavata.registry.cpi.AppCatalogException; -//import org.apache.airavata.common.exception.AiravataException; -//import org.apache.airavata.common.exception.ApplicationSettingsException; -//import org.apache.airavata.common.utils.LocalEventPublisher; -//import org.apache.airavata.gfac.core.GFacConstants; -//import org.apache.airavata.gfac.core.GFacException; -//import org.apache.airavata.gfac.core.JobDescriptor; -//import org.apache.airavata.gfac.core.SSHApiException; -//import org.apache.airavata.gfac.core.cluster.JobStatus; -//import org.apache.airavata.gfac.core.context.JobExecutionContext; -//import org.apache.airavata.gfac.core.context.MessageContext; -//import org.apache.airavata.gfac.core.handler.GFacHandlerException; -//import org.apache.airavata.gfac.core.monitor.MonitorID; -//import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest; -//import org.apache.airavata.gfac.core.provider.AbstractProvider; -//import org.apache.airavata.gfac.core.provider.GFacProviderException; -//import org.apache.airavata.gfac.core.states.GfacExperimentState; -//import org.apache.airavata.gfac.core.GFacUtils; -//import org.apache.airavata.gfac.gsi.ssh.api.CommandExecutor; -//import org.apache.airavata.gfac.impl.StandardOutReader; -//import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor; -//import org.apache.airavata.gfac.monitor.email.EmailMonitorFactory; -//import org.apache.airavata.gfac.ssh.security.SSHSecurityContext; -//import org.apache.airavata.gfac.ssh.util.GFACSSHUtils; -//import org.apache.airavata.gfac.core.cluster.RawCommandInfo; -//import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths; -//import org.apache.airavata.model.appcatalog.appinterface.DataType; -//import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; -//import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; -//import org.apache.airavata.model.appcatalog.computeresource.MonitorMode; -//import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager; -//import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType; -//import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; -//import org.apache.airavata.model.experiment.CorrectiveAction; -//import org.apache.airavata.model.experiment.ErrorCategory; -//import org.apache.airavata.model.experiment.JobDetails; -//import org.apache.airavata.model.experiment.JobState; -//import org.apache.xmlbeans.XmlException; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -//import sun.reflect.generics.reflectiveObjects.NotImplementedException; -// -//import java.io.*; -//import java.util.*; -// -///** -// * Execute application using remote SSH -// */ -//public class SSHProvider extends AbstractProvider { -// private static final Logger log = LoggerFactory.getLogger(SSHProvider.class); -// private RemoteCluster remoteCluster; -// private String jobID = null; -// private String taskID = null; -// // we keep gsisshprovider to support qsub submission incase of hpc scenario with ssh -// private boolean hpcType = false; -// -// public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { -// try { -// super.initialize(jobExecutionContext); -// String hostAddress = jobExecutionContext.getHostName(); -// ResourceJobManager resourceJobManager = jobExecutionContext.getResourceJobManager(); -// ResourceJobManagerType resourceJobManagerType = resourceJobManager.getResourceJobManagerType(); -// if (jobExecutionContext.getSecurityContext(hostAddress) == null) { -// GFACSSHUtils.addSecurityContext(jobExecutionContext); -// } -// taskID = jobExecutionContext.getTaskData().getTaskID(); -// -// JobSubmissionProtocol preferredJobSubmissionProtocol = jobExecutionContext.getPreferredJobSubmissionProtocol(); -// if (preferredJobSubmissionProtocol == JobSubmissionProtocol.SSH && resourceJobManagerType == ResourceJobManagerType.FORK) { -// jobID = "SSH_" + jobExecutionContext.getHostName() + "_" + Calendar.getInstance().getTimeInMillis(); -// remoteCluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getRemoteCluster(); -// -// String remoteFile = jobExecutionContext.getWorkingDir() + File.separatorChar + GFacConstants.EXECUTABLE_NAME; -// details.setJobID(taskID); -// details.setJobDescription(remoteFile); -// jobExecutionContext.setJobDetails(details); -// // FIXME : Why remoteCluster is passed as null -// JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, remoteCluster); -// details.setJobDescription(jobDescriptor.toXML()); -// -// GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.SETUP); -// log.info(remoteFile); -// File runscript = createShellScript(jobExecutionContext); -// remoteCluster.scpTo(remoteFile, runscript.getAbsolutePath()); -// } else { -// hpcType = true; -// } -// } catch (ApplicationSettingsException e) { -// log.error(e.getMessage()); -// throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); -// } catch (Exception e) { -// throw new GFacProviderException(e.getLocalizedMessage(), e); -// } -// } -// -// -// public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException { -// if (!hpcType) { -// try { -// /* -// * Execute -// */ -// String executable = jobExecutionContext.getWorkingDir() + File.separatorChar + GFacConstants.EXECUTABLE_NAME; -// details.setJobDescription(executable); -// RawCommandInfo rawCommandInfo = new RawCommandInfo("/bin/chmod 755 " + executable + "; " + executable); -// StandardOutReader jobIDReaderCommandOutput = new StandardOutReader(); -// log.info("Executing RawCommand : " + rawCommandInfo.getCommand()); -// CommandExecutor.executeCommand(rawCommandInfo, remoteCluster.getSession(), jobIDReaderCommandOutput); -// String stdOutputString = getOutputifAvailable(jobIDReaderCommandOutput, "Error submitting job to resource"); -// log.info("stdout=" + stdOutputString); -// } catch (Exception e) { -// throw new GFacProviderException(e.getMessage(), e); -// } -// } else { -// try { -// StringBuffer data = new StringBuffer(); -// JobDetails jobDetails = new JobDetails(); -// String hostAddress = jobExecutionContext.getHostName(); -// LocalEventPublisher localEventPublisher = jobExecutionContext.getLocalEventPublisher(); -// try { -// RemoteCluster remoteCluster = null; -// if (jobExecutionContext.getSecurityContext(hostAddress) == null) { -// GFACSSHUtils.addSecurityContext(jobExecutionContext); -// } -// remoteCluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getRemoteCluster(); -// if (remoteCluster == null) { -// throw new GFacProviderException("Security context is not set properly"); -// } else { -// log.info("Successfully retrieved the Security Context"); -// } -// // This installed path is a mandetory field, because this could change based on the computing resource -// JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, remoteCluster); -// jobDetails.setJobName(jobDescriptor.getJobName()); -// log.info(jobDescriptor.toXML()); -// jobDetails.setJobDescription(jobDescriptor.toXML()); -// String jobID = remoteCluster.submitBatchJob(jobDescriptor); -// if (jobID != null && !jobID.isEmpty()) { -// jobDetails.setJobID(jobID); -// GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED); -// localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) -// , GfacExperimentState.JOBSUBMITTED)); -// jobExecutionContext.setJobDetails(jobDetails); -// if (verifyJobSubmissionByJobId(remoteCluster, jobID)) { -// localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) -// , GfacExperimentState.JOBSUBMITTED)); -// GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.QUEUED); -// } -// } else { -// jobExecutionContext.setJobDetails(jobDetails); -// int verificationTryCount = 0; -// while (verificationTryCount++ < 3) { -// String verifyJobId = verifyJobSubmission(remoteCluster, jobDetails); -// if (verifyJobId != null && !verifyJobId.isEmpty()) { -// // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED -// jobID = verifyJobId; -// jobDetails.setJobID(jobID); -// localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) -// , GfacExperimentState.JOBSUBMITTED)); -// GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.QUEUED); -// break; -// } -// Thread.sleep(verificationTryCount * 1000); -// } -// } -// -// if (jobID == null || jobID.isEmpty()) { -// String msg = "expId:" + jobExecutionContext.getExperimentID() + " Couldn't find remote jobId for JobName:" -// + jobDetails.getJobName() + ", both submit and verify steps doesn't return a valid JobId. Hence changing experiment state to Failed"; -// log.error(msg); -// GFacUtils.saveErrorDetails(jobExecutionContext, msg, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); -// GFacUtils.publishTaskStatus(jobExecutionContext, localEventPublisher, TaskState.FAILED); -// return; -// } -// data.append("jobDesc=").append(jobDescriptor.toXML()); -// data.append(",jobId=").append(jobDetails.getJobID()); -// monitor(jobExecutionContext); -// } catch (SSHApiException e) { -// String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage(); -// log.error(error); -// jobDetails.setJobID("none"); -// GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED); -// GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); -// throw new GFacProviderException(error, e); -// } catch (Exception e) { -// String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage(); -// log.error(error); -// jobDetails.setJobID("none"); -// GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED); -// GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); -// throw new GFacProviderException(error, e); -// } finally { -// log.info("Saving data for future recovery: "); -// log.info(data.toString()); -// GFacUtils.saveHandlerData(jobExecutionContext, data, this.getClass().getName()); -// } -// } catch (GFacException e) { -// throw new GFacProviderException(e.getMessage(), e); -// } -// } -// } -// -// private boolean verifyJobSubmissionByJobId(RemoteCluster remoteCluster, String jobID) throws SSHApiException { -// JobStatus status = remoteCluster.getJobStatus(jobID); -// return status != null && status != JobStatus.U; -// } -// -// private String verifyJobSubmission(RemoteCluster remoteCluster, JobDetails jobDetails) { -// String jobName = jobDetails.getJobName(); -// String jobId = null; -// try { -// jobId = remoteCluster.getJobIdByJobName(jobName, remoteCluster.getServerInfo().getUserName()); -// } catch (SSHApiException e) { -// log.error("Error while verifying JobId from JobName"); -// } -// return jobId; -// } -// -// public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException { -// -// } -// -// public boolean cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { -// JobDetails jobDetails = jobExecutionContext.getJobDetails(); -// StringBuffer data = new StringBuffer(); -// String hostAddress = jobExecutionContext.getHostName(); -// if (!hpcType) { -// throw new NotImplementedException(); -// } else { -// RemoteCluster remoteCluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getRemoteCluster(); -// if (remoteCluster == null) { -// throw new GFacProviderException("Security context is not set properly"); -// } else { -// log.info("Successfully retrieved the Security Context"); -// } -// // This installed path is a mandetory field, because this could change based on the computing resource -// if (jobDetails == null) { -// log.error("There is not JobDetails, Cancel request can't be performed !!!"); -// return false; -// } -// try { -// if (jobDetails.getJobID() != null) { -// if (remoteCluster.cancelJob(jobDetails.getJobID()) != null) { -// // if this operation success without any exceptions, we can assume cancel operation succeeded. -// GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED); -// return true; -// } else { -// log.info("Job Cancel operation failed"); -// } -// } else { -// log.error("No Job Id is set, so cannot perform the cancel operation !!!"); -// throw new GFacProviderException("Cancel request failed to cancel job as JobId is null in Job Execution Context"); -// } -// } catch (SSHApiException e) { -// String error = "Cancel request failed " + jobExecutionContext.getHostName() + " message: " + e.getMessage(); -// log.error(error); -// StringWriter errors = new StringWriter(); -// e.printStackTrace(new PrintWriter(errors)); -// GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); -//// throw new GFacProviderException(error, e); -// } catch (Exception e) { -// String error = "Cancel request failed " + jobExecutionContext.getHostName() + " message: " + e.getMessage(); -// log.error(error); -// StringWriter errors = new StringWriter(); -// e.printStackTrace(new PrintWriter(errors)); -// GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); -//// throw new GFacProviderException(error, e); -// } -// return false; -// } -// } -// -// private File createShellScript(JobExecutionContext context) throws IOException { -// String uniqueDir = jobExecutionContext.getApplicationName() + System.currentTimeMillis() -// + new Random().nextLong(); -// -// File shellScript = File.createTempFile(uniqueDir, "sh"); -// OutputStream out = new FileOutputStream(shellScript); -// -// out.write("#!/bin/bash\n".getBytes()); -// out.write(("cd " + jobExecutionContext.getWorkingDir() + "\n").getBytes()); -// out.write(("export " + GFacConstants.INPUT_DATA_DIR_VAR_NAME + "=" + jobExecutionContext.getInputDir() + "\n").getBytes()); -// out.write(("export " + GFacConstants.OUTPUT_DATA_DIR_VAR_NAME + "=" + jobExecutionContext.getOutputDir() + "\n") -// .getBytes()); -// // get the env of the host and the application -// List<SetEnvPaths> envPathList = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getSetEnvironment(); -// for (SetEnvPaths setEnvPaths : envPathList) { -// log.debug("Env[" + setEnvPaths.getName() + "] = " + setEnvPaths.getValue()); -// out.write(("export " + setEnvPaths.getName() + "=" + setEnvPaths.getValue() + "\n").getBytes()); -// } -// -// // prepare the command -// final String SPACE = " "; -// StringBuffer cmd = new StringBuffer(); -// cmd.append(jobExecutionContext.getExecutablePath()); -// cmd.append(SPACE); -// -// MessageContext input = context.getInMessageContext(); -// Map<String, Object> inputs = input.getParameters(); -// Set<String> keys = inputs.keySet(); -// for (String paramName : keys) { -// InputDataObjectType inputParamType = (InputDataObjectType) input.getParameters().get(paramName); -// //if ("URIArray".equals(actualParameter.getType().getType().toString())) { -// if (inputParamType.getType() == DataType.URI) { -// String value = inputParamType.getValue(); -// cmd.append(value); -// cmd.append(SPACE); -// } else { -// String paramValue = inputParamType.getValue(); -// cmd.append(paramValue); -// cmd.append(SPACE); -// } -// } -// // We redirect the error and stdout to remote files, they will be read -// // in later -// cmd.append(SPACE); -// cmd.append("1>"); -// cmd.append(SPACE); -// cmd.append(jobExecutionContext.getStandardOutput()); -// cmd.append(SPACE); -// cmd.append("2>"); -// cmd.append(SPACE); -// cmd.append(jobExecutionContext.getStandardError()); -// -// String cmdStr = cmd.toString(); -// log.info("Command = " + cmdStr); -// out.write((cmdStr + "\n").getBytes()); -// String message = "\"execuationSuceeded\""; -// out.write(("echo " + message + "\n").getBytes()); -// out.close(); -// -// return shellScript; -// } -// -// public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException { -// -// } -// -// /** -// * This method will read standard output and if there's any it will be parsed -// * -// * @param jobIDReaderCommandOutput -// * @param errorMsg -// * @return -// * @throws SSHApiException -// */ -// private String getOutputifAvailable(StandardOutReader jobIDReaderCommandOutput, String errorMsg) throws SSHApiException { -// String stdOutputString = jobIDReaderCommandOutput.getStdOutputString(); -// String stdErrorString = jobIDReaderCommandOutput.getStdErrorString(); -// -// if (stdOutputString == null || stdOutputString.isEmpty() || (stdErrorString != null && !stdErrorString.isEmpty())) { -// log.error("Standard Error output : " + stdErrorString); -// throw new SSHApiException(errorMsg + stdErrorString); -// } -// return stdOutputString; -// } -// -// public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { -// // have to implement the logic to recover a gfac failure -// initialize(jobExecutionContext); -// if(hpcType) { -// log.info("Invoking Recovering for the Experiment: " + jobExecutionContext.getExperimentID()); -// String hostName = jobExecutionContext.getHostName(); -// String jobId = ""; -// String jobDesc = ""; -// String jobName = ""; -// try { -// String pluginData = GFacUtils.getHandlerData(jobExecutionContext, this.getClass().getName()); -// String[] split = pluginData.split(","); -// if (split.length < 2) { -// this.execute(jobExecutionContext); -// return; -// } -// jobDesc = split[0].substring(8); -// jobId = split[1].substring(6); -// try { -// JobDescriptor jobDescriptor = JobDescriptor.fromXML(jobDesc); -// jobName = jobDescriptor.getJobName(); -// } catch (XmlException e) { -// log.error(e.getMessage(), e); -// log.error("Cannot parse plugin data stored, but trying to recover"); -// -// } -// log.info("Following data have recovered: "); -// log.info("Job Description: " + jobDesc); -// log.info("Job Id: " + jobId); -// if (jobName.isEmpty() || jobId.isEmpty() || "none".equals(jobId) || -// "".equals(jobId)) { -// log.info("Cannot recover data so submitting the job again !!!"); -// this.execute(jobExecutionContext); -// return; -// } -// } catch (Exception e) { -// log.error("Error while recovering provider", e); -// } -// try { -// // Now we are we have enough data to recover -// JobDetails jobDetails = new JobDetails(); -// jobDetails.setJobDescription(jobDesc); -// jobDetails.setJobID(jobId); -// jobDetails.setJobName(jobName); -// jobExecutionContext.setJobDetails(jobDetails); -// if (jobExecutionContext.getSecurityContext(hostName) == null) { -// try { -// GFACSSHUtils.addSecurityContext(jobExecutionContext); -// } catch (ApplicationSettingsException e) { -// log.error(e.getMessage()); -// throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); -// } -// } -// monitor(jobExecutionContext); -// } catch (Exception e) { -// log.error("Error while recover the job", e); -// throw new GFacProviderException("Error delegating already ran job to Monitoring", e); -// } -// }else{ -// log.info("We do not handle non hpc recovery so we simply run the Job directly"); -// this.execute(jobExecutionContext); -// } -// } -// -// @Override -// public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { -// if (jobExecutionContext.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.SSH) { -// String jobSubmissionInterfaceId = jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId(); -// SSHJobSubmission sshJobSubmission = null; -// try { -// sshJobSubmission = jobExecutionContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId); -// } catch (AppCatalogException e) { -// throw new GFacException("Error while reading compute resource", e); -// } -// MonitorMode monitorMode = sshJobSubmission.getMonitorMode(); -// if (monitorMode != null && monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) { -// try { -// EmailBasedMonitor emailBasedMonitor = EmailMonitorFactory.getEmailBasedMonitor( -// sshJobSubmission.getResourceJobManager().getResourceJobManagerType()); -// emailBasedMonitor.addToJobMonitorMap(jobExecutionContext); -// } catch (AiravataException e) { -// throw new GFacHandlerException("Error while activating email job monitoring ", e); -// } -// return; -// } -// } else { -// throw new IllegalArgumentException("Monitoring is implemented only for SSH, " -// + jobExecutionContext.getPreferredJobSubmissionProtocol().name() + " is not yet implemented"); -// } -// -// } -//} http://git-wip-us.apache.org/repos/asf/airavata/blob/08cdad26/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java deleted file mode 100644 index 9532c53..0000000 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java +++ /dev/null @@ -1,562 +0,0 @@ -///* -// * -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, -// * software distributed under the License is distributed on an -// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// * KIND, either express or implied. See the License for the -// * specific language governing permissions and limitations -// * under the License. -// * -//*/ -//package org.apache.airavata.gfac.ssh.util; -// -//import org.apache.airavata.gfac.core.cluster.RemoteCluster; -//import org.apache.airavata.gfac.impl.HPCRemoteCluster; -//import org.apache.airavata.registry.cpi.AppCatalog; -//import org.apache.airavata.registry.cpi.AppCatalogException; -//import org.apache.airavata.common.exception.ApplicationSettingsException; -//import org.apache.airavata.common.utils.ServerSettings; -//import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential; -//import org.apache.airavata.gfac.core.GFacConstants; -//import org.apache.airavata.gfac.core.GFacException; -//import org.apache.airavata.gfac.core.RequestData; -//import org.apache.airavata.gfac.core.JobDescriptor; -//import org.apache.airavata.gfac.core.JobManagerConfiguration; -//import org.apache.airavata.gfac.core.cluster.ServerInfo; -//import org.apache.airavata.gfac.core.context.JobExecutionContext; -//import org.apache.airavata.gfac.core.context.MessageContext; -//import org.apache.airavata.gfac.core.handler.GFacHandlerException; -//import org.apache.airavata.gfac.core.GFacUtils; -//import org.apache.airavata.gfac.gsi.ssh.impl.GSISSHAbstractCluster; -//import org.apache.airavata.gfac.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo; -//import org.apache.airavata.gfac.gsi.ssh.util.CommonUtils; -//import org.apache.airavata.gfac.ssh.context.SSHAuthWrapper; -//import org.apache.airavata.gfac.ssh.security.SSHSecurityContext; -//import org.apache.airavata.gfac.ssh.security.TokenizedSSHAuthInfo; -//import org.apache.airavata.gfac.core.authentication.AuthenticationInfo; -//import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; -//import org.apache.airavata.model.appcatalog.appdeployment.ApplicationParallelismType; -//import org.apache.airavata.model.appcatalog.appinterface.DataType; -//import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; -//import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; -//import org.apache.airavata.model.appcatalog.computeresource.*; -//import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference; -//import org.apache.airavata.model.experiment.ComputationalResourceScheduling; -//import org.apache.airavata.model.experiment.CorrectiveAction; -//import org.apache.airavata.model.experiment.ErrorCategory; -//import org.apache.airavata.model.experiment.TaskDetails; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//import java.io.File; -//import java.io.PrintWriter; -//import java.io.StringWriter; -//import java.util.*; -// -//public class GFACSSHUtils { -// private final static Logger logger = LoggerFactory.getLogger(GFACSSHUtils.class); -// -// public static Map<String, List<RemoteCluster>> clusters = new HashMap<String, List<RemoteCluster>>(); -// -// public static final String PBS_JOB_MANAGER = "pbs"; -// public static final String SLURM_JOB_MANAGER = "slurm"; -// public static final String SUN_GRID_ENGINE_JOB_MANAGER = "UGE"; -// public static final String LSF_JOB_MANAGER = "LSF"; -// -// public static int maxClusterCount = 5; -// -// /** -// * This method is to add computing resource specific authentication, if its a third party machine, use the other addSecurityContext -// * @param jobExecutionContext -// * @throws GFacException -// * @throws ApplicationSettingsException -// */ -// public static void addSecurityContext(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException { -// JobSubmissionProtocol preferredJobSubmissionProtocol = jobExecutionContext.getPreferredJobSubmissionProtocol(); -// JobSubmissionInterface preferredJobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface(); -// if (preferredJobSubmissionProtocol == JobSubmissionProtocol.GLOBUS || preferredJobSubmissionProtocol == JobSubmissionProtocol.UNICORE) { -// logger.error("This is a wrong method to invoke to non ssh host types,please check your gfac-config.xml"); -// } else if (preferredJobSubmissionProtocol == JobSubmissionProtocol.SSH) { -// try { -// AppCatalog appCatalog = jobExecutionContext.getAppCatalog(); -// SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(preferredJobSubmissionInterface.getJobSubmissionInterfaceId()); -// SecurityProtocol securityProtocol = sshJobSubmission.getSecurityProtocol(); -// if (securityProtocol == SecurityProtocol.GSI || securityProtocol == SecurityProtocol.SSH_KEYS) { -// SSHSecurityContext sshSecurityContext = new SSHSecurityContext(); -// String credentialStoreToken = jobExecutionContext.getCredentialStoreToken(); // this is set by the framework -// RequestData requestData = new RequestData(jobExecutionContext.getGatewayID()); -// requestData.setTokenId(credentialStoreToken); -// -// ServerInfo serverInfo = new ServerInfo(null, jobExecutionContext.getHostName()); -// -// RemoteCluster pbsRemoteCluster = null; -// try { -// AuthenticationInfo tokenizedSSHAuthInfo = new TokenizedSSHAuthInfo(requestData); -// String installedParentPath = jobExecutionContext.getResourceJobManager().getJobManagerBinPath(); -// if (installedParentPath == null) { -// installedParentPath = "/"; -// } -// -// SSHCredential credentials =((TokenizedSSHAuthInfo)tokenizedSSHAuthInfo).getCredentials();// this is just a call to get and set credentials in to this object,data will be used -// if(credentials.getPrivateKey()==null || credentials.getPublicKey()==null){ -// // now we fall back to username password authentication -// Properties configurationProperties = ServerSettings.getProperties(); -// tokenizedSSHAuthInfo = new DefaultPasswordAuthenticationInfo(configurationProperties.getProperty(GFacConstants.SSH_PASSWORD)); -// } -// // This should be the login user name from compute resource preference -// String loginUser = jobExecutionContext.getLoginUserName(); -// if (loginUser == null) { -// loginUser = credentials.getPortalUserName(); -// } -// serverInfo.setUserName(loginUser); -// jobExecutionContext.getExperiment().setUserName(loginUser); -// -// -// // inside the pbsCluser object -// -// String key = loginUser + jobExecutionContext.getHostName() + serverInfo.getPort(); -// boolean recreate = false; -// synchronized (clusters) { -// if (clusters.containsKey(key) && clusters.get(key).size() < maxClusterCount) { -// recreate = true; -// } else if (clusters.containsKey(key)) { -// int i = new Random().nextInt(Integer.MAX_VALUE) % maxClusterCount; -// if (clusters.get(key).get(i).getSession().isConnected()) { -// pbsRemoteCluster = clusters.get(key).get(i); -// } else { -// clusters.get(key).remove(i); -// recreate = true; -// } -// if (!recreate) { -// try { -// pbsRemoteCluster.listDirectory("~/"); // its hard to trust isConnected method, so we try to connect if it works we are good,else we recreate -// } catch (Exception e) { -// clusters.get(key).remove(i); -// logger.info("Connection found the connection map is expired, so we create from the scratch"); -// maxClusterCount++; -// recreate = true; // we make the pbsRemoteCluster to create again if there is any exception druing connection -// } -// } -// logger.info("Re-using the same connection used with the connection string:" + key); -// } else { -// recreate = true; -// } -// if (recreate) { -// JobManagerConfiguration jConfig = null; -// String jobManager = sshJobSubmission.getResourceJobManager().getResourceJobManagerType().toString(); -// if (jobManager == null) { -// logger.error("No Job Manager is configured, so we are picking pbs as the default job manager"); -// jConfig = CommonUtils.getPBSJobManager(installedParentPath); -// } else { -// if (PBS_JOB_MANAGER.equalsIgnoreCase(jobManager)) { -// jConfig = CommonUtils.getPBSJobManager(installedParentPath); -// } else if (SLURM_JOB_MANAGER.equalsIgnoreCase(jobManager)) { -// jConfig = CommonUtils.getSLURMJobManager(installedParentPath); -// } else if (SUN_GRID_ENGINE_JOB_MANAGER.equalsIgnoreCase(jobManager)) { -// jConfig = CommonUtils.getUGEJobManager(installedParentPath); -// } else if (LSF_JOB_MANAGER.equalsIgnoreCase(jobManager)) { -// jConfig = CommonUtils.getLSFJobManager(installedParentPath); -// } -// } -// -// pbsRemoteCluster = new HPCRemoteCluster(serverInfo, tokenizedSSHAuthInfo,jConfig); -// List<RemoteCluster> pbsRemoteClusters = null; -// if (!(clusters.containsKey(key))) { -// pbsRemoteClusters = new ArrayList<RemoteCluster>(); -// } else { -// pbsRemoteClusters = clusters.get(key); -// } -// pbsRemoteClusters.add(pbsRemoteCluster); -// clusters.put(key, pbsRemoteClusters); -// } -// } -// } catch (Exception e) { -// throw new GFacException("Error occurred...", e); -// } -// sshSecurityContext.setRemoteCluster(pbsRemoteCluster); -// jobExecutionContext.addSecurityContext(jobExecutionContext.getHostName(), sshSecurityContext); -// } -// } catch (AppCatalogException e) { -// throw new GFacException("Error while getting SSH Submission object from app catalog", e); -// } -// } -// } -// -// /** -// * This method can be used to add third party resource security contexts -// * @param jobExecutionContext -// * @param sshAuth -// * @throws GFacException -// * @throws ApplicationSettingsException -// */ -// public static void addSecurityContext(JobExecutionContext jobExecutionContext,SSHAuthWrapper sshAuth) throws GFacException, ApplicationSettingsException { -// try { -// if(sshAuth== null) { -// throw new GFacException("Error adding security Context, because sshAuthWrapper is null"); -// } -// SSHSecurityContext sshSecurityContext = new SSHSecurityContext(); -// AppCatalog appCatalog = jobExecutionContext.getAppCatalog(); -// JobSubmissionInterface preferredJobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface(); -// SSHJobSubmission sshJobSubmission = null; -// try { -// sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(preferredJobSubmissionInterface.getJobSubmissionInterfaceId()); -// } catch (Exception e1) { -// logger.error("Not able to get SSHJobSubmission from registry"); -// } -// -// RemoteCluster pbsRemoteCluster = null; -// String key=sshAuth.getKey(); -// boolean recreate = false; -// synchronized (clusters) { -// if (clusters.containsKey(key) && clusters.get(key).size() < maxClusterCount) { -// recreate = true; -// } else if (clusters.containsKey(key)) { -// int i = new Random().nextInt(Integer.MAX_VALUE) % maxClusterCount; -// if (clusters.get(key).get(i).getSession().isConnected()) { -// pbsRemoteCluster = clusters.get(key).get(i); -// } else { -// clusters.get(key).remove(i); -// recreate = true; -// } -// if (!recreate) { -// try { -// pbsRemoteCluster.listDirectory("~/"); // its hard to trust isConnected method, so we try to connect if it works we are good,else we recreate -// } catch (Exception e) { -// clusters.get(key).remove(i); -// logger.info("Connection found the connection map is expired, so we create from the scratch"); -// maxClusterCount++; -// recreate = true; // we make the pbsRemoteCluster to create again if there is any exception druing connection -// } -// } -// logger.info("Re-using the same connection used with the connection string:" + key); -// } else { -// recreate = true; -// } -// if (recreate) { -// JobManagerConfiguration jConfig = null; -// String installedParentPath = null; -// if(jobExecutionContext.getResourceJobManager()!= null){ -// installedParentPath = jobExecutionContext.getResourceJobManager().getJobManagerBinPath(); -// } -// if (installedParentPath == null) { -// installedParentPath = "/"; -// } -// if (sshJobSubmission != null) { -// String jobManager = sshJobSubmission.getResourceJobManager().getResourceJobManagerType().toString(); -// if (jobManager == null) { -// logger.error("No Job Manager is configured, so we are picking pbs as the default job manager"); -// jConfig = CommonUtils.getPBSJobManager(installedParentPath); -// } else { -// if (PBS_JOB_MANAGER.equalsIgnoreCase(jobManager)) { -// jConfig = CommonUtils.getPBSJobManager(installedParentPath); -// } else if (SLURM_JOB_MANAGER.equalsIgnoreCase(jobManager)) { -// jConfig = CommonUtils.getSLURMJobManager(installedParentPath); -// } else if (SUN_GRID_ENGINE_JOB_MANAGER.equalsIgnoreCase(jobManager)) { -// jConfig = CommonUtils.getUGEJobManager(installedParentPath); -// } else if (LSF_JOB_MANAGER.equals(jobManager)) { -// jConfig = CommonUtils.getLSFJobManager(installedParentPath); -// } -// } -// } -// pbsRemoteCluster = new HPCRemoteCluster(sshAuth.getServerInfo(), sshAuth.getAuthenticationInfo(),jConfig); -// key = sshAuth.getKey(); -// List<RemoteCluster> pbsRemoteClusters = null; -// if (!(clusters.containsKey(key))) { -// pbsRemoteClusters = new ArrayList<RemoteCluster>(); -// } else { -// pbsRemoteClusters = clusters.get(key); -// } -// pbsRemoteClusters.add(pbsRemoteCluster); -// clusters.put(key, pbsRemoteClusters); -// } -// } -// sshSecurityContext.setRemoteCluster(pbsRemoteCluster); -// jobExecutionContext.addSecurityContext(key, sshSecurityContext); -// } catch (Exception e) { -// logger.error(e.getMessage(), e); -// throw new GFacException("Error adding security Context", e); -// } -// } -// -// -// public static JobDescriptor createJobDescriptor(JobExecutionContext jobExecutionContext, RemoteCluster remoteCluster) throws AppCatalogException, ApplicationSettingsException { -// JobDescriptor jobDescriptor = new JobDescriptor(); -// TaskDetails taskData = jobExecutionContext.getTaskData(); -// -// -// // set email based job monitoring email address if monitor mode is JOB_EMAIL_NOTIFICATION_MONITOR -// boolean addJobNotifMail = isEmailBasedJobMonitor(jobExecutionContext); -// String emailIds = null; -// if (addJobNotifMail) { -// emailIds = ServerSettings.getEmailBasedMonitorAddress(); -// } -// // add all configured job notification email addresses. -// if (ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_ENABLE).equalsIgnoreCase("true")) { -// String flags = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_FLAGS); -// if (flags != null && jobExecutionContext.getApplicationContext().getComputeResourceDescription().getHostName().equals("stampede.tacc.xsede.org")) { -// flags = "ALL"; -// } -// jobDescriptor.setMailOptions(flags); -// -// String userJobNotifEmailIds = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_EMAILIDS); -// if (userJobNotifEmailIds != null && !userJobNotifEmailIds.isEmpty()) { -// if (emailIds != null && !emailIds.isEmpty()) { -// emailIds += ("," + userJobNotifEmailIds); -// } else { -// emailIds = userJobNotifEmailIds; -// } -// } -// -// if (taskData.isEnableEmailNotification()) { -// List<String> emailList = jobExecutionContext.getTaskData().getEmailAddresses(); -// String elist = GFacUtils.listToCsv(emailList, ','); -// if (elist != null && !elist.isEmpty()) { -// if (emailIds != null && !emailIds.isEmpty()) { -// emailIds = emailIds + "," + elist; -// } else { -// emailIds = elist; -// } -// } -// } -// } -// if (emailIds != null && !emailIds.isEmpty()) { -// logger.info("Email list: " + emailIds); -// jobDescriptor.setMailAddress(emailIds); -// } -// // this is common for any application descriptor -// -// jobDescriptor.setCallBackIp(ServerSettings.getIp()); -// jobDescriptor.setCallBackPort(ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.GFAC_SERVER_PORT, "8950")); -// jobDescriptor.setInputDirectory(jobExecutionContext.getInputDir()); -// jobDescriptor.setOutputDirectory(jobExecutionContext.getOutputDir()); -// jobDescriptor.setExecutablePath(jobExecutionContext.getApplicationContext() -// .getApplicationDeploymentDescription().getExecutablePath()); -// jobDescriptor.setStandardOutFile(jobExecutionContext.getStandardOutput()); -// jobDescriptor.setStandardErrorFile(jobExecutionContext.getStandardError()); -// String computationalProjectAccount = taskData.getTaskScheduling().getComputationalProjectAccount(); -// if (computationalProjectAccount == null){ -// ComputeResourcePreference computeResourcePreference = jobExecutionContext.getApplicationContext().getComputeResourcePreference(); -// if (computeResourcePreference != null) { -// computationalProjectAccount = computeResourcePreference.getAllocationProjectNumber(); -// } -// } -// if (computationalProjectAccount != null) { -// jobDescriptor.setAcountString(computationalProjectAccount); -// } -// // To make job name alpha numeric -// jobDescriptor.setJobName("A" + String.valueOf(generateJobName())); -// jobDescriptor.setWorkingDirectory(jobExecutionContext.getWorkingDir()); -// -// List<String> inputValues = new ArrayList<String>(); -// MessageContext input = jobExecutionContext.getInMessageContext(); -// -// // sort the inputs first and then build the command ListR -// Comparator<InputDataObjectType> inputOrderComparator = new Comparator<InputDataObjectType>() { -// @Override -// public int compare(InputDataObjectType inputDataObjectType, InputDataObjectType t1) { -// return inputDataObjectType.getInputOrder() - t1.getInputOrder(); -// } -// }; -// Set<InputDataObjectType> sortedInputSet = new TreeSet<InputDataObjectType>(inputOrderComparator); -// for (Object object : input.getParameters().values()) { -// if (object instanceof InputDataObjectType) { -// InputDataObjectType inputDOT = (InputDataObjectType) object; -// sortedInputSet.add(inputDOT); -// } -// } -// for (InputDataObjectType inputDataObjectType : sortedInputSet) { -// if (!inputDataObjectType.isRequiredToAddedToCommandLine()) { -// continue; -// } -// if (inputDataObjectType.getApplicationArgument() != null -// && !inputDataObjectType.getApplicationArgument().equals("")) { -// inputValues.add(inputDataObjectType.getApplicationArgument()); -// } -// -// if (inputDataObjectType.getValue() != null -// && !inputDataObjectType.getValue().equals("")) { -// if (inputDataObjectType.getType() == DataType.URI) { -// // set only the relative path -// String filePath = inputDataObjectType.getValue(); -// filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length()); -// inputValues.add(filePath); -// }else { -// inputValues.add(inputDataObjectType.getValue()); -// } -// -// } -// } -// Map<String, Object> outputParams = jobExecutionContext.getOutMessageContext().getParameters(); -// for (Object outputParam : outputParams.values()) { -// if (outputParam instanceof OutputDataObjectType) { -// OutputDataObjectType output = (OutputDataObjectType) outputParam; -// if (output.getApplicationArgument() != null -// && !output.getApplicationArgument().equals("")) { -// inputValues.add(output.getApplicationArgument()); -// } -// if (output.getValue() != null && !output.getValue().equals("") && output.isRequiredToAddedToCommandLine()) { -// if (output.getType() == DataType.URI){ -// String filePath = output.getValue(); -// filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length()); -// inputValues.add(filePath); -// } -// } -// } -// } -// -// jobDescriptor.setInputValues(inputValues); -// jobDescriptor.setUserName(((GSISSHAbstractCluster) remoteCluster).getServerInfo().getUserName()); -// jobDescriptor.setShellName("/bin/bash"); -// jobDescriptor.setAllEnvExport(true); -// jobDescriptor.setOwner(((HPCRemoteCluster) remoteCluster).getServerInfo().getUserName()); -// -// ResourceJobManager resourceJobManager = jobExecutionContext.getResourceJobManager(); -// -// -// ComputationalResourceScheduling taskScheduling = taskData.getTaskScheduling(); -// if (taskScheduling != null) { -// int totalNodeCount = taskScheduling.getNodeCount(); -// int totalCPUCount = taskScheduling.getTotalCPUCount(); -// -// -// if (taskScheduling.getComputationalProjectAccount() != null) { -// jobDescriptor.setAcountString(taskScheduling.getComputationalProjectAccount()); -// } -// if (taskScheduling.getQueueName() != null) { -// jobDescriptor.setQueueName(taskScheduling.getQueueName()); -// } -// -// if (totalNodeCount > 0) { -// jobDescriptor.setNodes(totalNodeCount); -// } -// if (taskScheduling.getComputationalProjectAccount() != null) { -// jobDescriptor.setAcountString(taskScheduling.getComputationalProjectAccount()); -// } -// if (taskScheduling.getQueueName() != null) { -// jobDescriptor.setQueueName(taskScheduling.getQueueName()); -// } -// if (totalCPUCount > 0) { -// int ppn = totalCPUCount / totalNodeCount; -// jobDescriptor.setProcessesPerNode(ppn); -// jobDescriptor.setCPUCount(totalCPUCount); -// } -// if (taskScheduling.getWallTimeLimit() > 0) { -// jobDescriptor.setMaxWallTime(String.valueOf(taskScheduling.getWallTimeLimit())); -// if(resourceJobManager.getResourceJobManagerType().equals(ResourceJobManagerType.LSF)){ -// jobDescriptor.setMaxWallTimeForLSF(String.valueOf(taskScheduling.getWallTimeLimit())); -// } -// } -// if (taskScheduling.getTotalPhysicalMemory() > 0) { -// jobDescriptor.setUsedMemory(taskScheduling.getTotalPhysicalMemory() + ""); -// } -// } else { -// logger.error("Task scheduling cannot be null at this point.."); -// } -// ApplicationDeploymentDescription appDepDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription(); -// List<String> moduleCmds = appDepDescription.getModuleLoadCmds(); -// if (moduleCmds != null) { -// for (String moduleCmd : moduleCmds) { -// jobDescriptor.addModuleLoadCommands(moduleCmd); -// } -// } -// List<String> preJobCommands = appDepDescription.getPreJobCommands(); -// if (preJobCommands != null) { -// for (String preJobCommand : preJobCommands) { -// jobDescriptor.addPreJobCommand(parseCommand(preJobCommand, jobExecutionContext)); -// } -// } -// -// List<String> postJobCommands = appDepDescription.getPostJobCommands(); -// if (postJobCommands != null) { -// for (String postJobCommand : postJobCommands) { -// jobDescriptor.addPostJobCommand(parseCommand(postJobCommand, jobExecutionContext)); -// } -// } -// -// ApplicationParallelismType parallelism = appDepDescription.getParallelism(); -// if (parallelism != null){ -// if (parallelism == ApplicationParallelismType.MPI || parallelism == ApplicationParallelismType.OPENMP || parallelism == ApplicationParallelismType.OPENMP_MPI){ -// Map<JobManagerCommand, String> jobManagerCommands = resourceJobManager.getJobManagerCommands(); -// if (jobManagerCommands != null && !jobManagerCommands.isEmpty()) { -// for (JobManagerCommand command : jobManagerCommands.keySet()) { -// if (command == JobManagerCommand.SUBMISSION) { -// String commandVal = jobManagerCommands.get(command); -// jobDescriptor.setJobSubmitter(commandVal); -// } -// } -// } -// } -// } -// return jobDescriptor; -// } -// -// public static boolean isEmailBasedJobMonitor(JobExecutionContext jobExecutionContext) throws AppCatalogException { -// if (jobExecutionContext.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.SSH) { -// String jobSubmissionInterfaceId = jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId(); -// SSHJobSubmission sshJobSubmission = jobExecutionContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId); -// MonitorMode monitorMode = sshJobSubmission.getMonitorMode(); -// return monitorMode != null && monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR; -// } else { -// return false; -// } -// } -// -// private static int generateJobName() { -// Random random = new Random(); -// int i = random.nextInt(Integer.MAX_VALUE); -// i = i + 99999999; -// if(i<0) { -// i = i * (-1); -// } -// return i; -// } -// -// private static String parseCommand(String value, JobExecutionContext jobExecutionContext) { -// String parsedValue = value.replaceAll("\\$workingDir", jobExecutionContext.getWorkingDir()); -// parsedValue = parsedValue.replaceAll("\\$inputDir", jobExecutionContext.getInputDir()); -// parsedValue = parsedValue.replaceAll("\\$outputDir", jobExecutionContext.getOutputDir()); -// return parsedValue; -// } -// /** -// * This method can be used to set the Security Context if its not set and later use it in other places -// * @param jobExecutionContext -// * @param authenticationInfo -// * @param userName -// * @param hostName -// * @param port -// * @return -// * @throws GFacException -// */ -// public static String prepareSecurityContext(JobExecutionContext jobExecutionContext, AuthenticationInfo authenticationInfo -// , String userName, String hostName, int port) throws GFacException { -// ServerInfo serverInfo = new ServerInfo(userName, hostName); -// String key = userName+hostName+port; -// SSHAuthWrapper sshAuthWrapper = new SSHAuthWrapper(serverInfo, authenticationInfo, key); -// if (jobExecutionContext.getSecurityContext(key) == null) { -// try { -// GFACSSHUtils.addSecurityContext(jobExecutionContext, sshAuthWrapper); -// } catch (ApplicationSettingsException e) { -// logger.error(e.getMessage()); -// try { -// StringWriter errors = new StringWriter(); -// e.printStackTrace(new PrintWriter(errors)); -// GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); -// } catch (GFacException e1) { -// logger.error(e1.getLocalizedMessage()); -// } -// throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); -// } -// } -// return key; -// } -//} http://git-wip-us.apache.org/repos/asf/airavata/blob/08cdad26/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/HandleOutputs.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/HandleOutputs.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/HandleOutputs.java deleted file mode 100644 index cadb251..0000000 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/HandleOutputs.java +++ /dev/null @@ -1,96 +0,0 @@ -//package org.apache.airavata.gfac.ssh.util; -// -//import java.io.File; -//import java.util.ArrayList; -//import java.util.Arrays; -//import java.util.List; -// -//import org.apache.airavata.gfac.core.context.JobExecutionContext; -//import org.apache.airavata.gfac.core.handler.GFacHandlerException; -//import org.apache.airavata.gfac.core.GFacUtils; -//import org.apache.airavata.gfac.core.cluster.RemoteCluster; -//import org.apache.airavata.model.appcatalog.appinterface.DataType; -//import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -///** -// * To handle outputs of different data types -// * -// */ -//public class HandleOutputs { -// private static final Logger log = LoggerFactory.getLogger(HandleOutputs.class); -// -// public static List<OutputDataObjectType> handleOutputs(JobExecutionContext jobExecutionContext, RemoteCluster remoteCluster) throws GFacHandlerException { -// List<OutputDataObjectType> outputArray = new ArrayList<OutputDataObjectType>(); -// try { -// String outputDataDir = File.separator + "tmp" + File.separator + jobExecutionContext.getExperimentID(); -// (new File(outputDataDir)).mkdirs(); -// -// List<OutputDataObjectType> outputs = jobExecutionContext.getTaskData().getApplicationOutputs(); -// List<String> outputList = remoteCluster.listDirectory(jobExecutionContext.getWorkingDir()); -// boolean missingOutput = false; -// -// for (OutputDataObjectType output : outputs) { -// // FIXME: Validation of outputs based on required and optional and search based on REGEX provided in search. -// -// if (DataType.URI == output.getType()) { -// // for failed jobs outputs are not generated. So we should not download outputs -// if (GFacUtils.isFailedJob(jobExecutionContext)){ -// continue; -// } -// String outputFile = output.getValue(); -// String fileName = outputFile.substring(outputFile.lastIndexOf(File.separatorChar) + 1, outputFile.length()); -// -// if (output.getLocation() == null && !outputList.contains(fileName) && output.isIsRequired()) { -// missingOutput = true; -// } else { -// remoteCluster.scpFrom(outputFile, outputDataDir); -// String localFile = outputDataDir + File.separator + fileName; -// jobExecutionContext.addOutputFile(localFile); -// output.setValue(localFile); -// outputArray.add(output); -// } -// -// } else if (DataType.STDOUT == output.getType()) { -// String downloadFile = jobExecutionContext.getStandardOutput(); -// String fileName = downloadFile.substring(downloadFile.lastIndexOf(File.separatorChar) + 1, downloadFile.length()); -// remoteCluster.scpFrom(downloadFile, outputDataDir); -// String localFile = outputDataDir + File.separator + fileName; -// jobExecutionContext.addOutputFile(localFile); -// jobExecutionContext.setStandardOutput(localFile); -// output.setValue(localFile); -// outputArray.add(output); -// -// } else if (DataType.STDERR == output.getType()) { -// String downloadFile = jobExecutionContext.getStandardError(); -// String fileName = downloadFile.substring(downloadFile.lastIndexOf(File.separatorChar) + 1, downloadFile.length()); -// remoteCluster.scpFrom(downloadFile, outputDataDir); -// String localFile = outputDataDir + File.separator + fileName; -// jobExecutionContext.addOutputFile(localFile); -// jobExecutionContext.setStandardError(localFile); -// output.setValue(localFile); -// outputArray.add(output); -// -// } -// } -// if (outputArray == null || outputArray.isEmpty()) { -// log.error("Empty Output returned from the Application, Double check the application and ApplicationDescriptor output Parameter Names"); -// if (jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() == null) { -// throw new GFacHandlerException("Empty Output returned from the Application, Double check the application" -// + "and ApplicationDescriptor output Parameter Names"); -// } -// } -// -// if (missingOutput) { -// String arrayString = Arrays.deepToString(outputArray.toArray()); -// log.error(arrayString); -// throw new GFacHandlerException("Required output is missing"); -// } -// } catch (Exception e) { -// throw new GFacHandlerException(e); -// } -// jobExecutionContext.getTaskData().setApplicationOutputs(outputArray); -// return outputArray; -// } -//}
