Repository: airavata
Updated Branches:
  refs/heads/develop db027a53e -> fe6ebe9c0


Reverted scp refactoring


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/dbcfd77b
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/dbcfd77b
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/dbcfd77b

Branch: refs/heads/develop
Commit: dbcfd77be3abd26f97a185f65649cd3ea1ac46c0
Parents: 4a92da4
Author: Shameera Rathnayaka <[email protected]>
Authored: Wed Nov 2 17:24:27 2016 -0400
Committer: Shameera Rathnayaka <[email protected]>
Committed: Wed Nov 2 17:24:27 2016 -0400

----------------------------------------------------------------------
 .../gfac/core/cluster/RemoteCluster.java        |   6 +-
 .../airavata/gfac/impl/BESRemoteCluster.java    |   6 +-
 .../airavata/gfac/impl/HPCRemoteCluster.java    |  19 +-
 .../airavata/gfac/impl/LocalRemoteCluster.java  |   9 +-
 .../org/apache/airavata/gfac/impl/SSHUtils.java | 198 +++++++++----------
 .../airavata/gfac/impl/task/ArchiveTask.java    |   9 +-
 .../gfac/impl/task/BESJobSubmissionTask.java    |   5 +-
 .../airavata/gfac/impl/task/DataStageTask.java  |  13 +-
 .../gfac/impl/task/EnvironmentSetupTask.java    |   4 +-
 .../gfac/impl/task/SCPDataStageTask.java        |  44 +++--
 .../gfac/impl/task/utils/StreamData.java        |  43 ++--
 11 files changed, 178 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
