Repository: airavata Updated Branches: refs/heads/master 08cdad264 -> c62f74a5e
Added LocalRemoteCluster implementation and LocalCommandOutput reader Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/c62f74a5 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/c62f74a5 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/c62f74a5 Branch: refs/heads/master Commit: c62f74a5ecaa9866f7337a3c25009dcf8ee52f35 Parents: 08cdad2 Author: Shameera Rathnayaka <[email protected]> Authored: Mon Nov 9 13:14:06 2015 -0500 Committer: Shameera Rathnayaka <[email protected]> Committed: Mon Nov 9 13:14:06 2015 -0500 ---------------------------------------------------------------------- .../gfac/core/cluster/RemoteCluster.java | 4 +- .../airavata/gfac/impl/GFacEngineImpl.java | 26 +-- .../airavata/gfac/impl/HPCRemoteCluster.java | 6 +- .../airavata/gfac/impl/LocalCommandOutput.java | 60 ++++++ .../airavata/gfac/impl/LocalRemoteCluster.java | 182 +++++++++++++++++++ .../gfac/impl/task/SCPDataStageTask.java | 4 +- 6 files changed, 257 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/c62f74a5/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java index 59e7ff5..9e4544c 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java @@ -51,7 +51,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable * @param remoteFile remote file location, this can be a directory too * @throws SSHApiException throws exception during error */ - public void scpTo(String localFile, String remoteFile) throws SSHApiException; + public void copyTo(String localFile, String remoteFile) throws SSHApiException; /** * This will copy a remote file in path rFile to local file lFile @@ -59,7 +59,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable * @param remoteFile remote file path, this has to be a full qualified path * @param localFile This is the local file to copy, this can be a directory too */ - public void scpFrom(String remoteFile, String localFile) throws SSHApiException; + public void copyFrom(String remoteFile, String localFile) throws SSHApiException; /** * This wil copy source remote file to target remote file. http://git-wip-us.apache.org/repos/asf/airavata/blob/c62f74a5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java index 4b67ffd..6444eb4 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java @@ -343,21 +343,7 @@ public class GFacEngineImpl implements GFacEngine { taskContext.setTaskStatus(taskStatus); GFacUtils.saveAndPublishTaskStatus(taskContext); - if (taskStatus.getState() == TaskState.FAILED) { - log.error("expId: {}, processId: {}, taskId: {} type: {},:- Job submission task failed, " + - "reason:" + " {}", taskContext.getParentProcessContext().getExperimentId(), taskContext - .getParentProcessContext().getProcessId(), taskContext.getTaskId(), jobSubmissionTask.getType - ().name(), taskStatus.getReason()); - String errorMsg = new StringBuilder("expId: ").append(processContext.getExperimentId()).append(", processId: ") - .append(processContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId()) - .append(", type: ").append(taskContext.getTaskType().name()).append(" :- Job submission task failed. Reason: ") - .append(taskStatus.getReason()).toString(); - ErrorModel errorModel = new ErrorModel(); - errorModel.setUserFriendlyMessage("Job submission task failed"); - errorModel.setActualErrorMessage(errorMsg); - GFacUtils.saveTaskError(taskContext, errorModel); - throw new GFacException("Job submission task failed"); - } + checkFailures(taskContext, taskStatus, jobSubmissionTask); return false; } catch (TException e) { throw new GFacException(e); @@ -419,13 +405,18 @@ public class GFacEngineImpl implements GFacEngine { taskContext.setTaskStatus(taskStatus); GFacUtils.saveAndPublishTaskStatus(taskContext); + checkFailures(taskContext, taskStatus, dMoveTask); + return false; + } + + private void checkFailures(TaskContext taskContext, TaskStatus taskStatus, Task dMoveTask) throws GFacException { if (taskStatus.getState() == TaskState.FAILED) { log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input statging failed, " + "reason:" + " {}", taskContext.getParentProcessContext().getExperimentId(), taskContext .getParentProcessContext().getProcessId(), taskContext.getTaskId(), dMoveTask.getType ().name(), taskStatus.getReason()); - String errorMsg = new StringBuilder("expId: ").append(processContext.getExperimentId()).append(", processId: ") - .append(processContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId()) + String errorMsg = new StringBuilder("expId: ").append(taskContext.getParentProcessContext().getExperimentId()).append(", processId: ") + .append(taskContext.getParentProcessContext().getProcessId()).append(", taskId: ").append(taskContext.getTaskId()) .append(", type: ").append(taskContext.getTaskType().name()).append(" :- Input staging failed. Reason: ") .append(taskStatus.getReason()).toString(); ErrorModel errorModel = new ErrorModel(); @@ -434,7 +425,6 @@ public class GFacEngineImpl implements GFacEngine { GFacUtils.saveTaskError(taskContext, errorModel); throw new GFacException("Error while staging input data"); } - return false; } @Override http://git-wip-us.apache.org/repos/asf/airavata/blob/c62f74a5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java index 288c98c..3711f7c 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java @@ -83,7 +83,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ @Override public JobSubmissionOutput submitBatchJob(String jobScriptFilePath, String workingDirectory) throws SSHApiException { JobSubmissionOutput jsoutput = new JobSubmissionOutput(); - scpTo(jobScriptFilePath, workingDirectory); // scp script file to working directory + copyTo(jobScriptFilePath, workingDirectory); // scp script file to working directory RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory, jobScriptFilePath); submitCommand.setRawCommand("cd " + workingDirectory + "; " + submitCommand.getRawCommand()); StandardOutReader reader = new StandardOutReader(); @@ -97,7 +97,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ } @Override - public void scpTo(String localFile, String remoteFile) throws SSHApiException { + public void copyTo(String localFile, String remoteFile) throws SSHApiException { int retry = 3; while (retry > 0) { try { @@ -128,7 +128,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ } @Override - public void scpFrom(String remoteFile, String localFile) throws SSHApiException { + public void copyFrom(String remoteFile, String localFile) throws SSHApiException { int retry = 3; while(retry>0) { try { http://git-wip-us.apache.org/repos/asf/airavata/blob/c62f74a5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java new file mode 100644 index 0000000..e9d683d --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java @@ -0,0 +1,60 @@ +package org.apache.airavata.gfac.impl; + +import com.jcraft.jsch.Channel; +import org.apache.airavata.gfac.core.cluster.CommandOutput; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; + +/** + * Created by syodage on 11/9/15. + */ +public class LocalCommandOutput implements CommandOutput { + private Process process; + + @Override + public void onOutput(Channel channel) { + + } + + public void readOutputs(Process process) { + this.process = process; + } + + public String getStandardOut() throws IOException { + BufferedReader stdInput = new BufferedReader(new InputStreamReader(process.getInputStream())); + StringBuilder sb = new StringBuilder(); + String s = null; + while ((s = stdInput.readLine()) != null) { + sb.append(s); + } + return sb.toString(); + } + + public String getStandardErrorString() throws IOException { + BufferedReader stdError = new BufferedReader(new InputStreamReader(process.getErrorStream())); + StringBuilder sb = new StringBuilder(); + String s = null; + while ((s = stdError.readLine()) != null) { + sb.append(s); + } + return sb.toString(); + } + + @Override + public OutputStream getStandardError() { + return null; + } + + @Override + public void exitCode(int code) { + + } + + @Override + public int getExitCode() { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/c62f74a5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java new file mode 100644 index 0000000..d1aa6e0 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java @@ -0,0 +1,182 @@ +package org.apache.airavata.gfac.impl; + +import com.jcraft.jsch.Session; +import org.apache.airavata.gfac.core.JobManagerConfiguration; +import org.apache.airavata.gfac.core.SSHApiException; +import org.apache.airavata.gfac.core.authentication.AuthenticationInfo; +import org.apache.airavata.gfac.core.cluster.*; +import org.apache.airavata.model.status.JobStatus; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.nio.file.attribute.FileAttribute; +import java.nio.file.attribute.PosixFilePermission; +import java.nio.file.attribute.PosixFilePermissions; +import java.util.*; + +/** + * Created by shameera on 11/9/15. + */ +public class LocalRemoteCluster extends AbstractRemoteCluster { + + public LocalRemoteCluster(ServerInfo serverInfo, JobManagerConfiguration jobManagerConfiguration, AuthenticationInfo authenticationInfo) { + super(serverInfo, jobManagerConfiguration, authenticationInfo); + } + + @Override + public JobSubmissionOutput submitBatchJob(String jobScriptFilePath, String workingDirectory) throws SSHApiException { + try { + JobSubmissionOutput jsoutput = new JobSubmissionOutput(); + copyTo(jobScriptFilePath, workingDirectory); // scp script file to working directory + RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory, jobScriptFilePath); + submitCommand.setRawCommand("cd " + workingDirectory + "; " + submitCommand.getRawCommand()); + LocalCommandOutput localCommandOutput = new LocalCommandOutput(); + executeCommand(submitCommand, localCommandOutput); + jsoutput.setJobId(outputParser.parseJobSubmission(localCommandOutput.getStandardOut())); + jsoutput.setExitCode(localCommandOutput.getExitCode()); + jsoutput.setStdOut(localCommandOutput.getStandardOut()); + jsoutput.setStdErr(localCommandOutput.getStandardErrorString()); + return jsoutput; + } catch (IOException e) { + throw new SSHApiException("Error while submitting local batch job", e); + } + } + + @Override + public void copyTo(String localFile, String remoteFile) throws SSHApiException { + Path sourcePath = Paths.get(localFile); + Path targetPath = Paths.get(remoteFile); + try { + Files.copy(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING); + } catch (IOException e) { + throw new SSHApiException("Error while copying sourceFile: " + sourcePath.toString() + + ", to destinationFile: " + targetPath.toString(), e); + } + } + + @Override + public void copyFrom(String remoteFile, String localFile) throws SSHApiException { + Path sourcePath = Paths.get(remoteFile); + Path targetPath = Paths.get(localFile); + try { + Files.copy(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING); + } catch (IOException e) { + throw new SSHApiException("Error while copying sourceFile: " + sourcePath.toString() + + ", to destinationFile: " + targetPath.toString(), e); + } + } + + @Override + public void scpThirdParty(String sourceFile, String destinationFile, Session session, DIRECTION inOrOut) throws SSHApiException { + throw new UnsupportedOperationException("Scp third party is not support with LocalRemoteCluster"); + } + + @Override + public void makeDirectory(String directoryPath) throws SSHApiException { + Path dirPath = Paths.get(directoryPath); + Set<PosixFilePermission> perms = new HashSet<>(); + // add permission as rw-r--r-- 644 + perms.add(PosixFilePermission.OWNER_WRITE); + perms.add(PosixFilePermission.OWNER_READ); + perms.add(PosixFilePermission.GROUP_READ); + perms.add(PosixFilePermission.OTHERS_READ); + FileAttribute<Set<PosixFilePermission>> fileAttributes = PosixFilePermissions.asFileAttribute(perms); + try { + Files.createDirectory(dirPath, fileAttributes); + } catch (IOException e) { + throw new SSHApiException("Error making directory", e); + } + + } + + @Override + public JobStatus cancelJob(String jobID) throws SSHApiException { + JobStatus oldStatus = getJobStatus(jobID); + RawCommandInfo cancelCommand = jobManagerConfiguration.getCancelCommand(jobID); + execute(cancelCommand); + return oldStatus; + } + + + @Override + public JobStatus getJobStatus(String jobID) throws SSHApiException { + RawCommandInfo monitorCommand = jobManagerConfiguration.getMonitorCommand(jobID); + LocalCommandOutput localCommandOutput = new LocalCommandOutput(); + try { + executeCommand(monitorCommand, localCommandOutput); + return outputParser.parseJobStatus(jobID, localCommandOutput.getStandardErrorString()); + } catch (IOException e) { + throw new SSHApiException("Error while getting jobStatus", e); + } + } + + @Override + public String getJobIdByJobName(String jobName, String userName) throws SSHApiException { + try { + RawCommandInfo jobIdMonitorCommand = jobManagerConfiguration.getJobIdMonitorCommand(jobName, userName); + LocalCommandOutput localCommandOutput = new LocalCommandOutput(); + executeCommand(jobIdMonitorCommand, localCommandOutput); + return outputParser.parseJobId(jobName, localCommandOutput.getStandardOut()); + } catch (IOException e) { + throw new SSHApiException("Error while getting jobId using JobName", e); + } + } + + @Override + public void getJobStatuses(String userName, Map<String, JobStatus> jobStatusMap) throws SSHApiException { + try { + RawCommandInfo userBasedMonitorCommand = jobManagerConfiguration.getUserBasedMonitorCommand(userName); + LocalCommandOutput localCommandOutput = new LocalCommandOutput(); + executeCommand(userBasedMonitorCommand, localCommandOutput); + outputParser.parseJobStatuses(userName, jobStatusMap, localCommandOutput.getStandardOut()); + } catch (IOException e) { + throw new SSHApiException("Error while getting job statuses", e); + } + } + + @Override + public List<String> listDirectory(String directoryPath) throws SSHApiException { + File directory = new File(directoryPath); + List<String> results = new ArrayList<>(); + File[] files = directory.listFiles(); + for (File file : files) { + results.add(file.getName()); + } + return results; + } + + @Override + public boolean execute(CommandInfo commandInfo) throws SSHApiException { + LocalCommandOutput localCommandOutput = new LocalCommandOutput(); + try { + executeCommand(commandInfo, localCommandOutput); + } catch (IOException e) { + throw new SSHApiException("Error while executing command " + commandInfo.getCommand(), e); + } + return true; + } + + private void executeCommand(CommandInfo commandInfo, LocalCommandOutput localCommandOutput) throws IOException { + Process process = Runtime.getRuntime().exec(commandInfo.getCommand()); + localCommandOutput.readOutputs(process); + } + + @Override + public Session getSession() throws SSHApiException { + return null; + } + + @Override + public void disconnect() throws SSHApiException { + + } + + @Override + public ServerInfo getServerInfo() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/c62f74a5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java index 9ffa6f2..32ee31b 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java @@ -66,13 +66,13 @@ public class SCPDataStageTask implements Task { /** * copy local file to compute resource. */ - taskContext.getParentProcessContext().getRemoteCluster().scpTo(sourceURI.getPath(), destinationURI + taskContext.getParentProcessContext().getRemoteCluster().copyTo(sourceURI.getPath(), destinationURI .getPath()); } else if (processState == ProcessState.OUTPUT_DATA_STAGING) { /** * copy remote file from compute resource. */ - taskContext.getParentProcessContext().getRemoteCluster().scpFrom(sourceURI.getPath(), destinationURI + taskContext.getParentProcessContext().getRemoteCluster().copyFrom(sourceURI.getPath(), destinationURI .getPath()); } status.setReason("Successfully staged data");
