handling multiple file transfers for wildcards
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/aadaa258 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/aadaa258 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/aadaa258 Branch: refs/heads/keycloak-prod-migration Commit: aadaa258220a322a5ce35ab5bd381428c27529f7 Parents: 67e8b52 Author: scnakandala <[email protected]> Authored: Fri Jun 30 12:22:41 2017 -0400 Committer: scnakandala <[email protected]> Committed: Fri Jun 30 12:22:41 2017 -0400 ---------------------------------------------------------------------- .../apache/airavata/gfac/core/GFacUtils.java | 6 +++ .../gfac/core/cluster/RemoteCluster.java | 2 +- .../airavata/gfac/impl/BESRemoteCluster.java | 2 +- .../airavata/gfac/impl/HPCRemoteCluster.java | 10 ++-- .../airavata/gfac/impl/LocalRemoteCluster.java | 2 +- .../gfac/impl/task/SCPDataStageTask.java | 52 +++++++++++++------- 6 files changed, 49 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/aadaa258/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java index 1bb9e6f..37cbea3 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java @@ -71,6 +71,7 @@ import java.net.URISyntaxException; import java.net.URL; import java.net.UnknownHostException; import java.nio.ByteBuffer; +import java.nio.file.Paths; import java.security.SecureRandom; import java.util.*; import java.util.regex.Matcher; @@ -966,6 +967,11 @@ public class GFacUtils { ReplicaCatalog replicaCatalog = RegistryFactory.getReplicaCatalog(); String productUri = replicaCatalog.registerDataProduct(dataProductModel); + + if(expOutput.getName().contains("*")){ + String actualFileName = Paths.get(outputVal).getFileName().toString(); + expOutput.setName(actualFileName); + } expOutput.setValue(productUri); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/aadaa258/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java index ebf5f6b..e18b229 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java @@ -86,7 +86,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable * @param session * @return */ - public String getFileNameFromExtension(String fileExtension, String parentPath, Session session) throws GFacException; + public List<String> getFileNameFromExtension(String fileExtension, String parentPath, Session session) throws GFacException; /** * This will create directories in computing resources http://git-wip-us.apache.org/repos/asf/airavata/blob/aadaa258/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java index 4f22c6a..0cff161 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java @@ -68,7 +68,7 @@ public class BESRemoteCluster extends AbstractRemoteCluster{ * @return */ @Override - public String getFileNameFromExtension(String fileExtension, String parentPath, Session session) { + public List<String> getFileNameFromExtension(String fileExtension, String parentPath, Session session) { return null; } http://git-wip-us.apache.org/repos/asf/airavata/blob/aadaa258/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java index 3d972b0..c638429 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java @@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; @@ -201,16 +202,17 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{ * @return */ @Override - public String getFileNameFromExtension(String fileExtension, String parentPath, Session session) throws GFacException { + public List<String> getFileNameFromExtension(String fileExtension, String parentPath, Session session) throws GFacException { try { List<String> fileNames = SSHUtils.listDirectory(parentPath, session); + List<String> matchingNames = new ArrayList<>(); for(String fileName : fileNames){ - if(fileName.endsWith(fileExtension)){ - return fileName; + if(fileName.matches(fileExtension)){ + matchingNames.add(fileName); } } log.warn("No matching file found for extension: " + fileExtension + " in the " + parentPath + " directory"); - return null; + return matchingNames; } catch (Exception e) { e.printStackTrace(); throw new GFacException("Failed to list directory " + parentPath); http://git-wip-us.apache.org/repos/asf/airavata/blob/aadaa258/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java index 64a96db..1559912 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java @@ -139,7 +139,7 @@ public class LocalRemoteCluster extends AbstractRemoteCluster { * @return */ @Override - public String getFileNameFromExtension(String fileExtension, String parentPath, Session session) { + public List<String> getFileNameFromExtension(String fileExtension, String parentPath, Session session) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/airavata/blob/aadaa258/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java index 745315d..dd7acaf 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java @@ -55,6 +55,7 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Arrays; +import java.util.List; import java.util.Map; /** @@ -157,30 +158,45 @@ public class SCPDataStageTask implements Task { status = new TaskStatus(TaskState.COMPLETED); //Wildcard for file name. Has to find the correct name. - if(fileName.startsWith("*.")){ + if(fileName.contains("*")){ String destParentPath = (new File(destinationURI.getPath())).getParentFile().getPath(); String sourceParentPath = (new File(sourceURI.getPath())).getParentFile().getPath(); - String temp = taskContext.getParentProcessContext().getDataMovementRemoteCluster() - .getFileNameFromExtension(fileName.substring(2), sourceParentPath, sshSession); - if(temp != null && temp != ""){ - fileName = temp; + List<String> fileNames = taskContext.getParentProcessContext().getDataMovementRemoteCluster() + .getFileNameFromExtension(fileName, sourceParentPath, sshSession); + for(int i=0; i<fileNames.size(); i++){ + String temp = fileNames.get(i); + if(temp != null && temp != ""){ + fileName = temp; + } + if(destParentPath.endsWith(File.separator)){ + destinationURI = new URI(destParentPath + fileName); + }else{ + destinationURI = new URI(destParentPath + File.separator + fileName); + } + + //Wildcard support is only enabled for output data staging + if (processState == ProcessState.OUTPUT_DATA_STAGING) { + makeDir(taskContext, destinationURI); + // TODO - save updated subtask model with new destination + outputDataStaging(taskContext, sshSession, sourceURI, destinationURI); + status.setReason("Successfully staged output data"); + } } - if(destParentPath.endsWith(File.separator)){ - destinationURI = new URI(destParentPath + fileName); + if (processState == ProcessState.OUTPUT_DATA_STAGING) { + status.setReason("Successfully staged output data"); }else{ - destinationURI = new URI(destParentPath + File.separator + fileName); + status.setReason("Wildcard support is only enabled for output data staging"); } - - } - - if (processState == ProcessState.INPUT_DATA_STAGING) { + }else { + if (processState == ProcessState.INPUT_DATA_STAGING) { inputDataStaging(taskContext, sshSession, sourceURI, destinationURI); - status.setReason("Successfully staged input data"); - } else if (processState == ProcessState.OUTPUT_DATA_STAGING) { - makeDir(taskContext, destinationURI); - // TODO - save updated subtask model with new destination - outputDataStaging(taskContext, sshSession, sourceURI, destinationURI); - status.setReason("Successfully staged output data"); + status.setReason("Successfully staged input data"); + } else if (processState == ProcessState.OUTPUT_DATA_STAGING) { + makeDir(taskContext, destinationURI); + // TODO - save updated subtask model with new destination + outputDataStaging(taskContext, sshSession, sourceURI, destinationURI); + status.setReason("Successfully staged output data"); + } } } catch (TException e) { String msg = "Couldn't create subTask model thrift model";
