Repository: airavata Updated Branches: refs/heads/develop db027a53e -> fe6ebe9c0
Reverted scp refactoring Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/dbcfd77b Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/dbcfd77b Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/dbcfd77b Branch: refs/heads/develop Commit: dbcfd77be3abd26f97a185f65649cd3ea1ac46c0 Parents: 4a92da4 Author: Shameera Rathnayaka <[email protected]> Authored: Wed Nov 2 17:24:27 2016 -0400 Committer: Shameera Rathnayaka <[email protected]> Committed: Wed Nov 2 17:24:27 2016 -0400 ---------------------------------------------------------------------- .../gfac/core/cluster/RemoteCluster.java | 6 +- .../airavata/gfac/impl/BESRemoteCluster.java | 6 +- .../airavata/gfac/impl/HPCRemoteCluster.java | 19 +- .../airavata/gfac/impl/LocalRemoteCluster.java | 9 +- .../org/apache/airavata/gfac/impl/SSHUtils.java | 198 +++++++++---------- .../airavata/gfac/impl/task/ArchiveTask.java | 9 +- .../gfac/impl/task/BESJobSubmissionTask.java | 5 +- .../airavata/gfac/impl/task/DataStageTask.java | 13 +- .../gfac/impl/task/EnvironmentSetupTask.java | 4 +- .../gfac/impl/task/SCPDataStageTask.java | 44 +++-- .../gfac/impl/task/utils/StreamData.java | 43 ++-- 11 files changed, 178 insertions(+), 178 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java index f59a9e3..5f8d0ec 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java @@ -52,7 +52,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable * @param remoteFile remote file location, this can be a directory too * @throws SSHApiException throws exception during error */ - void copyTo(String localFile, String remoteFile, SessionConsumer<Session> sessionSessionConsumer) throws SSHApiException; + void copyTo(String localFile, String remoteFile) throws SSHApiException; /** * This will copy a remote file in path rFile to local file lFile @@ -60,7 +60,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable * @param remoteFile remote file path, this has to be a full qualified path * @param localFile This is the local file to copy, this can be a directory too */ - void copyFrom(String remoteFile, String localFile, SessionConsumer<Session> sessionSessionConsumer) throws SSHApiException; + void copyFrom(String remoteFile, String localFile) throws SSHApiException; /** * This wil copy source remote file to target remote file. @@ -77,7 +77,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable * @param directoryPath the full qualified path for the directory user wants to create * @throws SSHApiException throws during error */ - void makeDirectory(String directoryPath, SessionConsumer<Session> sessionConsumer) throws SSHApiException; + void makeDirectory(String directoryPath) throws SSHApiException; /** * This will delete the given job from the queue http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java index 828a34e..0f517b5 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java @@ -43,12 +43,12 @@ public class BESRemoteCluster extends AbstractRemoteCluster{ } @Override - public void copyTo(String localFile, String remoteFile, SessionConsumer<Session> sessionConsumer) throws SSHApiException { + public void copyTo(String localFile, String remoteFile) throws SSHApiException { } @Override - public void copyFrom(String remoteFile, String localFile, SessionConsumer<Session> sessionConsumer) throws SSHApiException { + public void copyFrom(String remoteFile, String localFile) throws SSHApiException { } @@ -57,7 +57,7 @@ public class BESRemoteCluster extends AbstractRemoteCluster{ } @Override - public void makeDirectory(String directoryPath, SessionConsumer<Session> sessionConsumer) throws SSHApiException { + public void makeDirectory(String directoryPath) throws SSHApiException { } http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java index b322cef..c3566b8 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java @@ -86,8 +86,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ @Override public JobSubmissionOutput submitBatchJob(String jobScriptFilePath, String workingDirectory) throws SSHApiException { JobSubmissionOutput jsoutput = new JobSubmissionOutput(); - copyTo(jobScriptFilePath, workingDirectory, - session -> SSHUtils.scpTo(jobScriptFilePath, workingDirectory, session)); // scp script file to working directory + copyTo(jobScriptFilePath, workingDirectory); // scp script file to working directory RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory, jobScriptFilePath); submitCommand.setRawCommand("cd " + workingDirectory + "; " + submitCommand.getRawCommand()); StandardOutReader reader = new StandardOutReader(); @@ -113,13 +112,13 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ } @Override - public void copyTo(String localFile, String remoteFile, SessionConsumer<Session> sessionConsumer) throws SSHApiException { + public void copyTo(String localFile, String remoteFile) throws SSHApiException { int retry = 3; while (retry > 0) { try { session = Factory.getSSHSession(authenticationInfo, serverInfo); log.info("Transferring localhost:" + localFile + " to " + serverInfo.getHost() + ":" + remoteFile); - sessionConsumer.consume(session); + SSHUtils.scpTo(localFile, remoteFile, session); retry = 0; } catch (Exception e) { retry--; @@ -140,13 +139,13 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ } @Override - public void copyFrom(String remoteFile, String localFile, SessionConsumer<Session> sessionConsumer) throws SSHApiException { + public void copyFrom(String remoteFile, String localFile) throws SSHApiException { int retry = 3; while(retry>0) { try { session = Factory.getSSHSession(authenticationInfo, serverInfo); log.info("Transferring " + serverInfo.getHost() + ":" + remoteFile + " To localhost:" + localFile); - sessionConsumer.consume(session); + SSHUtils.scpFrom(remoteFile, localFile, session); retry=0; } catch (Exception e) { retry--; @@ -191,7 +190,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ } @Override - public void makeDirectory(String directoryPath, SessionConsumer<Session> sessionConsumer) throws SSHApiException { + public void makeDirectory(String directoryPath) throws SSHApiException { int retryCount = 0; try { while (retryCount < MAX_RETRY_COUNT) { @@ -199,9 +198,9 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ session = Factory.getSSHSession(authenticationInfo, serverInfo); log.info("Creating directory: " + serverInfo.getHost() + ":" + directoryPath); try { - sessionConsumer.consume(session); + SSHUtils.makeDirectory(directoryPath, session); break; // Exit while loop - } catch (DataStagingException e) { + } catch (JSchException e) { if (retryCount == MAX_RETRY_COUNT) { log.error("Retry count " + MAX_RETRY_COUNT + " exceeded for creating directory: " + serverInfo.getHost() + ":" + directoryPath, e); @@ -211,7 +210,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ log.error("Issue with jsch, Retry creating directory: " + serverInfo.getHost() + ":" + directoryPath); } } - } catch (AiravataException | DataStagingException e) { + } catch (JSchException | AiravataException | IOException e) { throw new SSHApiException("Failed to create directory " + serverInfo.getHost() + ":" + directoryPath, e); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java index aa79a0c..d5422d2 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java @@ -60,8 +60,7 @@ public class LocalRemoteCluster extends AbstractRemoteCluster { public JobSubmissionOutput submitBatchJob(String jobScriptFilePath, String workingDirectory) throws SSHApiException { try { JobSubmissionOutput jsoutput = new JobSubmissionOutput(); - String remoteFile = workingDirectory + File.separator + new File(jobScriptFilePath).getName(); - copyTo(jobScriptFilePath, remoteFile, session -> SSHUtils.scpTo(jobScriptFilePath, remoteFile, session)); // scp script file to working directory + copyTo(jobScriptFilePath, workingDirectory + File.separator + new File(jobScriptFilePath).getName()); // scp script file to working directory RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory, jobScriptFilePath); submitCommand.setRawCommand(submitCommand.getRawCommand()); LocalCommandOutput localCommandOutput = new LocalCommandOutput(); @@ -77,7 +76,7 @@ public class LocalRemoteCluster extends AbstractRemoteCluster { } @Override - public void copyTo(String localFile, String remoteFile, SessionConsumer<Session> sessionConsumer) throws SSHApiException { + public void copyTo(String localFile, String remoteFile) throws SSHApiException { Path sourcePath = Paths.get(localFile); Path targetPath = Paths.get(remoteFile); try { @@ -89,7 +88,7 @@ public class LocalRemoteCluster extends AbstractRemoteCluster { } @Override - public void copyFrom(String remoteFile, String localFile, SessionConsumer<Session> sessionConsumer) throws SSHApiException { + public void copyFrom(String remoteFile, String localFile) throws SSHApiException { Path sourcePath = Paths.get(remoteFile); Path targetPath = Paths.get(localFile); try { @@ -126,7 +125,7 @@ public class LocalRemoteCluster extends AbstractRemoteCluster { } @Override - public void makeDirectory(String directoryPath, SessionConsumer<Session> sessionConsumer) throws SSHApiException { + public void makeDirectory(String directoryPath) throws SSHApiException { Path dirPath = Paths.get(directoryPath); Set<PosixFilePermission> perms = new HashSet<>(); // add permission as rwxr--r-- 744 http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java index 07e1799..cd5651e 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java @@ -24,7 +24,6 @@ import com.jcraft.jsch.Channel; import com.jcraft.jsch.ChannelExec; import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; -import org.apache.airavata.gfac.core.DataStagingException; import org.apache.airavata.gfac.core.SSHApiException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,100 +53,97 @@ public class SSHUtils { * @param localFile Local file to transfer, this can be a directory * @return returns the final remote file path, so that users can use the new file location */ - public static String scpTo(String localFile, String remoteFile, Session session) throws DataStagingException { + public static String scpTo(String localFile, String remoteFile, Session session) throws IOException, + JSchException, SSHApiException { FileInputStream fis = null; String prefix = null; if (new File(localFile).isDirectory()) { prefix = localFile + File.separator; } boolean ptimestamp = true; - Channel channel = null; + // exec 'scp -t rfile' remotely String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + remoteFile; - try { - channel = session.openChannel("exec"); + Channel channel = session.openChannel("exec"); - StandardOutReader stdOutReader = new StandardOutReader(); - ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError()); - ((ChannelExec) channel).setCommand(command); + StandardOutReader stdOutReader = new StandardOutReader(); + ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError()); + ((ChannelExec) channel).setCommand(command); - // get I/O streams for remote scp - try (OutputStream out = channel.getOutputStream(); - InputStream in = channel.getInputStream()) { + // get I/O streams for remote scp + OutputStream out = channel.getOutputStream(); + InputStream in = channel.getInputStream(); - channel.connect(); - if (checkAck(in) != 0) { - String error = "Error Reading input Stream"; - log.error(error); - throw new DataStagingException(error); - } + channel.connect(); - File _lfile = new File(localFile); - - if (ptimestamp) { - command = "T" + (_lfile.lastModified() / 1000) + " 0"; - // The access time should be sent here, - // but it is not accessible with JavaAPI ;-< - command += (" " + (_lfile.lastModified() / 1000) + " 0\n"); - out.write(command.getBytes()); - out.flush(); - if (checkAck(in) != 0) { - String error = "Error Reading input Stream"; - log.error(error); - throw new DataStagingException(error); - } - } + if (checkAck(in) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new SSHApiException(error); + } - // send "C0644 filesize filename", where filename should not include '/' - long filesize = _lfile.length(); - command = "C0644 " + filesize + " "; - if (localFile.lastIndexOf('/') > 0) { - command += localFile.substring(localFile.lastIndexOf('/') + 1); - } else { - command += localFile; - } - command += "\n"; - out.write(command.getBytes()); - out.flush(); - if (checkAck(in) != 0) { - String error = "Error Reading input Stream"; - log.error(error); - throw new DataStagingException(error); - } + File _lfile = new File(localFile); - // send a content of localFile - fis = new FileInputStream(localFile); - byte[] buf = new byte[1024]; - while (true) { - int len = fis.read(buf, 0, buf.length); - if (len <= 0) break; - out.write(buf, 0, len); //out.flush(); - } - fis.close(); - fis = null; - // send '\0' - buf[0] = 0; - out.write(buf, 0, 1); - out.flush(); - if (checkAck(in) != 0) { - String error = "Error Reading input Stream"; - log.error(error); - throw new DataStagingException(error); - } - } - stdOutReader.onOutput(channel); - if (stdOutReader.getStdErrorString().contains("scp:")) { - throw new DataStagingException(stdOutReader.getStdErrorString()); - } - //since remote file is always a file we just return the file - return remoteFile; - } catch (IOException | JSchException e) { - throw new DataStagingException(e); - } finally { - if (channel != null && channel.isConnected()) { - channel.disconnect(); + if (ptimestamp) { + command = "T" + (_lfile.lastModified() / 1000) + " 0"; + // The access time should be sent here, + // but it is not accessible with JavaAPI ;-< + command += (" " + (_lfile.lastModified() / 1000) + " 0\n"); + out.write(command.getBytes()); + out.flush(); + if (checkAck(in) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new SSHApiException(error); } } + + // send "C0644 filesize filename", where filename should not include '/' + long filesize = _lfile.length(); + command = "C0644 " + filesize + " "; + if (localFile.lastIndexOf('/') > 0) { + command += localFile.substring(localFile.lastIndexOf('/') + 1); + } else { + command += localFile; + } + command += "\n"; + out.write(command.getBytes()); + out.flush(); + if (checkAck(in) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new SSHApiException(error); + } + + // send a content of localFile + fis = new FileInputStream(localFile); + byte[] buf = new byte[1024]; + while (true) { + int len = fis.read(buf, 0, buf.length); + if (len <= 0) break; + out.write(buf, 0, len); //out.flush(); + } + fis.close(); + fis = null; + // send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + out.flush(); + if (checkAck(in) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new SSHApiException(error); + } + out.close(); + stdOutReader.onOutput(channel); + + + channel.disconnect(); + if (stdOutReader.getStdErrorString().contains("scp:")) { + throw new SSHApiException(stdOutReader.getStdErrorString()); + } + //since remote file is always a file we just return the file + return remoteFile; } /** @@ -157,7 +153,8 @@ public class SSHUtils { * @param localFile This is the local file to copy, this can be a directory too * @return returns the final local file path of the new file came from the remote resource */ - public static void scpFrom(String remoteFile, String localFile, Session session) throws DataStagingException { + public static void scpFrom(String remoteFile, String localFile, Session session) throws IOException, + JSchException, SSHApiException { FileOutputStream fos = null; try { String prefix = null; @@ -258,7 +255,6 @@ public class SSHUtils { } catch (Exception e) { log.error(e.getMessage(), e); - throw new DataStagingException(e); } finally { try { if (fos != null) fos.close(); @@ -276,11 +272,8 @@ public class SSHUtils { * @param destinationSession JSch Session for target * @return returns the final local file path of the new file came from the remote resource */ - public static void scpThirdParty(String sourceFile, - Session sourceSession, - String destinationFile, - Session destinationSession, - boolean ignoreEmptyFile) throws DataStagingException { + public static void scpThirdParty(String sourceFile, Session sourceSession, String destinationFile, Session destinationSession, boolean ignoreEmptyFile) throws + IOException, JSchException { OutputStream sout = null; InputStream sin = null; OutputStream dout = null; @@ -408,7 +401,7 @@ public class SSHUtils { } catch (Exception e) { log.error(e.getMessage(), e); - throw new DataStagingException(e.getMessage()); + throw new JSchException(e.getMessage()); } finally { try { if (dout != null) dout.close(); @@ -433,32 +426,35 @@ public class SSHUtils { } } - public static void makeDirectory(String path, Session session) throws DataStagingException { + public static void makeDirectory(String path, Session session) throws IOException, JSchException, SSHApiException { - Channel channel = null; + // exec 'scp -t rfile' remotely String command = "mkdir -p " + path; + Channel channel = session.openChannel("exec"); + StandardOutReader stdOutReader = new StandardOutReader(); + + ((ChannelExec) channel).setCommand(command); + + + ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError()); try { - // exec 'scp -t rfile' remotely - channel = session.openChannel("exec"); - StandardOutReader stdOutReader = new StandardOutReader(); channel.connect(); - stdOutReader.onOutput(channel); - if (stdOutReader.getStdErrorString().contains("mkdir:")) { - throw new DataStagingException(stdOutReader.getStdErrorString()); - } - } catch (JSchException e) { + channel.disconnect(); // session.disconnect(); log.error("Unable to retrieve command output. Command - " + command + " on server - " + session.getHost() + ":" + session.getPort() + " connecting user name - " + session.getUserName()); - throw new DataStagingException(e); - }finally { - if(channel != null && channel.isConnected()) - channel.disconnect(); + throw e; + } + stdOutReader.onOutput(channel); + if (stdOutReader.getStdErrorString().contains("mkdir:")) { + throw new SSHApiException(stdOutReader.getStdErrorString()); } + + channel.disconnect(); } public static List<String> listDirectory(String path, Session session) throws IOException, JSchException, http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java index 85d3fd2..88661f8 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java @@ -138,8 +138,13 @@ public class ArchiveTask implements Task { // move tar to storage resource remoteCluster.execute(commandInfo); URI destinationURI = TaskUtils.getDestinationURI(taskContext, hostName, inputPath, archiveTar); - remoteCluster.thirdPartyTransfer(resourceAbsTarFilePath, destinationURI.getPath(), - session -> SSHUtils.scpThirdParty(sourceURI.getPath(), session, destinationURI.getPath(), sshSession, true)); + remoteCluster.thirdPartyTransfer(resourceAbsTarFilePath ,destinationURI.getPath(), session -> { + try { + SSHUtils.scpThirdParty(sourceURI.getPath(),session, destinationURI.getPath(), sshSession, true); + } catch (Exception e) { + throw new DataStagingException("Error while transferring " + sourceURI.getPath() + " to " + destinationURI.getPath()); + } + }); // delete tar in remote computer resource commandInfo = new RawCommandInfo("rm " + resourceAbsTarFilePath); http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java index 1811286..990b9ea 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java @@ -31,7 +31,6 @@ import de.fzj.unicore.wsrflite.xmlbeans.WSUtilities; import eu.unicore.util.httpclient.DefaultClientConfiguration; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.exception.ApplicationSettingsException; -import org.apache.airavata.gfac.core.DataStagingException; import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.GFacUtils; import org.apache.airavata.gfac.core.SSHApiException; @@ -274,7 +273,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask { break; } } - } catch (DataStagingException | AiravataException | URISyntaxException e) { + } catch (IOException | JSchException | AiravataException | SSHApiException | URISyntaxException e) { log.error("Error while coping local file " + localFilePath + " to remote " + remoteFilePath, e); throw new GFacException("Error while scp output files to remote storage file location", e); } @@ -318,7 +317,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask { input.setValue("file:/" + localFilePath); } } - } catch ( AiravataException | DataStagingException| URISyntaxException e) { + } catch (IOException | JSchException | AiravataException | SSHApiException | URISyntaxException e) { log.error("Error while coping remote file " + remoteFilePath + " to local " + localFilePath, e); throw new GFacException("Error while scp input files to local file location", e); } http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java index a171f7f..8c6a125 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java @@ -24,7 +24,6 @@ import org.apache.airavata.gfac.core.SSHApiException; import org.apache.airavata.gfac.core.context.TaskContext; import org.apache.airavata.gfac.core.task.Task; import org.apache.airavata.gfac.core.task.TaskException; -import org.apache.airavata.gfac.impl.SSHUtils; import org.apache.airavata.model.commons.ErrorModel; import org.apache.airavata.model.status.ProcessState; import org.apache.airavata.model.status.TaskState; @@ -66,18 +65,14 @@ public class DataStageTask implements Task { /** * copy local file to compute resource. */ - taskContext.getParentProcessContext().getDataMovementRemoteCluster().copyTo( - sourceURI.getPath(), - destinationURI.getPath(), - session -> SSHUtils.scpTo(sourceURI.getPath(), destinationURI.getPath(), session)); + taskContext.getParentProcessContext().getDataMovementRemoteCluster().copyTo(sourceURI.getPath(), destinationURI + .getPath()); } else if (processState == ProcessState.OUTPUT_DATA_STAGING) { /** * copy remote file from compute resource. */ - taskContext.getParentProcessContext().getDataMovementRemoteCluster().copyFrom( - sourceURI.getPath(), - destinationURI.getPath(), - session -> SSHUtils.scpFrom(sourceURI.getPath(), destinationURI.getPath(), session)); + taskContext.getParentProcessContext().getDataMovementRemoteCluster().copyFrom(sourceURI.getPath(), destinationURI + .getPath()); } status.setReason("Successfully staged data"); } catch (SSHApiException e) { http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java index bbe77d8..7de0282 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java @@ -25,7 +25,6 @@ import org.apache.airavata.gfac.core.cluster.RemoteCluster; import org.apache.airavata.gfac.core.context.TaskContext; import org.apache.airavata.gfac.core.task.Task; import org.apache.airavata.gfac.core.task.TaskException; -import org.apache.airavata.gfac.impl.SSHUtils; import org.apache.airavata.model.commons.ErrorModel; import org.apache.airavata.model.status.TaskState; import org.apache.airavata.model.status.TaskStatus; @@ -49,8 +48,7 @@ public class EnvironmentSetupTask implements Task { TaskStatus status = new TaskStatus(TaskState.COMPLETED); try { RemoteCluster remoteCluster = taskContext.getParentProcessContext().getJobSubmissionRemoteCluster(); - String workingDir = taskContext.getParentProcessContext().getWorkingDir(); - remoteCluster.makeDirectory(workingDir, session -> SSHUtils.makeDirectory(workingDir, session)); + remoteCluster.makeDirectory(taskContext.getParentProcessContext().getWorkingDir()); status.setReason("Successfully created environment"); } catch (SSHApiException e) { String msg = "Error while environment setup"; http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java index 4189f81..2788535 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java @@ -182,7 +182,7 @@ public class SCPDataStageTask implements Task { errorModel.setUserFriendlyMessage(msg); taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel)); return status; - } catch (ApplicationSettingsException e) { + } catch (ApplicationSettingsException | FileNotFoundException e) { String msg = "Failed while reading credentials"; log.error(msg, e); status.setState(TaskState.FAILED); @@ -219,7 +219,7 @@ public class SCPDataStageTask implements Task { errorModel.setActualErrorMessage(e.getMessage()); errorModel.setUserFriendlyMessage(msg); taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel)); - } catch (DataStagingException e) { + } catch (JSchException | IOException e) { String msg = "Failed to do scp with client"; log.error(msg, e); status.setState(TaskState.FAILED); @@ -249,35 +249,39 @@ public class SCPDataStageTask implements Task { } private void inputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI - destinationURI) throws SSHApiException { + destinationURI) throws SSHApiException, IOException, JSchException { /** * scp third party file transfer 'to' compute resource. */ - taskContext.getParentProcessContext().getDataMovementRemoteCluster().thirdPartyTransfer( - sourceURI.getPath(), - destinationURI.getPath(), - session -> SSHUtils.scpThirdParty(sourceURI.getPath(), sshSession, destinationURI.getPath(), session, false)); + taskContext.getParentProcessContext().getDataMovementRemoteCluster() + .thirdPartyTransfer(sourceURI.getPath(), destinationURI.getPath(), session -> { + try { + SSHUtils.scpThirdParty(sourceURI.getPath(), sshSession, destinationURI.getPath(), session, false); + } catch (Exception e) { + throw new DataStagingException("Error while file staging, from " + sourceURI.getPath() + + " to " + destinationURI.getPath()); + } + }); } private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI destinationURI) - throws SSHApiException, AiravataException, GFacException { + throws SSHApiException, AiravataException, IOException, JSchException, GFacException { /** * scp third party file transfer 'from' comute resource. */ - taskContext.getParentProcessContext().getDataMovementRemoteCluster().thirdPartyTransfer( - sourceURI.getPath(), - destinationURI.getPath(), - session -> SSHUtils.scpThirdParty(sourceURI.getPath(), session, destinationURI.getPath(), sshSession, true)); + taskContext.getParentProcessContext().getDataMovementRemoteCluster() + .thirdPartyTransfer(sourceURI.getPath(), destinationURI.getPath(), session -> { + try { + SSHUtils.scpThirdParty(sourceURI.getPath(), session, destinationURI.getPath(), sshSession, true); + } catch (Exception e) { + throw new DataStagingException("Error while file staging, from " + sourceURI.getPath() + + " to " + destinationURI.getPath()); + } + }); // update output locations - GFacUtils.saveExperimentOutput( - taskContext.getParentProcessContext(), - taskContext.getProcessOutput().getName(), - destinationURI.toString()); - GFacUtils.saveProcessOutput( - taskContext.getParentProcessContext(), - taskContext.getProcessOutput().getName(), - destinationURI.toString()); + GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.toString()); + GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.toString()); } http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java index f5fd14d..fccce0d 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java @@ -21,8 +21,10 @@ package org.apache.airavata.gfac.impl.task.utils; +import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.credential.store.store.CredentialStoreException; import org.apache.airavata.gfac.core.DataStagingException; import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.GFacUtils; @@ -30,6 +32,7 @@ import org.apache.airavata.gfac.core.SSHApiException; import org.apache.airavata.gfac.core.authentication.AuthenticationInfo; import org.apache.airavata.gfac.core.cluster.CommandInfo; import org.apache.airavata.gfac.core.cluster.RawCommandInfo; +import org.apache.airavata.gfac.core.cluster.RemoteCluster; import org.apache.airavata.gfac.core.cluster.ServerInfo; import org.apache.airavata.gfac.core.context.TaskContext; import org.apache.airavata.gfac.impl.Factory; @@ -41,6 +44,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.TimerTask; @@ -75,23 +79,25 @@ public class StreamData extends TimerTask { } // output staging should end when the job is complete - if (jobStatus != null && jobStatus.getJobState().equals(JobState.COMPLETE) - || jobStatus.getJobState().equals(JobState.CANCELED) - || jobStatus.getJobState().equals(JobState.FAILED)){ + if (jobStatus != null && jobStatus.getJobState().equals(JobState.COMPLETE) || jobStatus.getJobState().equals(JobState.CANCELED) || jobStatus.getJobState().equals(JobState.FAILED)){ this.cancel(); } } catch (URISyntaxException e) { log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , Erroneous path specified", taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(), taskContext.getProcessOutput().getName()); - } catch (AiravataException | SSHApiException e) { + } catch (IllegalAccessException | InstantiationException | AiravataException | IOException | JSchException | SSHApiException e) { log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , Error occurred while streaming data", taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(), taskContext.getProcessOutput().getName()); + } catch (CredentialStoreException e) { + log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , Error occurred while connecting with credential store", + taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(), + taskContext.getProcessOutput().getName()); } } - public void runOutputStaging() throws URISyntaxException, AiravataException, SSHApiException { + public void runOutputStaging() throws URISyntaxException, IllegalAccessException, InstantiationException, CredentialStoreException, AiravataException, IOException, JSchException, SSHApiException { try { AuthenticationInfo authenticationInfo = null; @@ -117,7 +123,7 @@ public class StreamData extends TimerTask { String targetPath = destinationURI.getPath().substring(0, destinationURI.getPath().lastIndexOf('/')); SSHUtils.makeDirectory(targetPath, sshSession); outputDataStaging(taskContext, sshSession, sourceURI, destinationURI); - } catch (DataStagingException | GFacException e) { + } catch (GFacException e) { log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , Error while output staging", taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(), taskContext.getProcessOutput().getName()); @@ -141,24 +147,23 @@ public class StreamData extends TimerTask { } private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI destinationURI) - throws SSHApiException, GFacException { + throws SSHApiException, AiravataException, IOException, JSchException, GFacException { /** * scp third party file transfer 'from' comute resource. */ - taskContext.getParentProcessContext().getDataMovementRemoteCluster().thirdPartyTransfer( - sourceURI.getPath(), - destinationURI.getPath(), - session -> SSHUtils.scpThirdParty(sourceURI.getPath(), session, destinationURI.getPath(), sshSession, true)); + taskContext.getParentProcessContext().getDataMovementRemoteCluster() + .thirdPartyTransfer(sourceURI.getPath(), destinationURI.getPath(), session -> { + try { + SSHUtils.scpThirdParty(sourceURI.getPath(), session, destinationURI.getPath(), sshSession, true); + } catch (Exception e) { + throw new DataStagingException("Error while file staging, from " + sourceURI.getPath() + + " to " + destinationURI.getPath()); + } + }); // update output locations - GFacUtils.saveExperimentOutput( - taskContext.getParentProcessContext(), - taskContext.getProcessOutput().getName(), - destinationURI.getPath()); - GFacUtils.saveProcessOutput( - taskContext.getParentProcessContext(), - taskContext.getProcessOutput().getName(), - destinationURI.getPath()); + GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath()); + GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath()); }
