Repository: airavata
Updated Branches:
  refs/heads/master 0a46c0cba -> eeb00a39c


https://issues.apache.org/jira/browse/AIRAVATA-1078


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/1f7a8d94
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/1f7a8d94
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/1f7a8d94

Branch: refs/heads/master
Commit: 1f7a8d946e580d5cdc79833ed38d647db9661ac5
Parents: f558dee
Author: lahiru <[email protected]>
Authored: Wed Mar 12 23:43:07 2014 -0400
Committer: lahiru <[email protected]>
Committed: Wed Mar 12 23:43:07 2014 -0400

----------------------------------------------------------------------
 .../src/main/resources/conf/gfac-config.xml     |  32 ++--
 .../gfac/handler/AdvancedSCPInputHandler.java   | 149 +++++++++++++++++++
 .../gfac/handler/AdvancedSCPOutputHandler.java  | 110 ++++++++++++++
 .../airavata/gfac/handler/SCPOutputHandler.java |  14 +-
 .../gsi/ssh/api/job/SlurmOutputParser.java      |   2 +-
 5 files changed, 294 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/1f7a8d94/modules/distribution/airavata-server/src/main/resources/conf/gfac-config.xml
----------------------------------------------------------------------
diff --git 
a/modules/distribution/airavata-server/src/main/resources/conf/gfac-config.xml 
b/modules/distribution/airavata-server/src/main/resources/conf/gfac-config.xml
index f03ecde..b0a9bce 100644
--- 
a/modules/distribution/airavata-server/src/main/resources/conf/gfac-config.xml
+++ 
b/modules/distribution/airavata-server/src/main/resources/conf/gfac-config.xml
@@ -71,13 +71,27 @@
             <Handler 
class="org.apache.airavata.gfac.handler.SCPOutputHandler"/>
         </OutHandlers>
     </Provider>
-     <Provider class="org.apache.airavata.gfac.provider.impl.GSISSHProvider" 
host="org.apache.airavata.schemas.gfac.impl.GsisshHostTypeImpl">
-         <InHandlers>
-            <Handler 
class="org.apache.airavata.gfac.handler.SCPDirectorySetupHandler"/>
-            <Handler class="org.apache.airavata.gfac.handler.SCPInputHandler"/>
-        </InHandlers>
-        <OutHandlers>
-            <Handler 
class="org.apache.airavata.gfac.handler.SCPOutputHandler"/>
-        </OutHandlers>
-    </Provider>
+    <Provider class="org.apache.airavata.gfac.provider.impl.GSISSHProvider" 
host="org.apache.airavata.schemas.gfac.impl.GsisshHostTypeImpl">
+             <InHandlers>
+                <Handler 
class="org.apache.airavata.gfac.handler.SCPDirectorySetupHandler"/>
+                 <!--Handler 
class="org.apache.airavata.gfac.handler.AdvancedSCPInputHandler">
+                            <property name="privateKeyPath" 
value="/Users/lahirugunathilake/.ssh/id_dsa"/>
+                            <property name="publicKeyPath" 
value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
+                        <property name="userName" value="root"/>
+                        <property name="hostName" value="gw98.iu.xsede.org"/>
+                        <property name="outputPath" value="/tmp"/>
+                </Handler-->
+                <Handler 
class="org.apache.airavata.gfac.handler.SCPInputHandler"/>
+            </InHandlers>
+            <OutHandlers>
+                <Handler 
class="org.apache.airavata.gfac.handler.SCPOutputHandler"/>
+                <!--Handler 
class="org.apache.airavata.gfac.handler.AdvancedSCPOutputHandler">
+                            <property name="privateKeyPath" 
value="/Users/lahirugunathilake/.ssh/id_dsa"/>
+                            <property name="publicKeyPath" 
value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
+                        <property name="userName" value="root"/>
+                        <property name="hostName" value="gw111.iu.xsede.org"/>
+                        <property name="outputPath" value="/tmp"/>
+                </Handler-->
+            </OutHandlers>
+        </Provider>
 </GFac>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/1f7a8d94/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPInputHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPInputHandler.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPInputHandler.java
