Revert scp thrird party refacotring
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a45294e4 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a45294e4 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a45294e4 Branch: refs/heads/develop Commit: a45294e461cca0ea5b79fccdd75e969a201052e7 Parents: dbcfd77 Author: Shameera Rathnayaka <[email protected]> Authored: Wed Nov 2 17:29:28 2016 -0400 Committer: Shameera Rathnayaka <[email protected]> Committed: Wed Nov 2 17:29:28 2016 -0400 ---------------------------------------------------------------------- .../gfac/core/DataStagingException.java | 43 -------------------- .../gfac/core/cluster/RemoteCluster.java | 33 +++++++-------- .../gfac/core/cluster/SessionConsumer.java | 28 ------------- .../airavata/gfac/impl/BESRemoteCluster.java | 3 +- .../airavata/gfac/impl/HPCRemoteCluster.java | 18 ++++---- .../airavata/gfac/impl/LocalRemoteCluster.java | 17 ++++---- .../airavata/gfac/impl/task/ArchiveTask.java | 13 ++---- .../gfac/impl/task/SCPDataStageTask.java | 23 ++--------- .../gfac/impl/task/utils/StreamData.java | 12 +----- 9 files changed, 49 insertions(+), 141 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/DataStagingException.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/DataStagingException.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/DataStagingException.java deleted file mode 100644 index 67ed0c7..0000000 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/DataStagingException.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.airavata.gfac.core; - -public class DataStagingException extends Exception { - - public DataStagingException() { - super(); - } - - public DataStagingException(String message) { - super(message); - } - - public DataStagingException(String message, Throwable cause) { - super(message, cause); - } - - public DataStagingException(Throwable cause) { - super(cause); - } - - protected DataStagingException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java index 5f8d0ec..3916573 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java @@ -43,7 +43,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable * @return jobId after successful job submission * @throws SSHApiException throws exception during error */ - JobSubmissionOutput submitBatchJob(String jobScriptFilePath, String workingDirectory) throws SSHApiException; + public JobSubmissionOutput submitBatchJob(String jobScriptFilePath, String workingDirectory) throws SSHApiException; /** * This will copy the localFile to remoteFile location in configured cluster @@ -52,7 +52,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable * @param remoteFile remote file location, this can be a directory too * @throws SSHApiException throws exception during error */ - void copyTo(String localFile, String remoteFile) throws SSHApiException; + public void copyTo(String localFile, String remoteFile) throws SSHApiException; /** * This will copy a remote file in path rFile to local file lFile @@ -60,16 +60,18 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable * @param remoteFile remote file path, this has to be a full qualified path * @param localFile This is the local file to copy, this can be a directory too */ - void copyFrom(String remoteFile, String localFile) throws SSHApiException; + public void copyFrom(String remoteFile, String localFile) throws SSHApiException; /** * This wil copy source remote file to target remote file. * * @param sourceFile remote file path, this has to be a full qualified path * @param destinationFile This is the local file to copy, this can be a directory too + * @param session jcraft session of other coner of thirdparty file transfer. + * @param inOrOut direction to file transfer , to the remote cluster(DIRECTION.IN) or from the remote cluster(DIRECTION.OUT) * */ - void thirdPartyTransfer(String sourceFile, String destinationFile, SessionConsumer<Session> sessionConsumer) throws SSHApiException; + public void scpThirdParty(String sourceFile, String destinationFile ,Session session , DIRECTION inOrOut, boolean ignoreEmptyFile) throws SSHApiException; /** * This will create directories in computing resources @@ -77,7 +79,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable * @param directoryPath the full qualified path for the directory user wants to create * @throws SSHApiException throws during error */ - void makeDirectory(String directoryPath) throws SSHApiException; + public void makeDirectory(String directoryPath) throws SSHApiException; /** * This will delete the given job from the queue @@ -86,7 +88,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable * @return return the description of the deleted job * @throws SSHApiException throws exception during error */ - JobStatus cancelJob(String jobID) throws SSHApiException; + public JobStatus cancelJob(String jobID) throws SSHApiException; /** * This will get the job status of the the job associated with this jobId @@ -95,7 +97,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable * @return job status of the given jobID * @throws SSHApiException throws exception during error */ - JobStatus getJobStatus(String jobID) throws SSHApiException; + public JobStatus getJobStatus(String jobID) throws SSHApiException; /** * This will get the job status of the the job associated with this jobId @@ -104,7 +106,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable * @return jobId of the given jobName * @throws SSHApiException throws exception during error */ - String getJobIdByJobName(String jobName, String userName) throws SSHApiException; + public String getJobIdByJobName(String jobName, String userName) throws SSHApiException; /** * This method can be used to poll the jobstatuses based on the given @@ -115,7 +117,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable * @param userName userName of the jobs which required to get the status * @param jobIDs precises set of jobIDs */ - void getJobStatuses(String userName, Map<String, JobStatus> jobIDs) throws SSHApiException; + public void getJobStatuses(String userName, Map<String, JobStatus> jobIDs) throws SSHApiException; /** * This will list directories in computing resources @@ -123,7 +125,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable * @param directoryPath the full qualified path for the directory user wants to create * @throws SSHApiException throws during error */ - List<String> listDirectory(String directoryPath) throws SSHApiException; + public List<String> listDirectory(String directoryPath) throws SSHApiException; /** * This method can use to execute custom command on remote compute resource. @@ -131,27 +133,26 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable * @return <code>true</code> if command successfully executed, <code>false</code> otherwise. * @throws SSHApiException */ - boolean execute(CommandInfo commandInfo) throws SSHApiException; + public boolean execute(CommandInfo commandInfo) throws SSHApiException; /** * This method can be used to get created ssh session * to reuse the created session. */ - Session getSession() throws SSHApiException; + public Session getSession() throws SSHApiException; /** * This method can be used to close the connections initialized * to handle graceful shutdown of the system */ - void disconnect() throws SSHApiException; + public void disconnect() throws SSHApiException; /** * This gives the server Info */ - ServerInfo getServerInfo(); - - AuthenticationInfo getAuthentication(); + public ServerInfo getServerInfo(); + public AuthenticationInfo getAuthentication(); enum DIRECTION { TO, FROM http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/SessionConsumer.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/SessionConsumer.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/SessionConsumer.java deleted file mode 100644 index 2f39d97..0000000 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/SessionConsumer.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.airavata.gfac.core.cluster; - -import org.apache.airavata.gfac.core.DataStagingException; - -public interface SessionConsumer<S> { - - void consume(S session) throws DataStagingException; - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java index 0f517b5..3586ee8 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java @@ -53,7 +53,8 @@ public class BESRemoteCluster extends AbstractRemoteCluster{ } @Override - public void thirdPartyTransfer(String sourceFile, String destinationFile, SessionConsumer<Session> sessionConsumer) throws SSHApiException { + public void scpThirdParty(String sourceFile, String destinationFile, Session session, DIRECTION inOrOut, boolean ignoreEmptyFile) throws SSHApiException { + } @Override http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java index c3566b8..725b6d0 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java @@ -26,7 +26,6 @@ import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; import com.jcraft.jsch.UserInfo; import org.apache.airavata.common.exception.AiravataException; -import org.apache.airavata.gfac.core.DataStagingException; import org.apache.airavata.gfac.core.JobManagerConfiguration; import org.apache.airavata.gfac.core.SSHApiException; import org.apache.airavata.gfac.core.authentication.AuthenticationInfo; @@ -165,7 +164,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ } @Override - public void thirdPartyTransfer(String sourceFile, String destinationFile, SessionConsumer<Session> sessionConsumer) throws SSHApiException { + public void scpThirdParty(String sourceFile, String destinationFile, Session clientSession, DIRECTION direction, boolean ignoreEmptyFile) throws SSHApiException { int retryCount= 0; try { while (retryCount < MAX_RETRY_COUNT) { @@ -173,19 +172,24 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ session = Factory.getSSHSession(authenticationInfo, serverInfo); log.info("Transferring from:" + sourceFile + " To: " + destinationFile); try { - sessionConsumer.consume(session); + if (direction == DIRECTION.TO) { + SSHUtils.scpThirdParty(sourceFile, clientSession, destinationFile, session, ignoreEmptyFile); + } else { + SSHUtils.scpThirdParty(sourceFile, session, destinationFile, clientSession, ignoreEmptyFile); + } break; // exit while loop - } catch (DataStagingException e) { + } catch (JSchException e) { if (retryCount == MAX_RETRY_COUNT) { log.error("Retry count " + MAX_RETRY_COUNT + " exceeded for transferring from:" + sourceFile + " To: " + destinationFile, e); throw e; } - log.error("Issue with file staging, Retry transferring from:" + sourceFile + " To: " + destinationFile, e); + log.error("Issue with jsch, Retry transferring from:" + sourceFile + " To: " + destinationFile, e); } } - } catch (AiravataException| DataStagingException e) { - throw new SSHApiException("Failed scp file:" + sourceFile + " to remote file " + destinationFile, e); + } catch (IOException | AiravataException| JSchException e) { + throw new SSHApiException("Failed scp file:" + sourceFile + " to remote file " + +destinationFile , e); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java index d5422d2..9294470 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java @@ -23,7 +23,6 @@ package org.apache.airavata.gfac.impl; import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; import org.apache.airavata.common.exception.AiravataException; -import org.apache.airavata.gfac.core.DataStagingException; import org.apache.airavata.gfac.core.JobManagerConfiguration; import org.apache.airavata.gfac.core.SSHApiException; import org.apache.airavata.gfac.core.authentication.AuthenticationInfo; @@ -100,16 +99,20 @@ public class LocalRemoteCluster extends AbstractRemoteCluster { } @Override - public void thirdPartyTransfer(String sourceFile, String destinationFile, SessionConsumer<Session> sessionConsumer) throws SSHApiException { - int retryCount = 0; + public void scpThirdParty(String sourceFile, String destinationFile, Session session, DIRECTION inOrOut, boolean ignoreEmptyFile) throws SSHApiException { + int retryCount= 0; try { while (retryCount < MAX_RETRY_COUNT) { retryCount++; log.info("Transferring from:" + sourceFile + " To: " + destinationFile); try { - sessionConsumer.consume(null); + if (inOrOut == DIRECTION.TO) { + SSHUtils.scpThirdParty(sourceFile, session, destinationFile, session, ignoreEmptyFile); + } else { + SSHUtils.scpThirdParty(sourceFile, session, destinationFile, session, ignoreEmptyFile); + } break; // exit while loop - } catch (DataStagingException e) { + } catch (JSchException e) { if (retryCount == MAX_RETRY_COUNT) { log.error("Retry count " + MAX_RETRY_COUNT + " exceeded for transferring from:" + sourceFile + " To: " + destinationFile, e); @@ -118,9 +121,9 @@ public class LocalRemoteCluster extends AbstractRemoteCluster { log.error("Issue with jsch, Retry transferring from:" + sourceFile + " To: " + destinationFile, e); } } - } catch (DataStagingException e) { + } catch (IOException | JSchException e) { throw new SSHApiException("Failed scp file:" + sourceFile + " to remote file " - + destinationFile, e); + +destinationFile , e); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java index 88661f8..df22654 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java @@ -26,7 +26,6 @@ import com.jcraft.jsch.Session; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.ThriftUtils; -import org.apache.airavata.gfac.core.DataStagingException; import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.SSHApiException; import org.apache.airavata.gfac.core.authentication.AuthenticationInfo; @@ -40,7 +39,6 @@ import org.apache.airavata.gfac.core.context.TaskContext; import org.apache.airavata.gfac.core.task.Task; import org.apache.airavata.gfac.core.task.TaskException; import org.apache.airavata.gfac.impl.Factory; -import org.apache.airavata.gfac.impl.SSHUtils; import org.apache.airavata.gfac.impl.StandardOutReader; import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference; import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription; @@ -121,6 +119,7 @@ public class ArchiveTask implements Task { ServerInfo serverInfo = new ServerInfo(userName, hostName, DEFAULT_SSH_PORT); Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo); URI sourceURI = new URI(subTaskModel.getSource()); + URI destinationURI = null; String workingDirName = null, path = null; if (sourceURI.getPath().endsWith("/")) { path = sourceURI.getPath().substring(0, sourceURI.getPath().length() - 1); @@ -137,14 +136,8 @@ public class ArchiveTask implements Task { // move tar to storage resource remoteCluster.execute(commandInfo); - URI destinationURI = TaskUtils.getDestinationURI(taskContext, hostName, inputPath, archiveTar); - remoteCluster.thirdPartyTransfer(resourceAbsTarFilePath ,destinationURI.getPath(), session -> { - try { - SSHUtils.scpThirdParty(sourceURI.getPath(),session, destinationURI.getPath(), sshSession, true); - } catch (Exception e) { - throw new DataStagingException("Error while transferring " + sourceURI.getPath() + " to " + destinationURI.getPath()); - } - }); + destinationURI = TaskUtils.getDestinationURI(taskContext, hostName, inputPath, archiveTar); + remoteCluster.scpThirdParty(resourceAbsTarFilePath ,destinationURI.getPath() , sshSession, RemoteCluster.DIRECTION.FROM, true); // delete tar in remote computer resource commandInfo = new RawCommandInfo("rm " + resourceAbsTarFilePath); http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java index 2788535..6a8800e 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java @@ -24,7 +24,6 @@ import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.exception.ApplicationSettingsException; -import org.apache.airavata.gfac.core.DataStagingException; import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.GFacUtils; import org.apache.airavata.gfac.core.SSHApiException; @@ -253,15 +252,8 @@ public class SCPDataStageTask implements Task { /** * scp third party file transfer 'to' compute resource. */ - taskContext.getParentProcessContext().getDataMovementRemoteCluster() - .thirdPartyTransfer(sourceURI.getPath(), destinationURI.getPath(), session -> { - try { - SSHUtils.scpThirdParty(sourceURI.getPath(), sshSession, destinationURI.getPath(), session, false); - } catch (Exception e) { - throw new DataStagingException("Error while file staging, from " + sourceURI.getPath() - + " to " + destinationURI.getPath()); - } - }); + taskContext.getParentProcessContext().getDataMovementRemoteCluster().scpThirdParty(sourceURI.getPath(), + destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.TO, false); } private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI destinationURI) @@ -270,15 +262,8 @@ public class SCPDataStageTask implements Task { /** * scp third party file transfer 'from' comute resource. */ - taskContext.getParentProcessContext().getDataMovementRemoteCluster() - .thirdPartyTransfer(sourceURI.getPath(), destinationURI.getPath(), session -> { - try { - SSHUtils.scpThirdParty(sourceURI.getPath(), session, destinationURI.getPath(), sshSession, true); - } catch (Exception e) { - throw new DataStagingException("Error while file staging, from " + sourceURI.getPath() - + " to " + destinationURI.getPath()); - } - }); + taskContext.getParentProcessContext().getDataMovementRemoteCluster().scpThirdParty(sourceURI.getPath(), + destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.FROM, true); // update output locations GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.toString()); GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.toString()); http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java index fccce0d..375e570 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java @@ -25,7 +25,6 @@ import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.credential.store.store.CredentialStoreException; -import org.apache.airavata.gfac.core.DataStagingException; import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.GFacUtils; import org.apache.airavata.gfac.core.SSHApiException; @@ -152,15 +151,8 @@ public class StreamData extends TimerTask { /** * scp third party file transfer 'from' comute resource. */ - taskContext.getParentProcessContext().getDataMovementRemoteCluster() - .thirdPartyTransfer(sourceURI.getPath(), destinationURI.getPath(), session -> { - try { - SSHUtils.scpThirdParty(sourceURI.getPath(), session, destinationURI.getPath(), sshSession, true); - } catch (Exception e) { - throw new DataStagingException("Error while file staging, from " + sourceURI.getPath() - + " to " + destinationURI.getPath()); - } - }); + taskContext.getParentProcessContext().getDataMovementRemoteCluster().scpThirdParty(sourceURI.getPath(), + destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.FROM, true); // update output locations GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath()); GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath());
