Revert scp third party refactoring

Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a45294e4
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a45294e4
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a45294e4

Branch: refs/heads/develop
Commit: a45294e461cca0ea5b79fccdd75e969a201052e7
Parents: dbcfd77
Author: Shameera Rathnayaka <[email protected]>
Authored: Wed Nov 2 17:29:28 2016 -0400
Committer: Shameera Rathnayaka <[email protected]>
Committed: Wed Nov 2 17:29:28 2016 -0400

----------------------------------------------------------------------
 .../gfac/core/DataStagingException.java         | 43 --------------------
 .../gfac/core/cluster/RemoteCluster.java        | 33 +++++++--------
 .../gfac/core/cluster/SessionConsumer.java      | 28 -------------
 .../airavata/gfac/impl/BESRemoteCluster.java    |  3 +-
 .../airavata/gfac/impl/HPCRemoteCluster.java    | 18 ++++----
 .../airavata/gfac/impl/LocalRemoteCluster.java  | 17 ++++----
 .../airavata/gfac/impl/task/ArchiveTask.java    | 13 ++----
 .../gfac/impl/task/SCPDataStageTask.java        | 23 ++---------
 .../gfac/impl/task/utils/StreamData.java        | 12 +-----
 9 files changed, 49 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/DataStagingException.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/DataStagingException.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/DataStagingException.java
