Repository: airavata Updated Branches: refs/heads/master aa4cd2cb6 -> 555790220
Update data tranfer details.AIRAVATA-1410 Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b1532829 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b1532829 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b1532829 Branch: refs/heads/master Commit: b1532829c7079a7038a7b64dea14d9a38c64cd4b Parents: 6305d3a Author: raminder <[email protected]> Authored: Wed Aug 20 15:05:48 2014 -0400 Committer: raminder <[email protected]> Committed: Wed Aug 20 15:05:48 2014 -0400 ---------------------------------------------------------------------- .../gfac/gsissh/handler/GSISSHInputHandler.java | 11 +++++-- .../gsissh/handler/GSISSHOutputHandler.java | 4 +-- .../ssh/handler/AdvancedSCPInputHandler.java | 18 ++++++++++-- .../gfac/ssh/handler/SSHInputHandler.java | 30 ++++++++++++++++++-- .../gfac/ssh/handler/SSHOutputHandler.java | 4 +-- 5 files changed, 55 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/b1532829/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java index 6b84fb4..b976065 100644 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java +++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java @@ -102,9 +102,14 @@ public class GSISSHInputHandler extends AbstractRecoverableHandler { ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index)); data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index } else { - String s = stageInputFiles(jobExecutionContext, paramValue); - ((URIParameterType) actualParameter.getType()).setValue(s); - StringBuffer temp = new StringBuffer(data.append(s).append(",").toString()); + String stageInputFile = stageInputFiles(jobExecutionContext, paramValue); + ((URIParameterType) actualParameter.getType()).setValue(stageInputFile); + StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString()); + status.setTransferState(TransferState.UPLOAD); + detail.setTransferStatus(status); + detail.setTransferDescription("Input Data Staged: " + stageInputFile); + registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); + GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); } } else if ("URIArray".equals(actualParameter.getType().getType().toString())) { http://git-wip-us.apache.org/repos/asf/airavata/blob/b1532829/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java index f8d8c52..2f91eac 100644 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java +++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java @@ -180,12 +180,12 @@ public class GSISSHOutputHandler extends AbstractRecoverableHandler { String stdOutStr = GFacUtils.readFileToString(localStdOutFile.getAbsolutePath()); String stdErrStr = GFacUtils.readFileToString(localStdErrFile.getAbsolutePath()); - status.setTransferState(TransferState.COMPLETE); + status.setTransferState(TransferState.STDOUT_DOWNLOAD); detail.setTransferStatus(status); detail.setTransferDescription("STDOUT:" + stdOutStr); registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); - status.setTransferState(TransferState.COMPLETE); + status.setTransferState(TransferState.STDERROR_DOWNLOAD); detail.setTransferStatus(status); detail.setTransferDescription("STDERR:" + stdErrStr); registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); http://git-wip-us.apache.org/repos/asf/airavata/blob/b1532829/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java index 9fe2819..86dcb22 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java @@ -41,6 +41,10 @@ import org.apache.airavata.gsi.ssh.impl.PBSCluster; import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo; import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication; import org.apache.airavata.gsi.ssh.util.CommonUtils; +import org.apache.airavata.model.workspace.experiment.DataTransferDetails; +import org.apache.airavata.model.workspace.experiment.TransferState; +import org.apache.airavata.model.workspace.experiment.TransferStatus; +import org.apache.airavata.registry.cpi.ChildDataType; import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType; import org.apache.airavata.schemas.gfac.URIArrayType; import org.apache.airavata.schemas.gfac.URIParameterType; @@ -143,6 +147,9 @@ public class AdvancedSCPInputHandler extends AbstractRecoverableHandler { StringBuffer temp = new StringBuffer(data.append(parentPath).append(",").toString()); GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); } + DataTransferDetails detail = new DataTransferDetails(); + TransferStatus status = new TransferStatus(); + MessageContext input = jobExecutionContext.getInMessageContext(); Set<String> parameters = input.getParameters().keySet(); for (String paramName : parameters) { @@ -155,9 +162,14 @@ public class AdvancedSCPInputHandler extends AbstractRecoverableHandler { ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index)); data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index } else { - String s = stageInputFiles(pbsCluster, paramValue, parentPath); - ((URIParameterType) actualParameter.getType()).setValue(s); - StringBuffer temp = new StringBuffer(data.append(s).append(",").toString()); + String stageInputFile = stageInputFiles(pbsCluster, paramValue, parentPath); + ((URIParameterType) actualParameter.getType()).setValue(stageInputFile); + StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString()); + status.setTransferState(TransferState.UPLOAD); + detail.setTransferStatus(status); + detail.setTransferDescription("Input Data Staged: " + stageInputFile); + registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); + GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); } } else if ("URIArray".equals(actualParameter.getType().getType().toString())) { http://git-wip-us.apache.org/repos/asf/airavata/blob/b1532829/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java index 4c1345d..2ed2a60 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java @@ -58,6 +58,10 @@ public class SSHInputHandler extends AbstractHandler { public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { DataTransferDetails detail = new DataTransferDetails(); TransferStatus status = new TransferStatus(); + int index = 0; + int oldIndex = 0; + List<String> oldFiles = new ArrayList<String>(); + StringBuffer data = new StringBuffer("|"); MessageContext inputNew = new MessageContext(); try { @@ -80,9 +84,28 @@ public class SSHInputHandler extends AbstractHandler { String paramValue = MappingFactory.toString(actualParameter); //TODO: Review this with type if ("URI".equals(actualParameter.getType().getType().toString())) { - ((URIParameterType) actualParameter.getType()).setValue(stageInputFiles(jobExecutionContext, paramValue)); + if (index < oldIndex) { + log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); + ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index)); + data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index + }else{ + String stageInputFile = stageInputFiles(jobExecutionContext, paramValue); + ((URIParameterType) actualParameter.getType()).setValue(stageInputFile); + StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString()); + + status.setTransferState(TransferState.UPLOAD); + detail.setTransferStatus(status); + detail.setTransferDescription("Input Data Staged: " + stageInputFile); + registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); + GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); + } } else if ("URIArray".equals(actualParameter.getType().getType().toString())) { - List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue)); + if (index < oldIndex) { + log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); + ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index)); + data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index + }else{ + List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue)); List<String> newFiles = new ArrayList<String>(); for (String paramValueEach : split) { String stageInputFiles = stageInputFiles(jobExecutionContext, paramValueEach); @@ -91,8 +114,11 @@ public class SSHInputHandler extends AbstractHandler { detail.setTransferDescription("Input Data Staged: " + stageInputFiles); registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); newFiles.add(stageInputFiles); + StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString()); + GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); } ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()])); + } } inputNew.getParameters().put(paramName, actualParameter); } http://git-wip-us.apache.org/repos/asf/airavata/blob/b1532829/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java index 9f90cf3..3ad0543 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java @@ -144,12 +144,12 @@ public class SSHOutputHandler extends AbstractHandler { String stdOutStr = GFacUtils.readFileToString(localStdOutFile.getAbsolutePath()); String stdErrStr = GFacUtils.readFileToString(localStdErrFile.getAbsolutePath()); - status.setTransferState(TransferState.COMPLETE); + status.setTransferState(TransferState.STDOUT_DOWNLOAD); detail.setTransferStatus(status); detail.setTransferDescription("STDOUT:" + stdOutStr); registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); - status.setTransferState(TransferState.COMPLETE); + status.setTransferState(TransferState.STDERROR_DOWNLOAD); detail.setTransferStatus(status); detail.setTransferDescription("STDERR:" + stdErrStr); registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