new file mode 100644
index 0000000..92b4ee6
--- /dev/null
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPInputHandler.java
@@ -0,0 +1,149 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.handler;
+
+import com.sun.tools.javac.util.Paths;
+import org.apache.airavata.common.utils.StringUtil;
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.MappingFactory;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.ServerInfo;
+import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.gsi.ssh.impl.PBSCluster;
+import 
org.apache.airavata.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
+import 
org.apache.airavata.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
+import org.apache.airavata.gsi.ssh.util.CommonUtils;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.URIArrayType;
+import org.apache.airavata.schemas.gfac.URIParameterType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.persistence.criteria.Path;
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URL;
+import java.util.*;
+
+/**
+ * This handler will copy input data from gateway machine to airavata
+ * installed machine, later running handlers can copy the input files to 
computing resource
+ * <Handler class="org.apache.airavata.gfac.handler.AdvancedSCPOutputHandler">
+                            <property name="privateKeyPath" 
value="/Users/lahirugunathilake/.ssh/id_dsa"/>
+                            <property name="publicKeyPath" 
value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
+                        <property name="userName" value="airavata"/>
+                        <property name="hostName" value="gw98.iu.xsede.org"/>
+                        <property name="outputPath" 
value="/home/airavata/outputData"/>
+ */
+public class AdvancedSCPInputHandler {
+    private static final Logger log = 
LoggerFactory.getLogger(AdvancedSCPInputHandler.class);
+
+    private String password = null;
+
+    private String publicKeyPath;
+
+    private String passPhrase;
+
+    private String privateKeyPath;
+
+    private String userName;
+
+    private String hostName;
+
+    private String outputPath;
+
+    public void initProperties(Map<String, String> properties) throws 
GFacHandlerException, GFacException {
+        password = properties.get("password");
+        passPhrase = properties.get("passPhrase");
+        privateKeyPath = properties.get("privateKeyPath");
+        publicKeyPath = properties.get("publicKeyPath");
+        userName = properties.get("userName");
+        hostName = properties.get("hostName");
+        outputPath = properties.get("outputPath");
+    }
+
+    public void invoke(JobExecutionContext jobExecutionContext) throws 
GFacHandlerException, GFacException {
+        ApplicationDeploymentDescriptionType app = 
jobExecutionContext.getApplicationContext()
+                .getApplicationDeploymentDescription().getType();
+        String standardError = app.getStandardError();
+        String standardOutput = app.getStandardOutput();
+        String outputDataDirectory = app.getOutputDataDirectory();
+
+        AuthenticationInfo authenticationInfo = null;
+        if (password != null) {
+            authenticationInfo = new 
DefaultPasswordAuthenticationInfo(this.password);
+        } else {
+            authenticationInfo = new 
DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
+                    this.passPhrase);
+        }
+        // Server info
+        ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
+        Cluster pbsCluster = null;
+        MessageContext inputNew = new MessageContext();
+        try {
+            pbsCluster = new PBSCluster(serverInfo, authenticationInfo, 
CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
+            String parentPath = outputPath + File.separator + 
jobExecutionContext.getExperimentID() + File.separator + 
jobExecutionContext.getTaskData().getTaskID();
+            pbsCluster.makeDirectory(parentPath);
+            MessageContext input = jobExecutionContext.getInMessageContext();
+            Set<String> parameters = input.getParameters().keySet();
+            for (String paramName : parameters) {
+                ActualParameter actualParameter = (ActualParameter) 
input.getParameters().get(paramName);
+                String paramValue = MappingFactory.toString(actualParameter);
+                //TODO: Review this with type
+                if 
("URI".equals(actualParameter.getType().getType().toString())) {
+                    ((URIParameterType) 
actualParameter.getType()).setValue(stageInputFiles(pbsCluster, paramValue, 
parentPath));
+                } else if 
("URIArray".equals(actualParameter.getType().getType().toString())) {
+                    List<String> split = 
Arrays.asList(StringUtil.getElementsFromString(paramValue));
+                    List<String> newFiles = new ArrayList<String>();
+                    for (String paramValueEach : split) {
+                        String stageInputFiles = stageInputFiles(pbsCluster, 
paramValueEach, parentPath);
+                        newFiles.add(stageInputFiles);
+                    }
+                    ((URIArrayType) 
actualParameter.getType()).setValueArray(newFiles.toArray(new 
String[newFiles.size()]));
+                }
+                inputNew.getParameters().put(paramName, actualParameter);
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage());
+            throw new GFacHandlerException("Error while input File Staging", 
e, e.getLocalizedMessage());
+        }
+        jobExecutionContext.setInMessageContext(inputNew);
+    }
+
+    private String stageInputFiles(Cluster cluster, String paramValue, String 
parentPath) throws GFacException {
+        try {
+            cluster.scpFrom(paramValue, parentPath);
+            return "file://" + parentPath + File.separator + 
URI.create(paramValue).toURL().getFile();
+        } catch (SSHApiException e) {
+            log.error("Error tranfering remote file to local file, remote 
path: " + paramValue);
+            throw new GFacException(e);
+        } catch (MalformedURLException e) {
+            log.error("Error processing input URL");
+            throw new GFacException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/1f7a8d94/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPOutputHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPOutputHandler.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPOutputHandler.java
new file mode 100644
index 0000000..06496a1
--- /dev/null
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPOutputHandler.java
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.handler;
+
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.security.GSISecurityContext;
+import org.apache.airavata.gfac.context.security.SSHSecurityContext;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.ServerInfo;
+import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.gsi.ssh.impl.PBSCluster;
+import 
org.apache.airavata.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
+import 
org.apache.airavata.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
+import org.apache.airavata.gsi.ssh.util.CommonUtils;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * This handler will copy outputs from airavata installed local directory
+ * to a remote location, prior to this handler SCPOutputHandler should be 
invoked
+ * Should add following configuration to gfac-config.xml and configure the 
keys properly
+ * <Handler class="org.apache.airavata.gfac.handler.AdvancedSCPOutputHandler">
+                            <property name="privateKeyPath" 
value="/Users/lahirugunathilake/.ssh/id_dsa"/>
+                            <property name="publicKeyPath" 
value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
+                        <property name="userName" value="airavata"/>
+                        <property name="hostName" value="gw98.iu.xsede.org"/>
+                        <property name="outputPath" 
value="/home/airavata/outputData"/>
+ */
+public class AdvancedSCPOutputHandler extends AbstractHandler {
+    private static final Logger log = 
LoggerFactory.getLogger(AdvancedSCPOutputHandler.class);
+
+    private String password = null;
+
+    private String publicKeyPath;
+
+    private String passPhrase;
+
+    private String privateKeyPath;
+
+    private String userName;
+
+    private String hostName;
+
+    private String outputPath;
+
+    public void initProperties(Map<String, String> properties) throws 
GFacHandlerException, GFacException {
+        password = properties.get("password");
+        passPhrase = properties.get("passPhrase");
+        privateKeyPath = properties.get("privateKeyPath");
+        publicKeyPath = properties.get("publicKeyPath");
+        userName = properties.get("userName");
+        hostName = properties.get("hostName");
+        outputPath = properties.get("outputPath");
+    }
+
+    @Override
+    public void invoke(JobExecutionContext jobExecutionContext) throws 
GFacHandlerException, GFacException {
+        ApplicationDeploymentDescriptionType app = 
jobExecutionContext.getApplicationContext()
+                .getApplicationDeploymentDescription().getType();
+        String standardError = app.getStandardError();
+        String standardOutput = app.getStandardOutput();
+        String outputDataDirectory = app.getOutputDataDirectory();
+
+        AuthenticationInfo authenticationInfo = null;
+        if (password != null) {
+            authenticationInfo = new 
DefaultPasswordAuthenticationInfo(this.password);
+        } else {
+            authenticationInfo = new 
DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
+                    this.passPhrase);
+        }
+        // Server info
+        ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
+        try {
+            Cluster pbsCluster = new PBSCluster(serverInfo, 
authenticationInfo, 
CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
+            String parentPath = outputPath + File.separator + 
jobExecutionContext.getExperimentID() + File.separator + 
jobExecutionContext.getTaskData().getTaskID();
+            pbsCluster.makeDirectory(parentPath);
+            pbsCluster.scpTo(parentPath, standardError);
+            pbsCluster.scpTo(parentPath, standardOutput);
+        } catch (SSHApiException e) {
+            log.error("Error transfering files to remote host : " + hostName + 
" with the user: " + userName);
+            log.error(e.getMessage());
+            throw new GFacException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/1f7a8d94/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
index 07b869c..2a1a49a 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
@@ -37,6 +37,7 @@ import 
org.apache.airavata.gfac.provider.GFacProviderException;
 import org.apache.airavata.gfac.utils.GFacUtils;
 import org.apache.airavata.gfac.utils.OutputUtils;
 import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.util.SSHUtils;
 import org.apache.airavata.model.workspace.experiment.*;
 import org.apache.airavata.persistance.registry.jpa.model.DataTransferDetail;
 import org.apache.airavata.registry.cpi.ChildDataType;
@@ -79,8 +80,10 @@ public class SCPOutputHandler extends AbstractHandler{
 
             if (taskData.getAdvancedOutputDataHandling() != null) {
                 outputDataDir = 
taskData.getAdvancedOutputDataHandling().getOutputDataDir();
+                AdvancedOutputDataHandling advancedOutputDataHandling = 
taskData.getAdvancedOutputDataHandling();
             }
             if (outputDataDir != null) {
+                app.setOutputDataDirectory(outputDataDir);    // These will be 
useful if we are doing third party transfer
                 localStdOutFile = new File(outputDataDir + File.separator + 
timeStampedServiceName + "stdout");
                 localStdErrFile = new File(outputDataDir + File.separator + 
timeStampedServiceName + "stderr");
             } else {
@@ -98,12 +101,12 @@ public class SCPOutputHandler extends AbstractHandler{
             detail.setTransferStatus(status);
             detail.setTransferDescription("STDOUT:" + stdOutStr);
             registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, 
jobExecutionContext.getTaskData().getTaskID());
-          
+
             status.setTransferState(TransferState.COMPLETE);
             detail.setTransferStatus(status);
             detail.setTransferDescription("STDERR:" + stdErrStr);
             registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, 
jobExecutionContext.getTaskData().getTaskID());
-          
+
 
             Map<String, ActualParameter> stringMap = new HashMap<String, 
ActualParameter>();
             Map<String, Object> output = 
jobExecutionContext.getOutMessageContext().getParameters();
@@ -116,7 +119,12 @@ public class SCPOutputHandler extends AbstractHandler{
             status.setTransferState(TransferState.DOWNLOAD);
             detail.setTransferStatus(status);
             registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, 
jobExecutionContext.getTaskData().getTaskID());
-        
+
+            app.setStandardError(localStdErrFile.getAbsolutePath());
+            app.setStandardOutput(localStdOutFile.getAbsolutePath());
+            if (outputDataDir != null) {
+                app.setOutputDataDirectory(outputDataDir);
+            }
         } catch (XmlException e) {
             throw new GFacHandlerException("Cannot read output:" + 
e.getMessage(), e);
         } catch (ConnectionException e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/1f7a8d94/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java
----------------------------------------------------------------------
diff --git 
a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java
 
b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java
index 3b9d2c3..d118ee5 100644
--- 
a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java
+++ 
b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java
@@ -37,7 +37,7 @@ public class SlurmOutputParser implements OutputParser {
         String lastString = info[info.length -1];
         if (lastString.contains("JOB ID")) {
             // because there's no state
-            descriptor.setStatus("E");
+            descriptor.setStatus("U");
         }else{
             int column = 0;
             System.out.println(lastString);

Reply via email to