deleted file mode 100644
index 67ed0c7..0000000
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/DataStagingException.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.gfac.core;
-
-public class DataStagingException extends Exception {
-
-    public DataStagingException() {
-        super();
-    }
-
-    public DataStagingException(String message) {
-        super(message);
-    }
-
-    public DataStagingException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public DataStagingException(Throwable cause) {
-        super(cause);
-    }
-
-    protected DataStagingException(String message, Throwable cause, boolean 
enableSuppression, boolean writableStackTrace) {
-        super(message, cause, enableSuppression, writableStackTrace);
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
index 5f8d0ec..3916573 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
@@ -43,7 +43,7 @@ public interface RemoteCluster { // FIXME: replace 
SSHApiException with suitable
         * @return jobId after successful job submission
         * @throws SSHApiException throws exception during error
         */
-       JobSubmissionOutput submitBatchJob(String jobScriptFilePath, String 
workingDirectory) throws SSHApiException;
+       public JobSubmissionOutput submitBatchJob(String jobScriptFilePath, 
String workingDirectory) throws SSHApiException;
 
        /**
         * This will copy the localFile to remoteFile location in configured 
cluster
@@ -52,7 +52,7 @@ public interface RemoteCluster { // FIXME: replace 
SSHApiException with suitable
         * @param remoteFile remote file location, this can be a directory too
         * @throws SSHApiException throws exception during error
         */
-       void copyTo(String localFile, String remoteFile) throws SSHApiException;
+       public void copyTo(String localFile, String remoteFile) throws 
SSHApiException;
 
        /**
         * This will copy a remote file in path rFile to local file lFile
@@ -60,16 +60,18 @@ public interface RemoteCluster { // FIXME: replace 
SSHApiException with suitable
         * @param remoteFile      remote file path, this has to be a full 
qualified path
         * @param localFile This is the local file to copy, this can be a 
directory too
         */
-       void copyFrom(String remoteFile, String localFile) throws 
SSHApiException;
+       public void copyFrom(String remoteFile, String localFile) throws 
SSHApiException;
 
        /**
         * This wil copy source remote file to target remote file.
         *
         * @param sourceFile remote file path, this has to be a full qualified 
path
         * @param destinationFile This is the local file to copy, this can be a 
directory too
+     * @param session jcraft session of other coner of thirdparty file 
transfer.
+     * @param inOrOut direction to file transfer , to the remote 
cluster(DIRECTION.IN) or from the remote cluster(DIRECTION.OUT)
         *
         */
-       void thirdPartyTransfer(String sourceFile, String destinationFile, 
SessionConsumer<Session> sessionConsumer) throws SSHApiException;
+       public void scpThirdParty(String sourceFile, String destinationFile 
,Session session , DIRECTION inOrOut, boolean ignoreEmptyFile) throws 
SSHApiException;
 
        /**
         * This will create directories in computing resources
@@ -77,7 +79,7 @@ public interface RemoteCluster { // FIXME: replace 
SSHApiException with suitable
         * @param directoryPath the full qualified path for the directory user 
wants to create
         * @throws SSHApiException throws during error
         */
-       void makeDirectory(String directoryPath) throws SSHApiException;
+       public void makeDirectory(String directoryPath) throws SSHApiException;
 
        /**
         * This will delete the given job from the queue
@@ -86,7 +88,7 @@ public interface RemoteCluster { // FIXME: replace 
SSHApiException with suitable
         * @return return the description of the deleted job
         * @throws SSHApiException throws exception during error
         */
-       JobStatus cancelJob(String jobID) throws SSHApiException;
+       public JobStatus cancelJob(String jobID) throws SSHApiException;
 
        /**
         * This will get the job status of the the job associated with this 
jobId
@@ -95,7 +97,7 @@ public interface RemoteCluster { // FIXME: replace 
SSHApiException with suitable
         * @return job status of the given jobID
         * @throws SSHApiException throws exception during error
         */
-       JobStatus getJobStatus(String jobID) throws SSHApiException;
+       public JobStatus getJobStatus(String jobID) throws SSHApiException;
 
        /**
         * This will get the job status of the the job associated with this 
jobId
@@ -104,7 +106,7 @@ public interface RemoteCluster { // FIXME: replace 
SSHApiException with suitable
         * @return jobId of the given jobName
         * @throws SSHApiException throws exception during error
         */
-       String getJobIdByJobName(String jobName, String userName) throws 
SSHApiException;
+       public String getJobIdByJobName(String jobName, String userName) throws 
SSHApiException;
 
        /**
         * This method can be used to poll the jobstatuses based on the given
@@ -115,7 +117,7 @@ public interface RemoteCluster { // FIXME: replace 
SSHApiException with suitable
         * @param userName userName of the jobs which required to get the status
         * @param jobIDs   precises set of jobIDs
         */
-       void getJobStatuses(String userName, Map<String, JobStatus> jobIDs) 
throws SSHApiException;
+       public void getJobStatuses(String userName, Map<String, JobStatus> 
jobIDs) throws SSHApiException;
 
        /**
         * This will list directories in computing resources
@@ -123,7 +125,7 @@ public interface RemoteCluster { // FIXME: replace 
SSHApiException with suitable
         * @param directoryPath the full qualified path for the directory user 
wants to create
         * @throws SSHApiException throws during error
         */
-       List<String> listDirectory(String directoryPath) throws SSHApiException;
+       public List<String> listDirectory(String directoryPath) throws 
SSHApiException;
 
        /**
         * This method can use to execute custom command on remote compute 
resource.
@@ -131,27 +133,26 @@ public interface RemoteCluster { // FIXME: replace 
SSHApiException with suitable
         * @return <code>true</code> if command successfully executed, 
<code>false</code> otherwise.
         * @throws SSHApiException
         */
-       boolean execute(CommandInfo commandInfo) throws SSHApiException;
+       public boolean execute(CommandInfo commandInfo) throws SSHApiException;
 
        /**
         * This method can be used to get created ssh session
         * to reuse the created session.
         */
-       Session getSession() throws SSHApiException;
+       public Session getSession() throws SSHApiException;
 
        /**
         * This method can be used to close the connections initialized
         * to handle graceful shutdown of the system
         */
-       void disconnect() throws SSHApiException;
+       public void disconnect() throws SSHApiException;
 
        /**
         * This gives the server Info
         */
-       ServerInfo getServerInfo();
-
-    AuthenticationInfo getAuthentication();
+       public ServerInfo getServerInfo();
 
+    public AuthenticationInfo getAuthentication();
     enum DIRECTION {
         TO,
         FROM

http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/SessionConsumer.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/SessionConsumer.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/SessionConsumer.java
deleted file mode 100644
index 2f39d97..0000000
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/SessionConsumer.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.gfac.core.cluster;
-
-import org.apache.airavata.gfac.core.DataStagingException;
-
-public interface SessionConsumer<S> {
-
-    void consume(S session) throws DataStagingException;
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java
index 0f517b5..3586ee8 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java
@@ -53,7 +53,8 @@ public class BESRemoteCluster extends AbstractRemoteCluster{
     }
 
     @Override
-    public void thirdPartyTransfer(String sourceFile, String destinationFile, 
SessionConsumer<Session> sessionConsumer) throws SSHApiException {
+    public void scpThirdParty(String sourceFile, String destinationFile, 
Session session, DIRECTION inOrOut, boolean ignoreEmptyFile) throws 
SSHApiException {
+
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
index c3566b8..725b6d0 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
@@ -26,7 +26,6 @@ import com.jcraft.jsch.JSchException;
 import com.jcraft.jsch.Session;
 import com.jcraft.jsch.UserInfo;
 import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.gfac.core.DataStagingException;
 import org.apache.airavata.gfac.core.JobManagerConfiguration;
 import org.apache.airavata.gfac.core.SSHApiException;
 import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
@@ -165,7 +164,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
        }
 
        @Override
-       public void thirdPartyTransfer(String sourceFile, String 
destinationFile, SessionConsumer<Session> sessionConsumer) throws 
SSHApiException {
+       public void scpThirdParty(String sourceFile, String destinationFile, 
Session clientSession, DIRECTION direction, boolean ignoreEmptyFile) throws 
SSHApiException {
                int retryCount= 0;
                try {
                        while (retryCount < MAX_RETRY_COUNT) {
@@ -173,19 +172,24 @@ public class HPCRemoteCluster extends 
AbstractRemoteCluster{
                                session = 
Factory.getSSHSession(authenticationInfo, serverInfo);
                                log.info("Transferring from:" + sourceFile + " 
To: " + destinationFile);
                                try {
-                                       sessionConsumer.consume(session);
+                                       if (direction == DIRECTION.TO) {
+                        SSHUtils.scpThirdParty(sourceFile, clientSession, 
destinationFile, session, ignoreEmptyFile);
+                    } else {
+                        SSHUtils.scpThirdParty(sourceFile, session, 
destinationFile, clientSession, ignoreEmptyFile);
+                    }
                                        break; // exit while loop
-                               } catch (DataStagingException e) {
+                               } catch (JSchException e) {
                                        if (retryCount == MAX_RETRY_COUNT) {
                                                log.error("Retry count " + 
MAX_RETRY_COUNT + " exceeded for  transferring from:"
                                                                + sourceFile + 
" To: " + destinationFile, e);
                                                throw e;
                                        }
-                                       log.error("Issue with file staging, 
Retry transferring from:" + sourceFile + " To: " + destinationFile, e);
+                                       log.error("Issue with jsch, Retry 
transferring from:" + sourceFile + " To: " + destinationFile, e);
                                }
                        }
-        } catch (AiravataException| DataStagingException e) {
-                       throw new SSHApiException("Failed scp file:" + 
sourceFile + " to remote file " + destinationFile, e);
+        } catch (IOException | AiravataException| JSchException e) {
+                       throw new SSHApiException("Failed scp file:" + 
sourceFile + " to remote file "
+                                       +destinationFile , e);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
index d5422d2..9294470 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
@@ -23,7 +23,6 @@ package org.apache.airavata.gfac.impl;
 import com.jcraft.jsch.JSchException;
 import com.jcraft.jsch.Session;
 import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.gfac.core.DataStagingException;
 import org.apache.airavata.gfac.core.JobManagerConfiguration;
 import org.apache.airavata.gfac.core.SSHApiException;
 import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
@@ -100,16 +99,20 @@ public class LocalRemoteCluster extends 
AbstractRemoteCluster {
     }
 
     @Override
-    public void thirdPartyTransfer(String sourceFile, String destinationFile, 
SessionConsumer<Session> sessionConsumer) throws SSHApiException {
-        int retryCount = 0;
+    public void scpThirdParty(String sourceFile, String destinationFile, 
Session session, DIRECTION inOrOut, boolean ignoreEmptyFile) throws 
SSHApiException {
+        int retryCount= 0;
         try {
             while (retryCount < MAX_RETRY_COUNT) {
                 retryCount++;
                 log.info("Transferring from:" + sourceFile + " To: " + 
destinationFile);
                 try {
-                    sessionConsumer.consume(null);
+                    if (inOrOut == DIRECTION.TO) {
+                        SSHUtils.scpThirdParty(sourceFile, session, 
destinationFile, session, ignoreEmptyFile);
+                    } else {
+                        SSHUtils.scpThirdParty(sourceFile, session, 
destinationFile, session, ignoreEmptyFile);
+                    }
                     break; // exit while loop
-                } catch (DataStagingException e) {
+                } catch (JSchException e) {
                     if (retryCount == MAX_RETRY_COUNT) {
                         log.error("Retry count " + MAX_RETRY_COUNT + " 
exceeded for  transferring from:"
                                 + sourceFile + " To: " + destinationFile, e);
@@ -118,9 +121,9 @@ public class LocalRemoteCluster extends 
AbstractRemoteCluster {
                     log.error("Issue with jsch, Retry transferring from:" + 
sourceFile + " To: " + destinationFile, e);
                 }
             }
-        } catch (DataStagingException e) {
+        } catch (IOException | JSchException e) {
             throw new SSHApiException("Failed scp file:" + sourceFile + " to 
remote file "
-                    + destinationFile, e);
+                    +destinationFile , e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
index 88661f8..df22654 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
@@ -26,7 +26,6 @@ import com.jcraft.jsch.Session;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.gfac.core.DataStagingException;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.SSHApiException;
 import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
@@ -40,7 +39,6 @@ import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.Task;
 import org.apache.airavata.gfac.core.task.TaskException;
 import org.apache.airavata.gfac.impl.Factory;
-import org.apache.airavata.gfac.impl.SSHUtils;
 import org.apache.airavata.gfac.impl.StandardOutReader;
 import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
 import 
org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
@@ -121,6 +119,7 @@ public class ArchiveTask implements Task {
             ServerInfo serverInfo = new ServerInfo(userName, hostName, 
DEFAULT_SSH_PORT);
             Session sshSession = Factory.getSSHSession(authenticationInfo, 
serverInfo);
             URI sourceURI = new URI(subTaskModel.getSource());
+            URI destinationURI = null;
             String workingDirName = null, path = null;
             if (sourceURI.getPath().endsWith("/")) {
                 path = sourceURI.getPath().substring(0, 
sourceURI.getPath().length() - 1);
@@ -137,14 +136,8 @@ public class ArchiveTask implements Task {
 
             // move tar to storage resource
             remoteCluster.execute(commandInfo);
-            URI destinationURI = TaskUtils.getDestinationURI(taskContext, 
hostName, inputPath, archiveTar);
-            remoteCluster.thirdPartyTransfer(resourceAbsTarFilePath 
,destinationURI.getPath(), session -> {
-                try {
-                    SSHUtils.scpThirdParty(sourceURI.getPath(),session, 
destinationURI.getPath(), sshSession, true);
-                } catch (Exception e) {
-                    throw new DataStagingException("Error while transferring " 
+ sourceURI.getPath() + " to " + destinationURI.getPath());
-                }
-            });
+            destinationURI = TaskUtils.getDestinationURI(taskContext, 
hostName, inputPath, archiveTar);
+            remoteCluster.scpThirdParty(resourceAbsTarFilePath 
,destinationURI.getPath() , sshSession, RemoteCluster.DIRECTION.FROM, true);
 
             // delete tar in remote computer resource
             commandInfo = new RawCommandInfo("rm " + resourceAbsTarFilePath);

http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
index 2788535..6a8800e 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
@@ -24,7 +24,6 @@ import com.jcraft.jsch.JSchException;
 import com.jcraft.jsch.Session;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.gfac.core.DataStagingException;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.SSHApiException;
@@ -253,15 +252,8 @@ public class SCPDataStageTask implements Task {
         /**
          * scp third party file transfer 'to' compute resource.
          */
-        taskContext.getParentProcessContext().getDataMovementRemoteCluster()
-                .thirdPartyTransfer(sourceURI.getPath(), 
destinationURI.getPath(), session -> {
-                    try {
-                        SSHUtils.scpThirdParty(sourceURI.getPath(), 
sshSession, destinationURI.getPath(), session, false);
-                    } catch (Exception e) {
-                        throw new DataStagingException("Error while file 
staging, from " + sourceURI.getPath()
-                                + " to " + destinationURI.getPath());
-                    }
-                });
+        
taskContext.getParentProcessContext().getDataMovementRemoteCluster().scpThirdParty(sourceURI.getPath(),
+                destinationURI.getPath(), sshSession, 
RemoteCluster.DIRECTION.TO, false);
     }
 
     private void outputDataStaging(TaskContext taskContext, Session 
sshSession, URI sourceURI, URI destinationURI)
@@ -270,15 +262,8 @@ public class SCPDataStageTask implements Task {
         /**
          * scp third party file transfer 'from' comute resource.
          */
-        taskContext.getParentProcessContext().getDataMovementRemoteCluster()
-                .thirdPartyTransfer(sourceURI.getPath(), 
destinationURI.getPath(), session -> {
-                    try {
-                        SSHUtils.scpThirdParty(sourceURI.getPath(), session, 
destinationURI.getPath(), sshSession, true);
-                    } catch (Exception e) {
-                        throw new DataStagingException("Error while file 
staging, from " + sourceURI.getPath()
-                                + " to " + destinationURI.getPath());
-                    }
-                });
+        
taskContext.getParentProcessContext().getDataMovementRemoteCluster().scpThirdParty(sourceURI.getPath(),
+                destinationURI.getPath(), sshSession, 
RemoteCluster.DIRECTION.FROM, true);
         // update output locations
         GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), 
taskContext.getProcessOutput().getName(), destinationURI.toString());
         GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), 
taskContext.getProcessOutput().getName(), destinationURI.toString());

http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
index fccce0d..375e570 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
@@ -25,7 +25,6 @@ import com.jcraft.jsch.JSchException;
 import com.jcraft.jsch.Session;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.credential.store.store.CredentialStoreException;
-import org.apache.airavata.gfac.core.DataStagingException;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.SSHApiException;
@@ -152,15 +151,8 @@ public class StreamData extends TimerTask  {
         /**
          * scp third party file transfer 'from' comute resource.
          */
-        taskContext.getParentProcessContext().getDataMovementRemoteCluster()
-                .thirdPartyTransfer(sourceURI.getPath(), 
destinationURI.getPath(), session -> {
-                    try {
-                        SSHUtils.scpThirdParty(sourceURI.getPath(), session, 
destinationURI.getPath(), sshSession, true);
-                    } catch (Exception e) {
-                        throw new DataStagingException("Error while file 
staging, from " + sourceURI.getPath()
-                                + " to " + destinationURI.getPath());
-                    }
-                });
+        
taskContext.getParentProcessContext().getDataMovementRemoteCluster().scpThirdParty(sourceURI.getPath(),
+                destinationURI.getPath(), sshSession, 
RemoteCluster.DIRECTION.FROM, true);
         // update output locations
         GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), 
taskContext.getProcessOutput().getName(), destinationURI.getPath());
         GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), 
taskContext.getProcessOutput().getName(), destinationURI.getPath());

Reply via email to