index f59a9e3..5f8d0ec 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
@@ -52,7 +52,7 @@ public interface RemoteCluster { // FIXME: replace 
SSHApiException with suitable
         * @param remoteFile remote file location, this can be a directory too
         * @throws SSHApiException throws exception during error
         */
-       void copyTo(String localFile, String remoteFile, 
SessionConsumer<Session> sessionSessionConsumer) throws SSHApiException;
+       void copyTo(String localFile, String remoteFile) throws SSHApiException;
 
        /**
         * This will copy a remote file in path rFile to local file lFile
@@ -60,7 +60,7 @@ public interface RemoteCluster { // FIXME: replace 
SSHApiException with suitable
         * @param remoteFile      remote file path, this has to be a full 
qualified path
         * @param localFile This is the local file to copy, this can be a 
directory too
         */
-       void copyFrom(String remoteFile, String localFile, 
SessionConsumer<Session> sessionSessionConsumer) throws SSHApiException;
+       void copyFrom(String remoteFile, String localFile) throws 
SSHApiException;
 
        /**
         * This wil copy source remote file to target remote file.
@@ -77,7 +77,7 @@ public interface RemoteCluster { // FIXME: replace 
SSHApiException with suitable
         * @param directoryPath the full qualified path for the directory user 
wants to create
         * @throws SSHApiException throws during error
         */
-       void makeDirectory(String directoryPath, SessionConsumer<Session> 
sessionConsumer) throws SSHApiException;
+       void makeDirectory(String directoryPath) throws SSHApiException;
 
        /**
         * This will delete the given job from the queue

http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java
index 828a34e..0f517b5 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java
@@ -43,12 +43,12 @@ public class BESRemoteCluster extends AbstractRemoteCluster{
     }
 
     @Override
-    public void copyTo(String localFile, String remoteFile, 
SessionConsumer<Session> sessionConsumer) throws SSHApiException {
+    public void copyTo(String localFile, String remoteFile) throws 
SSHApiException {
 
     }
 
     @Override
-    public void copyFrom(String remoteFile, String localFile, 
SessionConsumer<Session> sessionConsumer) throws SSHApiException {
+    public void copyFrom(String remoteFile, String localFile) throws 
SSHApiException {
 
     }
 
@@ -57,7 +57,7 @@ public class BESRemoteCluster extends AbstractRemoteCluster{
     }
 
     @Override
-    public void makeDirectory(String directoryPath, SessionConsumer<Session> 
sessionConsumer) throws SSHApiException {
+    public void makeDirectory(String directoryPath) throws SSHApiException {
 
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
index b322cef..c3566b8 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
@@ -86,8 +86,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
        @Override
        public JobSubmissionOutput submitBatchJob(String jobScriptFilePath, 
String workingDirectory) throws SSHApiException {
                JobSubmissionOutput jsoutput = new JobSubmissionOutput();
-               copyTo(jobScriptFilePath, workingDirectory,
-                               session -> SSHUtils.scpTo(jobScriptFilePath, 
workingDirectory, session)); // scp script file to working directory
+               copyTo(jobScriptFilePath, workingDirectory); // scp script file 
to working directory
                RawCommandInfo submitCommand = 
jobManagerConfiguration.getSubmitCommand(workingDirectory, jobScriptFilePath);
                submitCommand.setRawCommand("cd " + workingDirectory + "; " + 
submitCommand.getRawCommand());
                StandardOutReader reader = new StandardOutReader();
@@ -113,13 +112,13 @@ public class HPCRemoteCluster extends 
AbstractRemoteCluster{
        }
 
        @Override
-       public void copyTo(String localFile, String remoteFile, 
SessionConsumer<Session> sessionConsumer) throws SSHApiException {
+       public void copyTo(String localFile, String remoteFile) throws 
SSHApiException {
                int retry = 3;
                while (retry > 0) {
                        try {
                                session = 
Factory.getSSHSession(authenticationInfo, serverInfo);
                                log.info("Transferring localhost:" + localFile  
+ " to " + serverInfo.getHost() + ":" + remoteFile);
-                sessionConsumer.consume(session);
+                               SSHUtils.scpTo(localFile, remoteFile, session);
                                retry = 0;
                        } catch (Exception e) {
                                retry--;
@@ -140,13 +139,13 @@ public class HPCRemoteCluster extends 
AbstractRemoteCluster{
        }
 
        @Override
-       public void copyFrom(String remoteFile, String localFile, 
SessionConsumer<Session> sessionConsumer) throws SSHApiException {
+       public void copyFrom(String remoteFile, String localFile) throws 
SSHApiException {
                int retry = 3;
                while(retry>0) {
                        try {
                                session = 
Factory.getSSHSession(authenticationInfo, serverInfo);
                                log.info("Transferring " + serverInfo.getHost() 
+ ":" + remoteFile + " To localhost:" + localFile);
-                               sessionConsumer.consume(session);
+                               SSHUtils.scpFrom(remoteFile, localFile, 
session);
                                retry=0;
                        } catch (Exception e) {
                                retry--;
@@ -191,7 +190,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
        }
 
        @Override
-       public void makeDirectory(String directoryPath, 
SessionConsumer<Session> sessionConsumer) throws SSHApiException {
+       public void makeDirectory(String directoryPath) throws SSHApiException {
                int retryCount = 0;
                try {
                        while (retryCount < MAX_RETRY_COUNT) {
@@ -199,9 +198,9 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
                                session = 
Factory.getSSHSession(authenticationInfo, serverInfo);
                                log.info("Creating directory: " + 
serverInfo.getHost() + ":" + directoryPath);
                                try {
-                                       sessionConsumer.consume(session);
+                                       SSHUtils.makeDirectory(directoryPath, 
session);
                                        break;  // Exit while loop
-                               } catch (DataStagingException e) {
+                               } catch (JSchException e) {
                                        if (retryCount == MAX_RETRY_COUNT) {
                                                log.error("Retry count " + 
MAX_RETRY_COUNT + " exceeded for creating directory: "
                                                                + 
serverInfo.getHost() + ":" + directoryPath, e);
@@ -211,7 +210,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
                                        log.error("Issue with jsch, Retry 
creating directory: " + serverInfo.getHost() + ":" + directoryPath);
                                }
                        }
-               } catch (AiravataException | DataStagingException e) {
+               } catch (JSchException | AiravataException | IOException e) {
                        throw new SSHApiException("Failed to create directory " 
+ serverInfo.getHost() + ":" + directoryPath, e);
                }
        }

http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
index aa79a0c..d5422d2 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
@@ -60,8 +60,7 @@ public class LocalRemoteCluster extends AbstractRemoteCluster 
{
     public JobSubmissionOutput submitBatchJob(String jobScriptFilePath, String 
workingDirectory) throws SSHApiException {
         try {
             JobSubmissionOutput jsoutput = new JobSubmissionOutput();
-            String remoteFile = workingDirectory + File.separator + new 
File(jobScriptFilePath).getName();
-            copyTo(jobScriptFilePath, remoteFile, session -> 
SSHUtils.scpTo(jobScriptFilePath, remoteFile, session)); // scp script file to 
working directory
+            copyTo(jobScriptFilePath, workingDirectory + File.separator + new 
File(jobScriptFilePath).getName()); // scp script file to working directory
             RawCommandInfo submitCommand = 
jobManagerConfiguration.getSubmitCommand(workingDirectory, jobScriptFilePath);
             submitCommand.setRawCommand(submitCommand.getRawCommand());
             LocalCommandOutput localCommandOutput = new LocalCommandOutput();
@@ -77,7 +76,7 @@ public class LocalRemoteCluster extends AbstractRemoteCluster 
{
     }
 
     @Override
-    public void copyTo(String localFile, String remoteFile, 
SessionConsumer<Session> sessionConsumer) throws SSHApiException {
+    public void copyTo(String localFile, String remoteFile) throws 
SSHApiException {
         Path sourcePath = Paths.get(localFile);
         Path targetPath = Paths.get(remoteFile);
         try {
@@ -89,7 +88,7 @@ public class LocalRemoteCluster extends AbstractRemoteCluster 
{
     }
 
     @Override
-    public void copyFrom(String remoteFile, String localFile, 
SessionConsumer<Session> sessionConsumer) throws SSHApiException {
+    public void copyFrom(String remoteFile, String localFile) throws 
SSHApiException {
         Path sourcePath = Paths.get(remoteFile);
         Path targetPath = Paths.get(localFile);
         try {
@@ -126,7 +125,7 @@ public class LocalRemoteCluster extends 
AbstractRemoteCluster {
     }
 
     @Override
-    public void makeDirectory(String directoryPath, SessionConsumer<Session> 
sessionConsumer) throws SSHApiException {
+    public void makeDirectory(String directoryPath) throws SSHApiException {
         Path dirPath = Paths.get(directoryPath);
         Set<PosixFilePermission> perms = new HashSet<>();
         // add permission as rwxr--r-- 744

http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
index 07e1799..cd5651e 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
@@ -24,7 +24,6 @@ import com.jcraft.jsch.Channel;
 import com.jcraft.jsch.ChannelExec;
 import com.jcraft.jsch.JSchException;
 import com.jcraft.jsch.Session;
-import org.apache.airavata.gfac.core.DataStagingException;
 import org.apache.airavata.gfac.core.SSHApiException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,100 +53,97 @@ public class SSHUtils {
         * @param localFile  Local file to transfer, this can be a directory
         * @return returns the final remote file path, so that users can use 
the new file location
         */
-       public static String scpTo(String localFile, String remoteFile, Session 
session) throws DataStagingException {
+       public static String scpTo(String localFile, String remoteFile, Session 
session) throws IOException,
+                       JSchException, SSHApiException {
                FileInputStream fis = null;
                String prefix = null;
                if (new File(localFile).isDirectory()) {
                        prefix = localFile + File.separator;
                }
                boolean ptimestamp = true;
-               Channel channel = null;
+
                // exec 'scp -t rfile' remotely
                String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + 
remoteFile;
-               try {
-                       channel = session.openChannel("exec");
+               Channel channel = session.openChannel("exec");
 
-                       StandardOutReader stdOutReader = new 
StandardOutReader();
-                       ((ChannelExec) 
channel).setErrStream(stdOutReader.getStandardError());
-                       ((ChannelExec) channel).setCommand(command);
+               StandardOutReader stdOutReader = new StandardOutReader();
+               ((ChannelExec) 
channel).setErrStream(stdOutReader.getStandardError());
+               ((ChannelExec) channel).setCommand(command);
 
-                       // get I/O streams for remote scp
-                       try (OutputStream out = channel.getOutputStream();
-                                InputStream in = channel.getInputStream()) {
+               // get I/O streams for remote scp
+               OutputStream out = channel.getOutputStream();
+               InputStream in = channel.getInputStream();
 
-                               channel.connect();
-                               if (checkAck(in) != 0) {
-                                       String error = "Error Reading input 
Stream";
-                                       log.error(error);
-                                       throw new DataStagingException(error);
-                               }
+               channel.connect();
 
-                               File _lfile = new File(localFile);
-
-                               if (ptimestamp) {
-                                       command = "T" + (_lfile.lastModified() 
/ 1000) + " 0";
-                                       // The access time should be sent here,
-                                       // but it is not accessible with 
JavaAPI ;-<
-                                       command += (" " + 
(_lfile.lastModified() / 1000) + " 0\n");
-                                       out.write(command.getBytes());
-                                       out.flush();
-                                       if (checkAck(in) != 0) {
-                                               String error = "Error Reading 
input Stream";
-                                               log.error(error);
-                                               throw new 
DataStagingException(error);
-                                       }
-                               }
+               if (checkAck(in) != 0) {
+                       String error = "Error Reading input Stream";
+                       log.error(error);
+                       throw new SSHApiException(error);
+               }
 
-                               // send "C0644 filesize filename", where 
filename should not include '/'
-                               long filesize = _lfile.length();
-                               command = "C0644 " + filesize + " ";
-                               if (localFile.lastIndexOf('/') > 0) {
-                                       command += 
localFile.substring(localFile.lastIndexOf('/') + 1);
-                               } else {
-                                       command += localFile;
-                               }
-                               command += "\n";
-                               out.write(command.getBytes());
-                               out.flush();
-                               if (checkAck(in) != 0) {
-                                       String error = "Error Reading input 
Stream";
-                                       log.error(error);
-                                       throw new DataStagingException(error);
-                               }
+               File _lfile = new File(localFile);
 
-                               // send a content of localFile
-                               fis = new FileInputStream(localFile);
-                               byte[] buf = new byte[1024];
-                               while (true) {
-                                       int len = fis.read(buf, 0, buf.length);
-                                       if (len <= 0) break;
-                                       out.write(buf, 0, len); //out.flush();
-                               }
-                               fis.close();
-                               fis = null;
-                               // send '\0'
-                               buf[0] = 0;
-                               out.write(buf, 0, 1);
-                               out.flush();
-                               if (checkAck(in) != 0) {
-                                       String error = "Error Reading input 
Stream";
-                                       log.error(error);
-                                       throw new DataStagingException(error);
-                               }
-                       }
-                       stdOutReader.onOutput(channel);
-                       if (stdOutReader.getStdErrorString().contains("scp:")) {
-                               throw new 
DataStagingException(stdOutReader.getStdErrorString());
-                       }
-                       //since remote file is always a file  we just return 
the file
-                       return remoteFile;
-               } catch (IOException | JSchException e) {
-                       throw new DataStagingException(e);
-               } finally {
-                       if (channel != null && channel.isConnected()) {
-                               channel.disconnect();
+               if (ptimestamp) {
+                       command = "T" + (_lfile.lastModified() / 1000) + " 0";
+                       // The access time should be sent here,
+                       // but it is not accessible with JavaAPI ;-<
+                       command += (" " + (_lfile.lastModified() / 1000) + " 
0\n");
+                       out.write(command.getBytes());
+                       out.flush();
+                       if (checkAck(in) != 0) {
+                               String error = "Error Reading input Stream";
+                               log.error(error);
+                               throw new SSHApiException(error);
                        }
                }
+
+               // send "C0644 filesize filename", where filename should not 
include '/'
+               long filesize = _lfile.length();
+               command = "C0644 " + filesize + " ";
+               if (localFile.lastIndexOf('/') > 0) {
+                       command += 
localFile.substring(localFile.lastIndexOf('/') + 1);
+               } else {
+                       command += localFile;
+               }
+               command += "\n";
+               out.write(command.getBytes());
+               out.flush();
+               if (checkAck(in) != 0) {
+                       String error = "Error Reading input Stream";
+                       log.error(error);
+                       throw new SSHApiException(error);
+               }
+
+               // send a content of localFile
+               fis = new FileInputStream(localFile);
+               byte[] buf = new byte[1024];
+               while (true) {
+                       int len = fis.read(buf, 0, buf.length);
+                       if (len <= 0) break;
+                       out.write(buf, 0, len); //out.flush();
+               }
+               fis.close();
+               fis = null;
+               // send '\0'
+               buf[0] = 0;
+               out.write(buf, 0, 1);
+               out.flush();
+               if (checkAck(in) != 0) {
+                       String error = "Error Reading input Stream";
+                       log.error(error);
+                       throw new SSHApiException(error);
+               }
+               out.close();
+               stdOutReader.onOutput(channel);
+
+
+               channel.disconnect();
+               if (stdOutReader.getStdErrorString().contains("scp:")) {
+                       throw new 
SSHApiException(stdOutReader.getStdErrorString());
+               }
+               //since remote file is always a file  we just return the file
+               return remoteFile;
        }
 
        /**
@@ -157,7 +153,8 @@ public class SSHUtils {
         * @param localFile  This is the local file to copy, this can be a 
directory too
         * @return returns the final local file path of the new file came from 
the remote resource
         */
-       public static void scpFrom(String remoteFile, String localFile, Session 
session) throws DataStagingException {
+       public static void scpFrom(String remoteFile, String localFile, Session 
session) throws IOException,
+                       JSchException, SSHApiException {
                FileOutputStream fos = null;
                try {
                        String prefix = null;
@@ -258,7 +255,6 @@ public class SSHUtils {
 
                } catch (Exception e) {
                        log.error(e.getMessage(), e);
-                       throw new DataStagingException(e);
                } finally {
                        try {
                                if (fos != null) fos.close();
@@ -276,11 +272,8 @@ public class SSHUtils {
      * @param destinationSession JSch Session for target
      * @return returns the final local file path of the new file came from the 
remote resource
      */
-    public static void scpThirdParty(String sourceFile,
-                                                                        
Session sourceSession,
-                                                                        String 
destinationFile,
-                                                                        
Session destinationSession,
-                                                                        
boolean ignoreEmptyFile) throws DataStagingException {
+    public static void scpThirdParty(String sourceFile, Session sourceSession, 
String destinationFile, Session destinationSession, boolean ignoreEmptyFile) 
throws
+            IOException, JSchException {
         OutputStream sout = null;
         InputStream sin = null;
         OutputStream dout = null;
@@ -408,7 +401,7 @@ public class SSHUtils {
 
         } catch (Exception e) {
             log.error(e.getMessage(), e);
-            throw new DataStagingException(e.getMessage());
+            throw new JSchException(e.getMessage());
         } finally {
             try {
                 if (dout != null) dout.close();
@@ -433,32 +426,35 @@ public class SSHUtils {
         }
     }
 
-       public static void makeDirectory(String path, Session session) throws 
DataStagingException {
+       public static void makeDirectory(String path, Session session) throws 
IOException, JSchException, SSHApiException {
 
-               Channel channel = null;
+               // exec 'scp -t rfile' remotely
                String command = "mkdir -p " + path;
+               Channel channel = session.openChannel("exec");
+               StandardOutReader stdOutReader = new StandardOutReader();
+
+               ((ChannelExec) channel).setCommand(command);
+
+
+               ((ChannelExec) 
channel).setErrStream(stdOutReader.getStandardError());
                try {
-                       // exec 'scp -t rfile' remotely
-                       channel = session.openChannel("exec");
-                       StandardOutReader stdOutReader = new 
StandardOutReader();
                        channel.connect();
-                       stdOutReader.onOutput(channel);
-                       if 
(stdOutReader.getStdErrorString().contains("mkdir:")) {
-                               throw new 
DataStagingException(stdOutReader.getStdErrorString());
-                       }
-
                } catch (JSchException e) {
 
+                       channel.disconnect();
 //            session.disconnect();
                        log.error("Unable to retrieve command output. Command - 
" + command +
                                        " on server - " + session.getHost() + 
":" + session.getPort() +
                                        " connecting user name - "
                                        + session.getUserName());
-                       throw new DataStagingException(e);
-               }finally {
-                       if(channel != null && channel.isConnected())
-                               channel.disconnect();
+                       throw e;
+               }
+               stdOutReader.onOutput(channel);
+               if (stdOutReader.getStdErrorString().contains("mkdir:")) {
+                       throw new 
SSHApiException(stdOutReader.getStdErrorString());
                }
+
+               channel.disconnect();
        }
 
        public static List<String> listDirectory(String path, Session session) 
throws IOException, JSchException,

http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
index 85d3fd2..88661f8 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
@@ -138,8 +138,13 @@ public class ArchiveTask implements Task {
             // move tar to storage resource
             remoteCluster.execute(commandInfo);
             URI destinationURI = TaskUtils.getDestinationURI(taskContext, 
hostName, inputPath, archiveTar);
-            remoteCluster.thirdPartyTransfer(resourceAbsTarFilePath, 
destinationURI.getPath(),
-                    session -> SSHUtils.scpThirdParty(sourceURI.getPath(), 
session, destinationURI.getPath(), sshSession, true));
+            remoteCluster.thirdPartyTransfer(resourceAbsTarFilePath 
,destinationURI.getPath(), session -> {
+                try {
+                    SSHUtils.scpThirdParty(sourceURI.getPath(),session, 
destinationURI.getPath(), sshSession, true);
+                } catch (Exception e) {
+                    throw new DataStagingException("Error while transferring " 
+ sourceURI.getPath() + " to " + destinationURI.getPath());
+                }
+            });
 
             // delete tar in remote computer resource
             commandInfo = new RawCommandInfo("rm " + resourceAbsTarFilePath);

http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
index 1811286..990b9ea 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
@@ -31,7 +31,6 @@ import de.fzj.unicore.wsrflite.xmlbeans.WSUtilities;
 import eu.unicore.util.httpclient.DefaultClientConfiguration;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.gfac.core.DataStagingException;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.SSHApiException;
@@ -274,7 +273,7 @@ public class BESJobSubmissionTask implements 
JobSubmissionTask {
                         break;
                 }
             }
-        } catch (DataStagingException | AiravataException | URISyntaxException 
e) {
+        } catch (IOException | JSchException | AiravataException | 
SSHApiException | URISyntaxException e) {
             log.error("Error while coping local file " + localFilePath + " to 
remote " + remoteFilePath, e);
             throw new GFacException("Error while scp output files to remote 
storage file location", e);
         }
@@ -318,7 +317,7 @@ public class BESJobSubmissionTask implements 
JobSubmissionTask {
                     input.setValue("file:/" + localFilePath);
                 }
             }
-        } catch ( AiravataException | DataStagingException| URISyntaxException 
e) {
+        } catch (IOException | JSchException | AiravataException | 
SSHApiException | URISyntaxException e) {
             log.error("Error while coping remote file " + remoteFilePath + " 
to local " + localFilePath, e);
             throw new GFacException("Error while scp input files to local file 
location", e);
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
index a171f7f..8c6a125 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
@@ -24,7 +24,6 @@ import org.apache.airavata.gfac.core.SSHApiException;
 import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.Task;
 import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.SSHUtils;
 import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.status.TaskState;
@@ -66,18 +65,14 @@ public class DataStageTask implements Task {
                                        /**
                                         * copy local file to compute resource.
                                         */
-                                       
taskContext.getParentProcessContext().getDataMovementRemoteCluster().copyTo(
-                                                       sourceURI.getPath(),
-                                                       
destinationURI.getPath(),
-                                                       session -> 
SSHUtils.scpTo(sourceURI.getPath(), destinationURI.getPath(), session));
+                                       
taskContext.getParentProcessContext().getDataMovementRemoteCluster().copyTo(sourceURI.getPath(),
 destinationURI
+                                                       .getPath());
                                } else if (processState == 
ProcessState.OUTPUT_DATA_STAGING) {
                                        /**
                                         * copy remote file from compute 
resource.
                                         */
-                                       
taskContext.getParentProcessContext().getDataMovementRemoteCluster().copyFrom(
-                                                       sourceURI.getPath(),
-                                                       
destinationURI.getPath(),
-                                                       session -> 
SSHUtils.scpFrom(sourceURI.getPath(), destinationURI.getPath(), session));
+                                       
taskContext.getParentProcessContext().getDataMovementRemoteCluster().copyFrom(sourceURI.getPath(),
 destinationURI
+                                                       .getPath());
                                }
                                status.setReason("Successfully staged data");
                        } catch (SSHApiException e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
index bbe77d8..7de0282 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
@@ -25,7 +25,6 @@ import org.apache.airavata.gfac.core.cluster.RemoteCluster;
 import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.Task;
 import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.SSHUtils;
 import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.status.TaskState;
 import org.apache.airavata.model.status.TaskStatus;
@@ -49,8 +48,7 @@ public class EnvironmentSetupTask implements Task {
                TaskStatus status = new TaskStatus(TaskState.COMPLETED);
                try {
                        RemoteCluster remoteCluster = 
taskContext.getParentProcessContext().getJobSubmissionRemoteCluster();
-                       String workingDir = 
taskContext.getParentProcessContext().getWorkingDir();
-                       remoteCluster.makeDirectory(workingDir, session -> 
SSHUtils.makeDirectory(workingDir, session));
+                       
remoteCluster.makeDirectory(taskContext.getParentProcessContext().getWorkingDir());
                        status.setReason("Successfully created environment");
                } catch (SSHApiException e) {
                        String msg = "Error while environment setup";

http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
index 4189f81..2788535 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
@@ -182,7 +182,7 @@ public class SCPDataStageTask implements Task {
             errorModel.setUserFriendlyMessage(msg);
             
taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
             return status;
-        } catch (ApplicationSettingsException e) {
+        } catch (ApplicationSettingsException | FileNotFoundException e) {
             String msg = "Failed while reading credentials";
             log.error(msg, e);
             status.setState(TaskState.FAILED);
@@ -219,7 +219,7 @@ public class SCPDataStageTask implements Task {
             errorModel.setActualErrorMessage(e.getMessage());
             errorModel.setUserFriendlyMessage(msg);
             
taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
-        } catch (DataStagingException e) {
+        } catch (JSchException | IOException e) {
             String msg = "Failed to do scp with client";
             log.error(msg, e);
             status.setState(TaskState.FAILED);
@@ -249,35 +249,39 @@ public class SCPDataStageTask implements Task {
     }
 
     private void inputDataStaging(TaskContext taskContext, Session sshSession, 
URI sourceURI, URI
-            destinationURI) throws SSHApiException {
+            destinationURI) throws SSHApiException, IOException, JSchException 
{
         /**
          * scp third party file transfer 'to' compute resource.
          */
-        
taskContext.getParentProcessContext().getDataMovementRemoteCluster().thirdPartyTransfer(
-                sourceURI.getPath(),
-                destinationURI.getPath(),
-                session -> SSHUtils.scpThirdParty(sourceURI.getPath(), 
sshSession, destinationURI.getPath(), session, false));
+        taskContext.getParentProcessContext().getDataMovementRemoteCluster()
+                .thirdPartyTransfer(sourceURI.getPath(), 
destinationURI.getPath(), session -> {
+                    try {
+                        SSHUtils.scpThirdParty(sourceURI.getPath(), 
sshSession, destinationURI.getPath(), session, false);
+                    } catch (Exception e) {
+                        throw new DataStagingException("Error while file 
staging, from " + sourceURI.getPath()
+                                + " to " + destinationURI.getPath());
+                    }
+                });
     }
 
     private void outputDataStaging(TaskContext taskContext, Session 
sshSession, URI sourceURI, URI destinationURI)
-            throws SSHApiException, AiravataException, GFacException {
+            throws SSHApiException, AiravataException, IOException, 
JSchException, GFacException {
 
         /**
          * scp third party file transfer 'from' comute resource.
          */
-        
taskContext.getParentProcessContext().getDataMovementRemoteCluster().thirdPartyTransfer(
-                sourceURI.getPath(),
-                destinationURI.getPath(),
-                session -> SSHUtils.scpThirdParty(sourceURI.getPath(), 
session, destinationURI.getPath(), sshSession, true));
+        taskContext.getParentProcessContext().getDataMovementRemoteCluster()
+                .thirdPartyTransfer(sourceURI.getPath(), 
destinationURI.getPath(), session -> {
+                    try {
+                        SSHUtils.scpThirdParty(sourceURI.getPath(), session, 
destinationURI.getPath(), sshSession, true);
+                    } catch (Exception e) {
+                        throw new DataStagingException("Error while file 
staging, from " + sourceURI.getPath()
+                                + " to " + destinationURI.getPath());
+                    }
+                });
         // update output locations
-        GFacUtils.saveExperimentOutput(
-                taskContext.getParentProcessContext(),
-                taskContext.getProcessOutput().getName(),
-                destinationURI.toString());
-        GFacUtils.saveProcessOutput(
-                taskContext.getParentProcessContext(),
-                taskContext.getProcessOutput().getName(),
-                destinationURI.toString());
+        GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), 
taskContext.getProcessOutput().getName(), destinationURI.toString());
+        GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), 
taskContext.getProcessOutput().getName(), destinationURI.toString());
 
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
index f5fd14d..fccce0d 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
@@ -21,8 +21,10 @@
 
 package org.apache.airavata.gfac.impl.task.utils;
 
+import com.jcraft.jsch.JSchException;
 import com.jcraft.jsch.Session;
 import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.credential.store.store.CredentialStoreException;
 import org.apache.airavata.gfac.core.DataStagingException;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacUtils;
@@ -30,6 +32,7 @@ import org.apache.airavata.gfac.core.SSHApiException;
 import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
 import org.apache.airavata.gfac.core.cluster.CommandInfo;
 import org.apache.airavata.gfac.core.cluster.RawCommandInfo;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
 import org.apache.airavata.gfac.core.cluster.ServerInfo;
 import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.impl.Factory;
@@ -41,6 +44,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.TimerTask;
@@ -75,23 +79,25 @@ public class StreamData extends TimerTask  {
             }
 
             // output staging should end when the job is complete
-            if (jobStatus != null && 
jobStatus.getJobState().equals(JobState.COMPLETE)
-                    || jobStatus.getJobState().equals(JobState.CANCELED)
-                    || jobStatus.getJobState().equals(JobState.FAILED)){
+            if (jobStatus != null && 
jobStatus.getJobState().equals(JobState.COMPLETE) || 
jobStatus.getJobState().equals(JobState.CANCELED) || 
jobStatus.getJobState().equals(JobState.FAILED)){
                 this.cancel();
             }
         } catch (URISyntaxException e) {
             log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage 
file {} , Erroneous path specified",
                     taskContext.getExperimentId(), taskContext.getProcessId(), 
taskContext.getTaskId(),
                     taskContext.getProcessOutput().getName());
-        } catch (AiravataException | SSHApiException e) {
+        } catch (IllegalAccessException | InstantiationException | 
AiravataException | IOException | JSchException | SSHApiException e) {
             log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage 
file {} , Error occurred while streaming data",
                     taskContext.getExperimentId(), taskContext.getProcessId(), 
taskContext.getTaskId(),
                     taskContext.getProcessOutput().getName());
+        } catch (CredentialStoreException e) {
+            log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage 
file {} , Error occurred while connecting with credential store",
+                    taskContext.getExperimentId(), taskContext.getProcessId(), 
taskContext.getTaskId(),
+                    taskContext.getProcessOutput().getName());
         }
     }
 
-    public void runOutputStaging() throws URISyntaxException, 
AiravataException,  SSHApiException {
+    public void runOutputStaging() throws URISyntaxException, 
IllegalAccessException, InstantiationException, CredentialStoreException, 
AiravataException, IOException, JSchException, SSHApiException {
         try {
 
             AuthenticationInfo authenticationInfo = null;
@@ -117,7 +123,7 @@ public class StreamData extends TimerTask  {
             String targetPath = destinationURI.getPath().substring(0, 
destinationURI.getPath().lastIndexOf('/'));
             SSHUtils.makeDirectory(targetPath, sshSession);
             outputDataStaging(taskContext, sshSession, sourceURI, 
destinationURI);
-        } catch (DataStagingException | GFacException e) {
+        } catch (GFacException e) {
             log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage 
file {} , Error while output staging",
                     taskContext.getExperimentId(), taskContext.getProcessId(), 
taskContext.getTaskId(),
                     taskContext.getProcessOutput().getName());
@@ -141,24 +147,23 @@ public class StreamData extends TimerTask  {
     }
 
     private void outputDataStaging(TaskContext taskContext, Session 
sshSession, URI sourceURI, URI destinationURI)
-            throws SSHApiException, GFacException {
+            throws SSHApiException, AiravataException, IOException, 
JSchException, GFacException {
 
         /**
          * scp third party file transfer 'from' comute resource.
          */
-        
taskContext.getParentProcessContext().getDataMovementRemoteCluster().thirdPartyTransfer(
-                sourceURI.getPath(),
-                destinationURI.getPath(),
-                session -> SSHUtils.scpThirdParty(sourceURI.getPath(), 
session, destinationURI.getPath(), sshSession, true));
+        taskContext.getParentProcessContext().getDataMovementRemoteCluster()
+                .thirdPartyTransfer(sourceURI.getPath(), 
destinationURI.getPath(), session -> {
+                    try {
+                        SSHUtils.scpThirdParty(sourceURI.getPath(), session, 
destinationURI.getPath(), sshSession, true);
+                    } catch (Exception e) {
+                        throw new DataStagingException("Error while file 
staging, from " + sourceURI.getPath()
+                                + " to " + destinationURI.getPath());
+                    }
+                });
         // update output locations
-        GFacUtils.saveExperimentOutput(
-                taskContext.getParentProcessContext(),
-                taskContext.getProcessOutput().getName(),
-                destinationURI.getPath());
-        GFacUtils.saveProcessOutput(
-                taskContext.getParentProcessContext(),
-                taskContext.getProcessOutput().getName(),
-                destinationURI.getPath());
+        GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), 
taskContext.getProcessOutput().getName(), destinationURI.getPath());
+        GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), 
taskContext.getProcessOutput().getName(), destinationURI.getPath());
 
     }
 

Reply via email to