Allow usee to send the file URL to move the files. AIRAVATA-1419

Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6c500f2c
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6c500f2c
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6c500f2c

Branch: refs/heads/master
Commit: 6c500f2cc923c58600a4d1094e119094c618cf2c
Parents: 6c4471a
Author: raminder <[email protected]>
Authored: Thu Aug 28 11:17:06 2014 -0400
Committer: raminder <[email protected]>
Committed: Thu Aug 28 11:17:06 2014 -0400

----------------------------------------------------------------------
 .../ssh/handler/AdvancedSCPInputHandler.java    | 123 ++++++++++++-------
 .../ssh/handler/AdvancedSCPOutputHandler.java   |  14 +++
 2 files changed, 90 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/6c500f2c/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
 
b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
index 86dcb22..7e3ecbb 100644
--- 
a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
+++ 
b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
@@ -52,6 +52,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
 import java.util.*;
 
 /**
@@ -133,11 +135,6 @@ public class AdvancedSCPInputHandler extends 
AbstractRecoverableHandler {
                         this.passPhrase);
             }
             // Server info
-            ServerInfo serverInfo = new ServerInfo(this.userName, 
this.hostName);
-            Cluster pbsCluster = null;
-            // here doesn't matter what the job manager is because we are only 
doing some file handling
-            // not really dealing with monitoring or job submission, so we pa
-            pbsCluster = new PBSCluster(serverInfo, authenticationInfo, 
CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
             String parentPath = inputPath + File.separator + 
jobExecutionContext.getExperimentID() + File.separator + 
jobExecutionContext.getTaskData().getTaskID();
             if (index < oldIndex) {
                 parentPath = oldFiles.get(index);
@@ -149,48 +146,80 @@ public class AdvancedSCPInputHandler extends 
AbstractRecoverableHandler {
             }
             DataTransferDetails detail = new DataTransferDetails();
             TransferStatus status = new TransferStatus();
-          
-            MessageContext input = jobExecutionContext.getInMessageContext();
-            Set<String> parameters = input.getParameters().keySet();
-            for (String paramName : parameters) {
-                ActualParameter actualParameter = (ActualParameter) 
input.getParameters().get(paramName);
-                String paramValue = MappingFactory.toString(actualParameter);
-                //TODO: Review this with type
-                if 
("URI".equals(actualParameter.getType().getType().toString())) {
-                    if (index < oldIndex) {
-                        log.info("Input File: " + paramValue + " is already 
transfered, so we skip this operation !!!");
-                        ((URIParameterType) 
actualParameter.getType()).setValue(oldFiles.get(index));
-                        data.append(oldFiles.get(index++)).append(","); // we 
get already transfered file and increment the index
-                    } else {
-                        String stageInputFile = stageInputFiles(pbsCluster, 
paramValue, parentPath);
-                        ((URIParameterType) 
actualParameter.getType()).setValue(stageInputFile);
-                        StringBuffer temp = new 
StringBuffer(data.append(stageInputFile).append(",").toString());
-                        status.setTransferState(TransferState.UPLOAD);
-                        detail.setTransferStatus(status);
-                        detail.setTransferDescription("Input Data Staged: " + 
stageInputFile);
-                        registry.add(ChildDataType.DATA_TRANSFER_DETAIL, 
detail, jobExecutionContext.getTaskData().getTaskID());
-                
-                        GFacUtils.savePluginData(jobExecutionContext, 
temp.insert(0, ++index), this.getClass().getName());
-                    }
-                } else if 
("URIArray".equals(actualParameter.getType().getType().toString())) {
-                    List<String> split = 
Arrays.asList(StringUtil.getElementsFromString(paramValue));
-                    List<String> newFiles = new ArrayList<String>();
-                    for (String paramValueEach : split) {
-                        if (index < oldIndex) {
-                            log.info("Input File: " + paramValue + " is 
already transfered, so we skip this operation !!!");
-                            newFiles.add(oldFiles.get(index));
-                            data.append(oldFiles.get(index++)).append(",");
-                        } else {
-                            String stageInputFiles = 
stageInputFiles(pbsCluster, paramValueEach, parentPath);
-                            StringBuffer temp = new 
StringBuffer(data.append(stageInputFiles).append(",").toString());
-                            GFacUtils.savePluginData(jobExecutionContext, 
temp.insert(0, ++index), this.getClass().getName());
-                            newFiles.add(stageInputFiles);
-                        }
-                    }
-                    ((URIArrayType) 
actualParameter.getType()).setValueArray(newFiles.toArray(new 
String[newFiles.size()]));
-                }
-                inputNew.getParameters().put(paramName, actualParameter);
-            }
+            Cluster pbsCluster = null;
+            // here doesn't matter what the job manager is because we are only 
doing some file handling
+            // not really dealing with monitoring or job submission, so we pa
+            String lastHost = null;
+            
+                       MessageContext input = 
jobExecutionContext.getInMessageContext();
+                       Set<String> parameters = input.getParameters().keySet();
+                       for (String paramName : parameters) {
+                               ActualParameter actualParameter = 
(ActualParameter) input.getParameters().get(paramName);
+                               String paramValue = 
MappingFactory.toString(actualParameter);
+                               // TODO: Review this with type
+                               if 
("URI".equals(actualParameter.getType().getType().toString())) {
+                                       try {
+                                               URL file = new URL(paramValue);
+                                               this.userName = 
file.getUserInfo();
+                                               this.hostName = file.getHost();
+                                               paramValue = file.getPath();
+                                       } catch (MalformedURLException e) {
+                                               
log.error(e.getLocalizedMessage(),e);
+                                       }
+                                       ServerInfo serverInfo = new 
ServerInfo(this.userName, this.hostName);
+                                       if (pbsCluster == null && (lastHost == 
null || !lastHost.equals(hostName))) {
+                                               pbsCluster = new 
PBSCluster(serverInfo, authenticationInfo, 
CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
+                                       }
+                                       lastHost = hostName;
+
+                                       if (index < oldIndex) {
+                                               log.info("Input File: " + 
paramValue + " is already transfered, so we skip this operation !!!");
+                                               ((URIParameterType) 
actualParameter.getType()).setValue(oldFiles.get(index));
+                                               
data.append(oldFiles.get(index++)).append(","); // we get already transfered 
file and increment the index
+                                       } else {
+                                               String stageInputFile = 
stageInputFiles(pbsCluster, paramValue, parentPath);
+                                               ((URIParameterType) 
actualParameter.getType()).setValue(stageInputFile);
+                                               StringBuffer temp = new 
StringBuffer(data.append(stageInputFile).append(",").toString());
+                                               
status.setTransferState(TransferState.UPLOAD);
+                                               
detail.setTransferStatus(status);
+                                               
detail.setTransferDescription("Input Data Staged: " + stageInputFile);
+                                               
registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, 
jobExecutionContext.getTaskData().getTaskID());
+
+                                               
GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), 
this.getClass().getName());
+                                       }
+                               } else if 
("URIArray".equals(actualParameter.getType().getType().toString())) {
+                                       List<String> split = 
Arrays.asList(StringUtil.getElementsFromString(paramValue));
+                                       List<String> newFiles = new 
ArrayList<String>();
+                                       for (String paramValueEach : split) {
+                                               try {
+                                                       URL file = new 
URL(paramValue);
+                                                       this.userName = 
file.getUserInfo();
+                                                       this.hostName = 
file.getHost();
+                                                       paramValueEach = 
file.getPath();
+                                               } catch (MalformedURLException 
e) {
+                                                       
log.error(e.getLocalizedMessage(),e);
+                                               }
+                                               ServerInfo serverInfo = new 
ServerInfo(this.userName, this.hostName);
+                                               if (pbsCluster == null && 
(lastHost == null || !lastHost.equals(hostName))) {
+                                                       pbsCluster = new 
PBSCluster(serverInfo, authenticationInfo, 
CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
+                                               }
+                                               lastHost = hostName;
+
+                                               if (index < oldIndex) {
+                                                       log.info("Input File: " 
+ paramValue + " is already transfered, so we skip this operation !!!");
+                                                       
newFiles.add(oldFiles.get(index));
+                                                       
data.append(oldFiles.get(index++)).append(",");
+                                               } else {
+                                                       String stageInputFiles 
= stageInputFiles(pbsCluster, paramValueEach, parentPath);
+                                                       StringBuffer temp = new 
StringBuffer(data.append(stageInputFiles).append(",").toString());
+                                                       
GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), 
this.getClass().getName());
+                                                       
newFiles.add(stageInputFiles);
+                                               }
+                                       }
+                                       ((URIArrayType) 
actualParameter.getType()).setValueArray(newFiles.toArray(new 
String[newFiles.size()]));
+                               }
+                               inputNew.getParameters().put(paramName, 
actualParameter);
+                       }
         } catch (Exception e) {
             log.error(e.getMessage());
             throw new GFacHandlerException("Error while input File Staging", 
e, e.getLocalizedMessage());

http://git-wip-us.apache.org/repos/asf/airavata/blob/6c500f2c/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
 
b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
index 9b9f7b2..116d769 100644
--- 
a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
+++ 
b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
@@ -45,6 +45,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -117,12 +119,24 @@ public class AdvancedSCPOutputHandler extends 
AbstractHandler {
                         this.passPhrase);
             }
             // Server info
+            
if(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().getOutputDataDir()
 != null){
+               try{
+               URL outputPathURL = new 
URL(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().getOutputDataDir());
+               this.userName = outputPathURL.getUserInfo();
+               this.hostName = outputPathURL.getHost();
+               outputPath = outputPathURL.getPath();
+               } catch (MalformedURLException e) {
+                                       log.error(e.getLocalizedMessage(),e);
+                               }
+            }
             ServerInfo serverInfo = new ServerInfo(this.userName, 
this.hostName);
 
             Cluster pbsCluster = new PBSCluster(serverInfo, 
authenticationInfo, 
CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
+            
if(!jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().isPersistOutputData()){
             outputPath = outputPath + File.separator + 
jobExecutionContext.getExperimentID() + "-" + 
jobExecutionContext.getTaskData().getTaskID()
                     + File.separator;
             pbsCluster.makeDirectory(outputPath);
+            }
             pbsCluster.scpTo(outputPath, standardError);
             pbsCluster.scpTo(outputPath, standardOutput);
             List<DataObjectType> outputArray = new ArrayList<DataObjectType>();

Reply via email to