Allow usee to send the file URL to move the files. AIRAVATA-1419 Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6c500f2c Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6c500f2c Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6c500f2c
Branch: refs/heads/master Commit: 6c500f2cc923c58600a4d1094e119094c618cf2c Parents: 6c4471a Author: raminder <[email protected]> Authored: Thu Aug 28 11:17:06 2014 -0400 Committer: raminder <[email protected]> Committed: Thu Aug 28 11:17:06 2014 -0400 ---------------------------------------------------------------------- .../ssh/handler/AdvancedSCPInputHandler.java | 123 ++++++++++++------- .../ssh/handler/AdvancedSCPOutputHandler.java | 14 +++ 2 files changed, 90 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/6c500f2c/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java index 86dcb22..7e3ecbb 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java @@ -52,6 +52,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.net.MalformedURLException; +import java.net.URL; import java.util.*; /** @@ -133,11 +135,6 @@ public class AdvancedSCPInputHandler extends AbstractRecoverableHandler { this.passPhrase); } // Server info - ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName); - Cluster pbsCluster = null; - // here doesn't matter what the job manager is because we are only doing some file handling - // not really dealing with monitoring or job submission, so we pa - pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/")); String parentPath = inputPath + File.separator + jobExecutionContext.getExperimentID() + File.separator + jobExecutionContext.getTaskData().getTaskID(); if (index < oldIndex) { parentPath = oldFiles.get(index); @@ -149,48 +146,80 @@ public class AdvancedSCPInputHandler extends AbstractRecoverableHandler { } DataTransferDetails detail = new DataTransferDetails(); TransferStatus status = new TransferStatus(); - - MessageContext input = jobExecutionContext.getInMessageContext(); - Set<String> parameters = input.getParameters().keySet(); - for (String paramName : parameters) { - ActualParameter actualParameter = (ActualParameter) input.getParameters().get(paramName); - String paramValue = MappingFactory.toString(actualParameter); - //TODO: Review this with type - if ("URI".equals(actualParameter.getType().getType().toString())) { - if (index < oldIndex) { - log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); - ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index)); - data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index - } else { - String stageInputFile = stageInputFiles(pbsCluster, paramValue, parentPath); - ((URIParameterType) actualParameter.getType()).setValue(stageInputFile); - StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString()); - status.setTransferState(TransferState.UPLOAD); - detail.setTransferStatus(status); - detail.setTransferDescription("Input Data Staged: " + stageInputFile); - registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); - - GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); - } - } else if ("URIArray".equals(actualParameter.getType().getType().toString())) { - List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue)); - List<String> newFiles = new ArrayList<String>(); - for (String paramValueEach : split) { - if (index < oldIndex) { - log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); - newFiles.add(oldFiles.get(index)); - data.append(oldFiles.get(index++)).append(","); - } else { - String stageInputFiles = stageInputFiles(pbsCluster, paramValueEach, parentPath); - StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString()); - GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); - newFiles.add(stageInputFiles); - } - } - ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()])); - } - inputNew.getParameters().put(paramName, actualParameter); - } + Cluster pbsCluster = null; + // here doesn't matter what the job manager is because we are only doing some file handling + // not really dealing with monitoring or job submission, so we pa + String lastHost = null; + + MessageContext input = jobExecutionContext.getInMessageContext(); + Set<String> parameters = input.getParameters().keySet(); + for (String paramName : parameters) { + ActualParameter actualParameter = (ActualParameter) input.getParameters().get(paramName); + String paramValue = MappingFactory.toString(actualParameter); + // TODO: Review this with type + if ("URI".equals(actualParameter.getType().getType().toString())) { + try { + URL file = new URL(paramValue); + this.userName = file.getUserInfo(); + this.hostName = file.getHost(); + paramValue = file.getPath(); + } catch (MalformedURLException e) { + log.error(e.getLocalizedMessage(),e); + } + ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName); + if (pbsCluster == null && (lastHost == null || !lastHost.equals(hostName))) { + pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/")); + } + lastHost = hostName; + + if (index < oldIndex) { + log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); + ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index)); + data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index + } else { + String stageInputFile = stageInputFiles(pbsCluster, paramValue, parentPath); + ((URIParameterType) actualParameter.getType()).setValue(stageInputFile); + StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString()); + status.setTransferState(TransferState.UPLOAD); + detail.setTransferStatus(status); + detail.setTransferDescription("Input Data Staged: " + stageInputFile); + registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); + + GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); + } + } else if ("URIArray".equals(actualParameter.getType().getType().toString())) { + List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue)); + List<String> newFiles = new ArrayList<String>(); + for (String paramValueEach : split) { + try { + URL file = new URL(paramValue); + this.userName = file.getUserInfo(); + this.hostName = file.getHost(); + paramValueEach = file.getPath(); + } catch (MalformedURLException e) { + log.error(e.getLocalizedMessage(),e); + } + ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName); + if (pbsCluster == null && (lastHost == null || !lastHost.equals(hostName))) { + pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/")); + } + lastHost = hostName; + + if (index < oldIndex) { + log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); + newFiles.add(oldFiles.get(index)); + data.append(oldFiles.get(index++)).append(","); + } else { + String stageInputFiles = stageInputFiles(pbsCluster, paramValueEach, parentPath); + StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString()); + GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); + newFiles.add(stageInputFiles); + } + } + ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()])); + } + inputNew.getParameters().put(paramName, actualParameter); + } } catch (Exception e) { log.error(e.getMessage()); throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage()); http://git-wip-us.apache.org/repos/asf/airavata/blob/6c500f2c/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java index 9b9f7b2..116d769 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java @@ -45,6 +45,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.net.MalformedURLException; +import java.net.URL; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -117,12 +119,24 @@ public class AdvancedSCPOutputHandler extends AbstractHandler { this.passPhrase); } // Server info + if(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().getOutputDataDir() != null){ + try{ + URL outputPathURL = new URL(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().getOutputDataDir()); + this.userName = outputPathURL.getUserInfo(); + this.hostName = outputPathURL.getHost(); + outputPath = outputPathURL.getPath(); + } catch (MalformedURLException e) { + log.error(e.getLocalizedMessage(),e); + } + } ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName); Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/")); + if(!jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().isPersistOutputData()){ outputPath = outputPath + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID() + File.separator; pbsCluster.makeDirectory(outputPath); + } pbsCluster.scpTo(outputPath, standardError); pbsCluster.scpTo(outputPath, standardOutput); List<DataObjectType> outputArray = new ArrayList<DataObjectType>();
