http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
new file mode 100644
index 0000000..e53fe09
--- /dev/null
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
@@ -0,0 +1,280 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.util;
+
+import org.apache.airavata.common.logger.AiravataLogger;
+import org.apache.airavata.common.logger.AiravataLoggerFactory;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.GFacHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.HostMonitorData;
+import org.apache.airavata.gfac.monitor.UserMonitorData;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import 
org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+public class CommonUtils {
+    private final static AiravataLogger logger = 
AiravataLoggerFactory.getLogger(CommonUtils.class);
+
+    /**
+     * Builds the channel identifier for a monitored job: the job owner's user name and the host
+     * name of its compute resource, joined by a hyphen.
+     *
+     * @param monitorID monitor entry supplying the user name and the compute resource description
+     * @return {@code <userName>-<hostName>}
+     */
+    public static String getChannelID(MonitorID monitorID) {
+        StringBuilder channelId = new StringBuilder();
+        channelId.append(monitorID.getUserName());
+        channelId.append("-");
+        channelId.append(monitorID.getComputeResourceDescription().getHostName());
+        return channelId.toString();
+    }
+
+    /**
+     * Builds the routing key for a monitored job from the job owner's user name and the first entry
+     * of its compute resource's IP address list.
+     *
+     * @param monitorID monitor entry supplying the user name and the compute resource description
+     * @return {@code *.<userName>.<first IP address>}
+     */
+    public static String getRoutingKey(MonitorID monitorID) {
+        StringBuilder routingKey = new StringBuilder("*.");
+        routingKey.append(monitorID.getUserName());
+        routingKey.append(".");
+        routingKey.append(monitorID.getComputeResourceDescription().getIpAddresses().get(0));
+        return routingKey.toString();
+    }
+
+    /**
+     * Builds a channel identifier by joining the user name and the host address with a hyphen.
+     *
+     * @param userName    name of the job owner
+     * @param hostAddress address of the host the job runs on
+     * @return {@code <userName>-<hostAddress>}
+     */
+    public static String getChannelID(String userName,String hostAddress) {
+        return new StringBuilder().append(userName).append("-").append(hostAddress).toString();
+    }
+
+    /**
+     * Builds a routing key of the form {@code *.<userName>.<hostAddress>}.
+     *
+     * @param userName    name of the job owner
+     * @param hostAddress address of the host the job runs on
+     * @return the routing key string
+     */
+    public static String getRoutingKey(String userName,String hostAddress) {
+        return new StringBuilder("*.").append(userName).append(".").append(hostAddress).toString();
+    }
+
+    /**
+     * Registers a job for monitoring under its owner's entry in the queue.
+     * <p>
+     * The queue is scanned for the first {@link UserMonitorData} whose user name equals that of
+     * {@code monitorID}:
+     * <ul>
+     *   <li>user entry with a matching host entry: the monitor ID is added to that host entry;</li>
+     *   <li>user entry without a matching host entry: a new {@link HostMonitorData} built from
+     *       {@code jobExecutionContext} receives the monitor ID and is attached to the user entry;</li>
+     *   <li>no user entry: a new user entry holding a new host entry is put on the queue.</li>
+     * </ul>
+     * The whole operation runs while holding the monitor of {@code queue}.
+     *
+     * @param queue               monitoring queue to update
+     * @param monitorID           job to start monitoring
+     * @param jobExecutionContext context used to construct a new host entry when one is needed
+     * @throws AiravataMonitorException if the thread is interrupted while putting a new user entry on
+     *                                  the queue; the interrupt status is restored before throwing
+     */
+    public static void addMonitortoQueue(BlockingQueue<UserMonitorData> queue, MonitorID monitorID,
+                                         JobExecutionContext jobExecutionContext) throws AiravataMonitorException {
+        synchronized (queue) {
+            for (UserMonitorData userData : queue) {
+                if (!userData.getUserName().equals(monitorID.getUserName())) {
+                    continue;
+                }
+                // This user is already monitored: reuse the host entry if there is one.
+                HostMonitorData hostData = findHostMonitorData(userData, monitorID);
+                if (hostData != null) {
+                    hostData.addMonitorIDForHost(monitorID);
+                } else {
+                    userData.addHostMonitorData(newHostMonitorData(jobExecutionContext, monitorID));
+                }
+                logAddedToQueue(monitorID);
+                return;
+            }
+            // First job of this user: create the user entry together with its host entry.
+            UserMonitorData userMonitorData = new UserMonitorData(monitorID.getUserName());
+            userMonitorData.addHostMonitorData(newHostMonitorData(jobExecutionContext, monitorID));
+            try {
+                queue.put(userMonitorData);
+                logAddedToQueue(monitorID);
+            } catch (InterruptedException e) {
+                // Keep the interruption visible to callers further up the stack.
+                Thread.currentThread().interrupt();
+                throw new AiravataMonitorException(e);
+            }
+        }
+    }
+
+    /** Returns the host entry of {@code userData} matching the compute resource of {@code monitorID}, or null. */
+    private static HostMonitorData findHostMonitorData(UserMonitorData userData, MonitorID monitorID) {
+        List<HostMonitorData> hosts = userData.getHostMonitorData();
+        for (HostMonitorData host : hosts) {
+            if (isEqual(host.getComputeResourceDescription(), monitorID.getComputeResourceDescription())) {
+                return host;
+            }
+        }
+        return null;
+    }
+
+    /** Creates a host entry from the execution context and registers {@code monitorID} in it. */
+    private static HostMonitorData newHostMonitorData(JobExecutionContext jobExecutionContext, MonitorID monitorID)
+            throws AiravataMonitorException {
+        HostMonitorData hostMonitorData = new HostMonitorData(jobExecutionContext);
+        hostMonitorData.addMonitorIDForHost(monitorID);
+        return hostMonitorData;
+    }
+
+    /** Emits the debug message recording that {@code monitorID} is now being monitored. */
+    private static void logAddedToQueue(MonitorID monitorID) {
+        logger.debugId(monitorID.getJobID(), "Added new job to the monitoring queue, experiment {}," +
+                " task {}", monitorID.getExperimentID(), monitorID.getTaskID());
+    }
+
+    /**
+     * Tells whether two compute resource descriptions refer to the same resource, i.e. both their
+     * compute resource ids and their host names are equal.
+     *
+     * @param comRes_1 first description
+     * @param comRes_2 second description
+     * @return {@code true} when id and host name both match
+     */
+    private static boolean isEqual(ComputeResourceDescription comRes_1, ComputeResourceDescription comRes_2) {
+        if (!comRes_1.getComputeResourceId().equals(comRes_2.getComputeResourceId())) {
+            return false;
+        }
+        return comRes_1.getHostName().equals(comRes_2.getHostName());
+    }
+
+    /**
+     * Checks whether the queue holds no entry for the same user and compute resource as {@code monitorID}.
+     *
+     * @param queue     queue of monitor IDs to inspect
+     * @param monitorID job whose user name and compute resource are looked for
+     * @return {@code false} as soon as a queued entry matches both the user name and the compute
+     *         resource of {@code monitorID}; {@code true} when no entry matches
+     */
+    public static boolean isTheLastJobInQueue(BlockingQueue<MonitorID> queue,MonitorID monitorID){
+        for (MonitorID queued : queue) {
+            boolean sameUser = monitorID.getUserName().equals(queued.getUserName());
+            if (sameUser && CommonUtils.isEqual(monitorID.getComputeResourceDescription(),
+                    queued.getComputeResourceDescription())) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Removes a job from the monitoring data of a single user.
+     * <p>
+     * When the user names match, every host entry for the job's compute resource is searched and the
+     * first monitor ID with the same job id or the same job name is removed. If nothing is removed,
+     * two informational messages are logged instead; this is not treated as an error.
+     * <p>
+     * Per the original author's note, this method is not synchronized because it is invoked by
+     * HPCPullMonitor, which already synchronizes.
+     *
+     * @param userMonitorData monitoring data of one user, searched for the job
+     * @param monitorID       job to stop monitoring
+     * @throws AiravataMonitorException declared by the signature; nothing in this body throws it directly
+     */
+    public static void removeMonitorFromQueue(UserMonitorData userMonitorData, MonitorID monitorID) throws AiravataMonitorException {
+        boolean sameUser = userMonitorData.getUserName().equals(monitorID.getUserName());
+        if (sameUser) {
+            for (HostMonitorData hostData : userMonitorData.getHostMonitorData()) {
+                if (!isEqual(hostData.getComputeResourceDescription(), monitorID.getComputeResourceDescription())) {
+                    continue;
+                }
+                Iterator<MonitorID> candidates = hostData.getMonitorIDs().iterator();
+                while (candidates.hasNext()) {
+                    MonitorID candidate = candidates.next();
+                    // The two objects may be in different states, so list.remove(object) cannot be
+                    // used; entries are matched on job id or job name instead.
+                    boolean sameJob = candidate.getJobID().equals(monitorID.getJobID())
+                            || candidate.getJobName().equals(monitorID.getJobName());
+                    if (!sameJob) {
+                        continue;
+                    }
+                    candidates.remove();
+                    logger.infoId(monitorID.getJobID(), "Removed the jobId: {} JobName: {} from monitoring last " +
+                            "status:{}", monitorID.getJobID(),monitorID.getJobName(), monitorID.getStatus().toString());
+                    return;
+                }
+            }
+        }
+        logger.info("Cannot find the given MonitorID in the queue with userName " +
+                monitorID.getUserName() + "  and jobID " + monitorID.getJobID());
+        logger.info("This might not be an error because someone else removed this job from the queue");
+    }
+
+
+    /**
+     * Instantiates and runs every configured out-flow handler, in configuration order.
+     * <p>
+     * For each handler configuration the class is loaded by its trimmed name, instantiated through its
+     * no-argument constructor, given its configured properties and then invoked with the job
+     * execution context. The first failure stops processing; later handlers are not run.
+     *
+     * @param jobExecutionContext context that supplies the handler configuration and is passed to each handler
+     * @throws GFacException if a handler class cannot be loaded, is not a {@link GFacHandler}, cannot be
+     *                       instantiated, or fails while being invoked; the original exception is the cause
+     */
+    public static void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+        List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
+
+        for (GFacHandlerConfig handlerConfig : handlers) {
+            // Report the class name, not the config object's toString(), in every message.
+            String className = handlerConfig.getClassName().trim();
+            GFacHandler handler;
+            try {
+                Class<? extends GFacHandler> handlerClass = Class.forName(className).asSubclass(GFacHandler.class);
+                handler = handlerClass.newInstance();
+                handler.initProperties(handlerConfig.getProperties());
+            } catch (ClassNotFoundException e) {
+                String message = "Cannot load handler class " + className;
+                logger.error(message, e);
+                throw new GFacException(message, e);
+            } catch (ClassCastException e) {
+                // asSubclass rejects classes that do not implement GFacHandler.
+                String message = "Handler class " + className + " is not a GFacHandler";
+                logger.error(message, e);
+                throw new GFacException(message, e);
+            } catch (InstantiationException e) {
+                String message = "Cannot instantiate handler class " + className;
+                logger.error(message, e);
+                throw new GFacException(message, e);
+            } catch (IllegalAccessException e) {
+                String message = "Cannot instantiate handler class " + className;
+                logger.error(message, e);
+                throw new GFacException(message, e);
+            }
+            try {
+                handler.invoke(jobExecutionContext);
+            } catch (Exception e) {
+                throw new GFacException("Error Executing a OutFlow Handler: " + className, e);
+            }
+        }
+    }
+
+        /**
+         * Updates the job count stored in ZooKeeper for a given set of paths, then writes the
+         * colon-separated list of processed paths to the {@code "/" + Constants.STAT} node so that
+         * watchers on that node are triggered.
+         * <p>
+         * Counts are stored as decimal text encoded in UTF-8. A node whose data is null, empty or not
+         * a decimal number is treated as holding no count: when adding, it is overwritten with the
+         * requested count; when reducing, it is reset to 0 with a warning. A reduction larger than
+         * the stored count is logged as an error and that node is left unchanged.
+         * <p>
+         * Any exception aborts the remaining updates and is logged; nothing is propagated to the caller.
+         *
+         * @param curatorClient - CuratorFramework instance
+         * @param changeCountMap - map of change job count with relevant path
+         * @param isAdd - Should add or reduce existing job count by the given job count.
+         */
+    public static void updateZkWithJobCount(CuratorFramework curatorClient, final Map<String, Integer> changeCountMap, boolean isAdd) {
+        // Explicit charset so that writers and readers on differently configured JVMs agree.
+        final java.nio.charset.Charset utf8 = java.nio.charset.Charset.forName("UTF-8");
+        StringBuilder changeZNodePaths = new StringBuilder();
+        try {
+            for (Map.Entry<String, Integer> change : changeCountMap.entrySet()) {
+                String path = change.getKey();
+                int delta = change.getValue();
+                if (isAdd) {
+                    CommonUtils.checkAndCreateZNode(curatorClient, path);
+                }
+                // NOTE(review): Curator's create().forPath(path) stores the client's default data
+                // (the local address unless configured otherwise), so a freshly created node may
+                // hold non-numeric bytes rather than null - confirm against the client setup.
+                Integer previousCount = parseJobCount(curatorClient.getData().forPath(path), utf8);
+                Integer newCount = null;
+                if (previousCount == null) {
+                    if (isAdd) {
+                        newCount = delta;
+                    } else {
+                        // Not expected; handled in case of a zookeeper communication failure.
+                        logger.warn("Couldn't reduce job count in " + path
+                                + " as it returns null or non-numeric data. Hence reset the job count to 0");
+                        newCount = 0;
+                    }
+                } else if (isAdd) {
+                    newCount = previousCount + delta;
+                } else if (previousCount >= delta) {
+                    newCount = previousCount - delta;
+                } else {
+                    // Not expected; the stored count is left as it is.
+                    logger.error("Requested remove job count is " + delta +
+                            " which is higher than the existing job count " + previousCount
+                            + " in  " + path + " path.");
+                }
+                if (newCount != null) {
+                    curatorClient.setData().withVersion(-1).forPath(path, String.valueOf(newCount).getBytes(utf8));
+                }
+                changeZNodePaths.append(path).append(":");
+            }
+
+            // update stat node to trigger orchestrator watchers
+            if (changeZNodePaths.length() > 0) {
+                changeZNodePaths.deleteCharAt(changeZNodePaths.length() - 1);
+                curatorClient.setData().withVersion(-1).forPath("/" + Constants.STAT,
+                        changeZNodePaths.toString().getBytes(utf8));
+            }
+        } catch (Exception e) {
+            logger.error("Error while writing job count to zookeeper", e);
+        }
+    }
+
+    /** Decodes znode data as a decimal job count; returns null when the data is absent or not a number. */
+    private static Integer parseJobCount(byte[] data, java.nio.charset.Charset charset) {
+        if (data == null || data.length == 0) {
+            return null;
+        }
+        try {
+            return Integer.valueOf(new String(data, charset).trim());
+        } catch (NumberFormatException ignored) {
+            // The node holds something other than a count; the caller treats it as "no count yet".
+            return null;
+        }
+    }
+
+    /**
+     * Adds one to the ZooKeeper job count kept at the job-count path of the given job.
+     * <p>
+     * The path comes from {@link #getJobCountUpdatePath(MonitorID)} and the Curator client from the
+     * job execution context attached to {@code monitorID}.
+     *
+     * @param monitorID - Job monitorId
+     */
+    public static void increaseZkJobCount(MonitorID monitorID) {
+        String countPath = CommonUtils.getJobCountUpdatePath(monitorID);
+        Map<String, Integer> singleIncrement = new HashMap<String, Integer>();
+        singleIncrement.put(countPath, 1);
+        CuratorFramework client = monitorID.getJobExecutionContext().getCuratorClient();
+        updateZkWithJobCount(client, singleIncrement, true);
+    }
+
+    /**
+     * Builds the znode path under which the job count for the given job is kept:
+     * {@code /<Constants.STAT>/<userName>/<hostName>/<Constants.JOB>}.
+     *
+     * @param monitorID - Job monitorId, supplying the user name and the compute resource host name
+     * @return the job-count path for that user and host
+     */
+    public static String getJobCountUpdatePath(MonitorID monitorID){
+        String userSegment = "/" + Constants.STAT + "/" + monitorID.getUserName();
+        String hostSegment = "/" + monitorID.getComputeResourceDescription().getHostName();
+        return userSegment + hostSegment + "/" + Constants.JOB;
+    }
+
+    /**
+     * Ensures that a znode exists at the given path, creating it and any missing ancestors.
+     * <p>
+     * Nothing happens when the node already exists. Otherwise, if the last {@code /} of the path is
+     * beyond index 1, the parent path is handled first by recursion; the node itself is then created
+     * as a persistent node with the open ACL and without an explicit data argument.
+     *
+     * @param curatorClient - zookeeper instance
+     * @param path - path to check znode
+     * @throws Exception whatever the Curator existence check or create call raises
+     */
+    private static void checkAndCreateZNode(CuratorFramework curatorClient , String path) throws Exception {
+        if (curatorClient.checkExists().forPath(path) != null) {
+            return; // znode is already there
+        }
+        int lastSlash = path.lastIndexOf("/");
+        if (lastSlash > 1) {
+            // make sure the parent exists before creating the child
+            checkAndCreateZNode(curatorClient, path.substring(0, lastSlash));
+        }
+        curatorClient.create()
+                .withMode(CreateMode.PERSISTENT)
+                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
+                .forPath(path);
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/X509Helper.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/X509Helper.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/X509Helper.java
new file mode 100644
index 0000000..08c3f67
--- /dev/null
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/X509Helper.java
@@ -0,0 +1,164 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.util;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+
+
+import java.io.*;
+import java.security.*;
+import java.security.cert.CertificateException;
+import java.security.cert.CertificateFactory;
+import java.security.cert.CertificateParsingException;
+import java.security.cert.X509Certificate;
+import java.security.spec.InvalidKeySpecException;
+
+public class X509Helper {
+
+    static {
+        // Register BouncyCastle as a security provider when the class loads;
+        // per the original author, parsing of RSA keys fails without it.
+        Security.addProvider(new BouncyCastleProvider());
+    }
+
+
+
+    /**
+     * Convenience overload for a proxy file that holds both the certificate and the private key:
+     * the same file is passed as certificate file and as key file to the three-argument overload.
+     *
+     * @param proxyFile     path of the PEM proxy file
+     * @param keyPassPhrase pass phrase forwarded unchanged to the three-argument overload
+     * @return whatever the three-argument overload returns
+     */
+    public static KeyStore keyStoreFromPEM(String proxyFile,
+                                           String keyPassPhrase) throws IOException,
+            CertificateException,
+            NoSuchAlgorithmException,
+            InvalidKeySpecException,
+            KeyStoreException {
+        final String certFile = proxyFile;
+        final String keyFile = proxyFile;
+        return keyStoreFromPEM(certFile, keyFile, keyPassPhrase);
+    }
+
+    /**
+     * Intended to build a key store from a PEM certificate file and a PEM RSA private key file.
+     * <p>
+     * <b>Not implemented yet:</b> the certificate is parsed and the RSA private key section is
+     * extracted from the key file, but the key store construction is disabled (see the TODO below)
+     * and the method always ends by throwing {@link CertificateException}. Both files are closed
+     * before the method returns control.
+     *
+     * @param certFile      path of the PEM file holding the X.509 certificate
+     * @param keyFile       path of the PEM file holding the RSA private key; may be the same proxy
+     *                      file, since everything outside the private key markers is skipped
+     * @param keyPassPhrase pass phrase meant to protect the key entry; currently unused
+     * @return never returns normally in the current implementation
+     * @throws IOException          if either file cannot be opened, read or closed
+     * @throws CertificateException if the certificate cannot be parsed, and always once the inputs
+     *                              have been read ("Method not implemented")
+     */
+    public static KeyStore keyStoreFromPEM(String certFile,
+                                           String keyFile,
+                                           String keyPassPhrase) throws IOException,
+            CertificateException,
+            NoSuchAlgorithmException,
+            InvalidKeySpecException,
+            KeyStoreException {
+        CertificateFactory cf = CertificateFactory.getInstance("X.509");
+        X509Certificate cert;
+        InputStream certStream = new FileInputStream(certFile);
+        try {
+            cert = (X509Certificate) cf.generateCertificate(certStream);
+        } finally {
+            certStream.close();
+        }
+
+        // this works for proxy files, too, since it skips over the certificate
+        StringBuilder builder = new StringBuilder();
+        BufferedReader reader = new BufferedReader(new FileReader(keyFile));
+        try {
+            String line;
+            boolean inKey = false;
+            while ((line = reader.readLine()) != null) {
+                if (line.contains("-----BEGIN RSA PRIVATE KEY-----")) {
+                    inKey = true;
+                }
+                if (inKey) {
+                    builder.append(line);
+                    builder.append(System.getProperty("line.separator"));
+                }
+                if (line.contains("-----END RSA PRIVATE KEY-----")) {
+                    inKey = false;
+                }
+            }
+        } finally {
+            reader.close();
+        }
+        String privKeyPEM = builder.toString();
+
+        // Disabled implementation, kept for reference until the TODO below is resolved.
+        // using BouncyCastle
+//        PEMReader pemParser = new PEMReader(new StringReader(privKeyPEM));
+//        Object object = pemParser.readObject();
+//
+//        PrivateKey privKey = null;
+//        if(object instanceof KeyPair){
+//            privKey = ((KeyPair)object).getPrivate();
+//        }
+        // PEMParser from BouncyCastle is good for reading PEM files, but I didn't want to add that dependency
+        /*
+        // Base64 decode the data
+        byte[] encoded = javax.xml.bind.DatatypeConverter.parseBase64Binary(privKeyPEM);
+
+        // PKCS8 decode the encoded RSA private key
+        java.security.spec.PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(encoded);
+        KeyFactory kf = KeyFactory.getInstance("RSA");
+        PrivateKey privKey = kf.generatePrivate(keySpec);
+        //RSAPrivateKey privKey = (RSAPrivateKey)kf.generatePrivate(keySpec);
+        */
+
+//        KeyStore keyStore = KeyStore.getInstance("PKCS12");
+//        keyStore.load(null,null);
+//
+//        KeyStore.PrivateKeyEntry entry =
+//            new KeyStore.PrivateKeyEntry(privKey,
+//                                         new java.security.cert.Certificate[] {(java.security.cert.Certificate)cert});
+//        KeyStore.PasswordProtection prot = new KeyStore.PasswordProtection(keyPassPhrase.toCharArray());
+//        keyStore.setEntry(cert.getSubjectX500Principal().getName(), entry, prot);
+
+//        return keyStore;
+        //TODO: Problem with BouncyCastle version used in gsissh
+        throw new CertificateException("Method not implemented");
+
+    }
+
+
+    /**
+     * Builds the trust store from the directory named by the {@code trusted.cert.location} server setting.
+     *
+     * @return the key store produced by {@link #trustKeyStoreFromCertDir(String)} for that directory
+     * @throws ApplicationSettingsException if the setting cannot be read
+     */
+    public static KeyStore trustKeyStoreFromCertDir() throws IOException,
+                                                             KeyStoreException,
+                                                             CertificateException,
+                                                             NoSuchAlgorithmException, ApplicationSettingsException {
+        String trustedCertLocation = ServerSettings.getSetting("trusted.cert.location");
+        return trustKeyStoreFromCertDir(trustedCertLocation);
+    }
+
+    /**
+     * Builds an in-memory JKS trust store from the certificates found in a directory.
+     * <p>
+     * Only regular files whose name ends with {@code .0} are read. Each one is parsed as an X.509
+     * certificate and stored as a trusted entry under its subject name. Files that raise
+     * {@link CertificateParsingException}, and entries the key store rejects, are skipped; every
+     * certificate file is closed after it has been read.
+     *
+     * @param certDir path of the directory holding the trusted certificates
+     * @return a key store holding one trusted entry per accepted certificate (possibly empty)
+     * @throws IOException          if {@code certDir} cannot be listed (not a directory or an I/O
+     *                              error), or a certificate file cannot be opened or closed
+     * @throws CertificateException if the X.509 factory is unavailable or a file fails with a
+     *                              certificate error other than a parsing error
+     */
+    public static KeyStore trustKeyStoreFromCertDir(String certDir) throws IOException,
+                                                                           KeyStoreException,
+                                                                           CertificateException,
+                                                                           NoSuchAlgorithmException {
+        KeyStore ks = KeyStore.getInstance("JKS");
+        ks.load(null,null);
+
+        File[] files = new File(certDir).listFiles();
+        if (files == null) {
+            // listFiles() returns null instead of throwing when the path is not a readable directory.
+            throw new IOException("Cannot list trusted certificate directory: " + certDir);
+        }
+
+        CertificateFactory cf = CertificateFactory.getInstance("X.509");
+        for (File file : files) {
+            if (!file.isFile() || !file.getName().endsWith(".0")) {
+                continue;
+            }
+
+            InputStream in = new FileInputStream(file);
+            try {
+                X509Certificate cert = (X509Certificate) cf.generateCertificate(in);
+                KeyStore.TrustedCertificateEntry entry = new KeyStore.TrustedCertificateEntry(cert);
+                ks.setEntry(cert.getSubjectX500Principal().getName(), entry, null);
+            } catch (KeyStoreException ignored) {
+                // Best effort: an entry the key store refuses is left out of the trust store.
+            } catch (CertificateParsingException ignored) {
+                // Best effort: a file that does not parse as a certificate is skipped.
+            } finally {
+                in.close();
+            }
+        }
+
+        return ks;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/context/SSHAuthWrapper.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/context/SSHAuthWrapper.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/context/SSHAuthWrapper.java
new file mode 100644
index 0000000..74642dc
--- /dev/null
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/context/SSHAuthWrapper.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.ssh.context;
+
+import org.apache.airavata.gfac.ssh.api.ServerInfo;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+
+/**
+ * Immutable holder that groups the server information, the authentication information and a key
+ * string belonging to one SSH connection setup.
+ * <p>
+ * NOTE(review): the meaning of {@code key} is not visible in this class; it is stored and returned
+ * as given - confirm its purpose against the callers.
+ */
+public class SSHAuthWrapper {
+    private final ServerInfo serverInfo;
+
+    private final AuthenticationInfo authenticationInfo;
+
+    private final String key;
+
+    /**
+     * Creates a wrapper around the given values; none of them is validated or copied.
+     *
+     * @param serverInfo         information about the target server
+     * @param authenticationInfo authentication information to use with that server
+     * @param key                key string associated with this combination
+     */
+    public SSHAuthWrapper(ServerInfo serverInfo, AuthenticationInfo authenticationInfo, String key) {
+        this.serverInfo = serverInfo;
+        this.authenticationInfo = authenticationInfo;
+        this.key = key;
+    }
+
+    /** @return the server information passed to the constructor */
+    public ServerInfo getServerInfo() {
+        return serverInfo;
+    }
+
+    /** @return the authentication information passed to the constructor */
+    public AuthenticationInfo getAuthenticationInfo() {
+        return authenticationInfo;
+    }
+
+    /** @return the key string passed to the constructor */
+    public String getKey() {
+        return key;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
new file mode 100644
index 0000000..9481188
--- /dev/null
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
@@ -0,0 +1,229 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.ssh.handler;
+
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.context.MessageContext;
+import org.apache.airavata.gfac.core.handler.AbstractHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.apache.airavata.gfac.ssh.api.SSHApiException;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+import 
org.apache.airavata.gfac.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
+import 
org.apache.airavata.gfac.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
+import org.apache.airavata.model.appcatalog.appinterface.DataType;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.*;
+
+/**
+ * This handler will copy input data from gateway machine to airavata
+ * installed machine, later running handlers can copy the input files to 
computing resource
+ * <Handler class="AdvancedSCPOutputHandler">
+ * <property name="privateKeyPath" 
value="/Users/lahirugunathilake/.ssh/id_dsa"/>
+ * <property name="publicKeyPath" 
value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
+ * <property name="userName" value="airavata"/>
+ * <property name="hostName" value="gw98.iu.xsede.org"/>
+ * <property name="inputPath" value="/home/airavata/outputData"/>
+ */
+public class AdvancedSCPInputHandler extends AbstractHandler {
+    private static final Logger log = 
LoggerFactory.getLogger(AdvancedSCPInputHandler.class);
+    public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth";
+    public static final int DEFAULT_SSH_PORT = 22;
+
+    private String password = null;
+
+    private String publicKeyPath;
+
+    private String passPhrase;
+
+    private String privateKeyPath;
+
+    private String userName;
+
+    private String hostName;
+
+    private String inputPath;
+
+    public void initProperties(Properties properties) throws 
GFacHandlerException {
+        password = (String) properties.get("password");
+        passPhrase = (String) properties.get("passPhrase");
+        privateKeyPath = (String) properties.get("privateKeyPath");
+        publicKeyPath = (String) properties.get("publicKeyPath");
+        userName = (String) properties.get("userName");
+        hostName = (String) properties.get("hostName");
+        inputPath = (String) properties.get("inputPath");
+    }
+
+    public void invoke(JobExecutionContext jobExecutionContext) throws 
GFacHandlerException {
+        super.invoke(jobExecutionContext);
+        int index = 0;
+        int oldIndex = 0;
+        List<String> oldFiles = new ArrayList<String>();
+        MessageContext inputNew = new MessageContext();
+        StringBuffer data = new StringBuffer("|");
+        Cluster pbsCluster = null;
+
+        try {
+            String pluginData = GFacUtils.getHandlerData(jobExecutionContext, 
this.getClass().getName());
+            if (pluginData != null) {
+                try {
+                    oldIndex = 
Integer.parseInt(pluginData.split("\\|")[0].trim());
+                    oldFiles = 
Arrays.asList(pluginData.split("\\|")[1].split(","));
+                    if (oldIndex == oldFiles.size()) {
+                        log.info("Old data looks good !!!!");
+                    } else {
+                        oldIndex = 0;
+                        oldFiles.clear();
+                    }
+                } catch (NumberFormatException e) {
+                    log.error("Previously stored data " + pluginData + " is 
wrong so we continue the operations");
+                }
+            }
+
+            AuthenticationInfo authenticationInfo = null;
+            if (password != null) {
+                authenticationInfo = new 
DefaultPasswordAuthenticationInfo(this.password);
+            } else {
+                authenticationInfo = new 
DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
+                        this.passPhrase);
+            }
+
+            // Server info
+            String parentPath = inputPath + File.separator + 
jobExecutionContext.getExperimentID() + File.separator + 
jobExecutionContext.getTaskData().getTaskID();
+            if (index < oldIndex) {
+                parentPath = oldFiles.get(index);
+                data.append(oldFiles.get(index++)).append(","); // we get 
already transfered file and increment the index
+            } else {
+                (new File(parentPath)).mkdirs();
+                StringBuffer temp = new 
StringBuffer(data.append(parentPath).append(",").toString());
+                GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, 
++index), this.getClass().getName());
+            }
+            DataTransferDetails detail = new DataTransferDetails();
+            TransferStatus status = new TransferStatus();
+            // here doesn't matter what the job manager is because we are only 
doing some file handling
+            // not really dealing with monitoring or job submission, so we pa
+
+            MessageContext input = jobExecutionContext.getInMessageContext();
+            Set<String> parameters = input.getParameters().keySet();
+            for (String paramName : parameters) {
+                InputDataObjectType inputParamType = (InputDataObjectType) 
input.getParameters().get(paramName);
+                String paramValue = inputParamType.getValue();
+                // TODO: Review this with type
+                if (inputParamType.getType() == DataType.URI) {
+                    try {
+                        URL file = new URL(paramValue);
+                        String key = file.getUserInfo() + file.getHost() + 
DEFAULT_SSH_PORT;
+                        
GFACSSHUtils.prepareSecurityContext(jobExecutionContext, authenticationInfo, 
file.getUserInfo(), file.getHost(), DEFAULT_SSH_PORT);
+                        pbsCluster = 
((SSHSecurityContext)jobExecutionContext.getSecurityContext(key)).getPbsCluster();
+                        paramValue = file.getPath();
+                    } catch (MalformedURLException e) {
+                        String key = this.userName + this.hostName + 
DEFAULT_SSH_PORT;
+                        
GFACSSHUtils.prepareSecurityContext(jobExecutionContext, authenticationInfo, 
this.userName, this.hostName, DEFAULT_SSH_PORT);
+                        pbsCluster = 
((SSHSecurityContext)jobExecutionContext.getSecurityContext(key)).getPbsCluster();
+                        log.error(e.getLocalizedMessage(), e);
+                    }
+
+                    if (index < oldIndex) {
+                        log.info("Input File: " + paramValue + " is already 
transfered, so we skip this operation !!!");
+                        inputParamType.setValue(oldFiles.get(index));
+                        data.append(oldFiles.get(index++)).append(","); // we 
get already transfered file and increment the index
+                    } else {
+                        String stageInputFile = stageInputFiles(pbsCluster, 
paramValue, parentPath);
+                        inputParamType.setValue(stageInputFile);
+                        StringBuffer temp = new 
StringBuffer(data.append(stageInputFile).append(",").toString());
+                        status.setTransferState(TransferState.UPLOAD);
+                        detail.setTransferStatus(status);
+                        detail.setTransferDescription("Input Data Staged: " + 
stageInputFile);
+                        registry.add(ChildDataType.DATA_TRANSFER_DETAIL, 
detail, jobExecutionContext.getTaskData().getTaskID());
+
+                        GFacUtils.saveHandlerData(jobExecutionContext, 
temp.insert(0, ++index), this.getClass().getName());
+                    }
+                }
+                // FIXME: what is the thrift model DataType equivalent for 
URIArray type?
+//                else if 
("URIArray".equals(actualParameter.getType().getType().toString())) {
+//                    List<String> split = 
Arrays.asList(StringUtil.getElementsFromString(paramValue));
+//                    List<String> newFiles = new ArrayList<String>();
+//                    for (String paramValueEach : split) {
+//                        try {
+//                            URL file = new URL(paramValue);
+//                            this.userName = file.getUserInfo();
+//                            this.hostName = file.getHost();
+//                            paramValueEach = file.getPath();
+//                        } catch (MalformedURLException e) {
+//                            log.error(e.getLocalizedMessage(), e);
+//                        }
+//                        if (index < oldIndex) {
+//                            log.info("Input File: " + paramValue + " is 
already transfered, so we skip this operation !!!");
+//                            newFiles.add(oldFiles.get(index));
+//                            data.append(oldFiles.get(index++)).append(",");
+//                        } else {
+//                            String stageInputFiles = 
stageInputFiles(pbsCluster, paramValueEach, parentPath);
+//                            StringBuffer temp = new 
StringBuffer(data.append(stageInputFiles).append(",").toString());
+//                            GFacUtils.savePluginData(jobExecutionContext, 
temp.insert(0, ++index), this.getClass().getName());
+//                            newFiles.add(stageInputFiles);
+//                        }
+//                    }
+//                    ((URIArrayType) 
actualParameter.getType()).setValueArray(newFiles.toArray(new 
String[newFiles.size()]));
+//                }
+                inputNew.getParameters().put(paramName, inputParamType);
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage());
+            try {
+                StringWriter errors = new StringWriter();
+                e.printStackTrace(new PrintWriter(errors));
+                GFacUtils.saveErrorDetails(jobExecutionContext,  
errors.toString(), CorrectiveAction.CONTACT_SUPPORT, 
ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+            } catch (GFacException e1) {
+                log.error(e1.getLocalizedMessage());
+            }
+            throw new GFacHandlerException("Error while input File Staging", 
e, e.getLocalizedMessage());
+        }
+        jobExecutionContext.setInMessageContext(inputNew);
+    }
+
+    public void recover(JobExecutionContext jobExecutionContext) throws 
GFacHandlerException {
+        this.invoke(jobExecutionContext);
+    }
+
+    private String stageInputFiles(Cluster cluster, String paramValue, String 
parentPath) throws GFacException {
+        try {
+            cluster.scpFrom(paramValue, parentPath);
+            return "file://" + parentPath + File.separator + (new 
File(paramValue)).getName();
+        } catch (SSHApiException e) {
+            log.error("Error tranfering remote file to local file, remote 
path: " + paramValue);
+            throw new GFacException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
new file mode 100644
index 0000000..320f236
--- /dev/null
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
@@ -0,0 +1,225 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.ssh.handler;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.AbstractHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.apache.airavata.gfac.ssh.api.SSHApiException;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+import 
org.apache.airavata.gfac.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
+import 
org.apache.airavata.gfac.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
+import org.apache.airavata.model.appcatalog.appinterface.DataType;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.*;
+
+/**
+ * This handler will copy outputs from airavata installed local directory
+ * to a remote location, prior to this handler SCPOutputHandler should be 
invoked
+ * Should add following configuration to gfac-config.xml and configure the 
keys properly
+ * <Handler class="AdvancedSCPOutputHandler">
+                            <property name="privateKeyPath" 
value="/Users/lahirugunathilake/.ssh/id_dsa"/>
+                            <property name="publicKeyPath" 
value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
+                        <property name="userName" value="airavata"/>
+                        <property name="hostName" value="gw98.iu.xsede.org"/>
+                        <property name="outputPath" 
value="/home/airavata/outputData"/>
+                        <property name="passPhrase" 
value="/home/airavata/outputData"/>
+                        <property name="password" 
value="/home/airavata/outputData"/>
+
+ */
+public class AdvancedSCPOutputHandler extends AbstractHandler {
+    private static final Logger log = 
LoggerFactory.getLogger(AdvancedSCPOutputHandler.class);
+
+    public static final int DEFAULT_SSH_PORT = 22;
+
+    private String password = null;
+
+    private String publicKeyPath;
+
+    private String passPhrase;
+
+    private String privateKeyPath;
+
+    private String userName;
+
+    private String hostName;
+
+    private String outputPath;
+
+
+    public void initProperties(Properties properties) throws 
GFacHandlerException {
+        password = (String)properties.get("password");
+        passPhrase = (String)properties.get("passPhrase");
+        privateKeyPath = (String)properties.get("privateKeyPath");
+        publicKeyPath = (String)properties.get("publicKeyPath");
+        userName = (String)properties.get("userName");
+        hostName = (String)properties.get("hostName");
+        outputPath = (String)properties.get("outputPath");
+    }
+
+    @Override
+    public void invoke(JobExecutionContext jobExecutionContext) throws 
GFacHandlerException {
+       Cluster pbsCluster = null;
+        AuthenticationInfo authenticationInfo = null;
+        if (password != null) {
+            authenticationInfo = new 
DefaultPasswordAuthenticationInfo(this.password);
+        } else {
+            authenticationInfo = new 
DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
+                    this.passPhrase);
+        }
+        try {
+            String hostName = jobExecutionContext.getHostName();
+            if (jobExecutionContext.getSecurityContext(hostName) == null) {
+                try {
+                    GFACSSHUtils.addSecurityContext(jobExecutionContext);
+                } catch (ApplicationSettingsException e) {
+                    log.error(e.getMessage());
+                    try {
+                        StringWriter errors = new StringWriter();
+                        e.printStackTrace(new PrintWriter(errors));
+                                       
GFacUtils.saveErrorDetails(jobExecutionContext,  errors.toString(), 
CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                               } catch (GFacException e1) {
+                                        log.error(e1.getLocalizedMessage());
+                               }
+                    throw new GFacHandlerException("Error while creating 
SSHSecurityContext", e, e.getLocalizedMessage());
+                }
+            }
+            String standardError = jobExecutionContext.getStandardError();
+            String standardOutput = jobExecutionContext.getStandardOutput();
+            super.invoke(jobExecutionContext);
+            // Server info
+            
if(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() != null && 
jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().getOutputDataDir()
 != null){
+                try{
+                    URL outputPathURL = new 
URL(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().getOutputDataDir());
+                    this.userName = outputPathURL.getUserInfo();
+                    this.hostName = outputPathURL.getHost();
+                    outputPath = outputPathURL.getPath();
+                } catch (MalformedURLException e) {
+                    log.error(e.getLocalizedMessage(),e);
+                }
+            }
+            String key = 
GFACSSHUtils.prepareSecurityContext(jobExecutionContext, authenticationInfo, 
this.userName, this.hostName, DEFAULT_SSH_PORT);
+            pbsCluster = 
((SSHSecurityContext)jobExecutionContext.getSecurityContext(key)).getPbsCluster();
+            
if(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() != null && 
!jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().isPersistOutputData()){
+            outputPath = outputPath + File.separator + 
jobExecutionContext.getExperimentID() + "-" + 
jobExecutionContext.getTaskData().getTaskID()
+                    + File.separator;
+                pbsCluster.makeDirectory(outputPath);
+            }
+            pbsCluster.scpTo(outputPath, standardError);
+            pbsCluster.scpTo(outputPath, standardOutput);
+            List<OutputDataObjectType> outputArray = new 
ArrayList<OutputDataObjectType>();
+            Map<String, Object> output = 
jobExecutionContext.getOutMessageContext().getParameters();
+            Set<String> keys = output.keySet();
+            for (String paramName : keys) {
+                OutputDataObjectType outputDataObjectType = 
(OutputDataObjectType) output.get(paramName);
+                if (outputDataObjectType.getType() == DataType.URI) {
+                    // for failed jobs outputs are not generated. So we should 
not download outputs
+                    if (GFacUtils.isFailedJob(jobExecutionContext)){
+                        continue;
+                    }
+                       String downloadFile = outputDataObjectType.getValue();
+                    if(downloadFile == null || !(new 
File(downloadFile).isFile())){
+                        GFacUtils.saveErrorDetails(jobExecutionContext, "Empty 
Output returned from the application", CorrectiveAction.CONTACT_SUPPORT, 
ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                               throw new GFacHandlerException("Empty Output 
returned from the application.." );
+                       }
+                       pbsCluster.scpTo(outputPath, downloadFile);
+                    String fileName = 
downloadFile.substring(downloadFile.lastIndexOf(File.separatorChar)+1, 
downloadFile.length());
+                    OutputDataObjectType dataObjectType = new 
OutputDataObjectType();
+                    dataObjectType.setValue(outputPath + File.separatorChar + 
fileName);
+                    dataObjectType.setName(paramName);
+                    dataObjectType.setType(DataType.URI);
+                    
dataObjectType.setIsRequired(outputDataObjectType.isIsRequired());
+                    
dataObjectType.setRequiredToAddedToCommandLine(outputDataObjectType.isRequiredToAddedToCommandLine());
+                    
dataObjectType.setApplicationArgument(outputDataObjectType.getApplicationArgument());
+                    
dataObjectType.setSearchQuery(outputDataObjectType.getSearchQuery());
+                    outputArray.add(dataObjectType);
+                }else if (outputDataObjectType.getType() == DataType.STDOUT) {
+                    pbsCluster.scpTo(outputPath, standardOutput);
+                    String fileName = 
standardOutput.substring(standardOutput.lastIndexOf(File.separatorChar)+1, 
standardOutput.length());
+                    OutputDataObjectType dataObjectType = new 
OutputDataObjectType();
+                    dataObjectType.setValue(outputPath + File.separatorChar + 
fileName);
+                    dataObjectType.setName(paramName);
+                    dataObjectType.setType(DataType.STDOUT);
+                    
dataObjectType.setIsRequired(outputDataObjectType.isIsRequired());
+                    
dataObjectType.setRequiredToAddedToCommandLine(outputDataObjectType.isRequiredToAddedToCommandLine());
+                    
dataObjectType.setApplicationArgument(outputDataObjectType.getApplicationArgument());
+                    
dataObjectType.setSearchQuery(outputDataObjectType.getSearchQuery());
+                    outputArray.add(dataObjectType);
+                }else if (outputDataObjectType.getType() == DataType.STDERR) {
+                    pbsCluster.scpTo(outputPath, standardError);
+                    String fileName = 
standardError.substring(standardError.lastIndexOf(File.separatorChar)+1, 
standardError.length());
+                    OutputDataObjectType dataObjectType = new 
OutputDataObjectType();
+                    dataObjectType.setValue(outputPath + File.separatorChar + 
fileName);
+                    dataObjectType.setName(paramName);
+                    dataObjectType.setType(DataType.STDERR);
+                    
dataObjectType.setIsRequired(outputDataObjectType.isIsRequired());
+                    
dataObjectType.setRequiredToAddedToCommandLine(outputDataObjectType.isRequiredToAddedToCommandLine());
+                    
dataObjectType.setApplicationArgument(outputDataObjectType.getApplicationArgument());
+                    
dataObjectType.setSearchQuery(outputDataObjectType.getSearchQuery());
+                    outputArray.add(dataObjectType);
+                }
+             }
+           registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, 
jobExecutionContext.getExperimentID());
+        } catch (SSHApiException e) {
+            try {
+                StringWriter errors = new StringWriter();
+                e.printStackTrace(new PrintWriter(errors));
+                               GFacUtils.saveErrorDetails(jobExecutionContext, 
 errors.toString(), CorrectiveAction.CONTACT_SUPPORT, 
ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                       } catch (GFacException e1) {
+                                log.error(e1.getLocalizedMessage());
+                       }
+            log.error("Error transfering files to remote host : " + hostName + 
" with the user: " + userName);
+            log.error(e.getMessage());
+            throw new GFacHandlerException(e);
+        } catch (Exception e) {
+                try {
+                               GFacUtils.saveErrorDetails(jobExecutionContext, 
 e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, 
ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                       } catch (GFacException e1) {
+                                log.error(e1.getLocalizedMessage());
+                       }
+               throw new GFacHandlerException(e);
+        }
+    }
+
+    @Override
+    public void recover(JobExecutionContext jobExecutionContext) throws 
GFacHandlerException {
+        // TODO: Auto generated method body.
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/NewSSHOutputHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/NewSSHOutputHandler.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/NewSSHOutputHandler.java
new file mode 100644
index 0000000..61a1805
--- /dev/null
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/NewSSHOutputHandler.java
@@ -0,0 +1,78 @@
+package org.apache.airavata.gfac.ssh.handler;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.AbstractHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.provider.GFacProviderException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
+import org.apache.airavata.gfac.ssh.util.HandleOutputs;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NewSSHOutputHandler extends AbstractHandler{
+
+        private static final Logger log = 
LoggerFactory.getLogger(NewSSHOutputHandler.class);
+
+           public void invoke(JobExecutionContext jobExecutionContext) throws 
GFacHandlerException {
+               String hostAddress = jobExecutionContext.getHostName();
+               Cluster cluster = null;
+               // Security Context and connection
+               try {
+                   if (jobExecutionContext.getSecurityContext(hostAddress) == 
null) {
+                       GFACSSHUtils.addSecurityContext(jobExecutionContext);
+                   }
+                   cluster = ((SSHSecurityContext) 
jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster();
+                   if (cluster == null) {
+                       throw new GFacProviderException("Security context is 
not set properly");
+                   } else {
+                       log.info("Successfully retrieved the Security Context");
+                   }
+               } catch (Exception e) {
+                   log.error(e.getMessage());
+                   try {
+                    StringWriter errors = new StringWriter();
+                    e.printStackTrace(new PrintWriter(errors));
+                       GFacUtils.saveErrorDetails(jobExecutionContext,  
errors.toString(), CorrectiveAction.CONTACT_SUPPORT, 
ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                   } catch (GFacException e1) {
+                       log.error(e1.getLocalizedMessage());
+                   }
+                   throw new GFacHandlerException("Error while creating 
SSHSecurityContext", e, e.getLocalizedMessage());
+               }
+
+               super.invoke(jobExecutionContext);
+               List<OutputDataObjectType> outputArray =  
HandleOutputs.handleOutputs(jobExecutionContext, cluster);
+               try {
+                               registry.add(ChildDataType.EXPERIMENT_OUTPUT, 
outputArray, jobExecutionContext.getExperimentID());
+                       } catch (RegistryException e) {
+                               throw new GFacHandlerException(e);
+                       }
+
+              
+           }
+
+    @Override
+    public void recover(JobExecutionContext jobExecutionContext) throws 
GFacHandlerException {
+        // TODO: Auto generated method body.
+    }
+
+    @Override
+       public void initProperties(Properties properties) throws 
GFacHandlerException {
+               // TODO Auto-generated method stub
+               
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java
new file mode 100644
index 0000000..fb86dd3
--- /dev/null
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java
@@ -0,0 +1,119 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.ssh.handler;
+
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.AbstractHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Properties;
+
+/**
+ * Handler that makes sure a security context exists for the job's host and then creates
+ * the job's working, input and output directories through the {@link Cluster} obtained
+ * from that context.
+ */
+public class SSHDirectorySetupHandler extends AbstractHandler {
+    private static final Logger log = LoggerFactory.getLogger(SSHDirectorySetupHandler.class);
+
+    /**
+     * Ensures a security context is registered for the job's host, runs the parent handler
+     * logic and then creates the job directories.
+     *
+     * @param jobExecutionContext context of the job being set up
+     * @throws GFacHandlerException if the security context cannot be created or the
+     *                              directories cannot be set up
+     */
+    @Override
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        try {
+            String hostAddress = jobExecutionContext.getHostName();
+            if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
+                GFACSSHUtils.addSecurityContext(jobExecutionContext);
+            }
+        } catch (Exception e) {
+            // Pass the throwable to the logger so the stack trace is kept.
+            log.error(e.getMessage(), e);
+            try {
+                StringWriter errors = new StringWriter();
+                e.printStackTrace(new PrintWriter(errors));
+                GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(),
+                        CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+            } catch (GFacException e1) {
+                // Best effort: failing to record the error must not hide the original failure.
+                log.error(e1.getLocalizedMessage(), e1);
+            }
+            throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+        }
+
+        log.info("Setup SSH job directorties");
+        super.invoke(jobExecutionContext);
+        makeDirectory(jobExecutionContext);
+    }
+
+    @Override
+    public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        // TODO: not implemented - recovery is currently a no-op for this handler.
+    }
+
+    /**
+     * Creates the working directory and, when they differ from it, the input and output
+     * directories, then records a DIRECTORY_SETUP transfer detail for the task. On any
+     * failure a FAILED transfer detail and the error details are recorded instead.
+     *
+     * @param jobExecutionContext context supplying the host, directories and task id
+     * @throws GFacHandlerException if directory creation fails or the status cannot be persisted
+     */
+    private void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        try {
+            String hostAddress = jobExecutionContext.getHostName();
+            Cluster cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster();
+            if (cluster == null) {
+                throw new GFacHandlerException("Security context is not set properly");
+            }
+            log.info("Successfully retrieved the Security Context");
+
+            String workingDirectory = jobExecutionContext.getWorkingDir();
+            cluster.makeDirectory(workingDirectory);
+            if (!jobExecutionContext.getInputDir().equals(workingDirectory)) {
+                cluster.makeDirectory(jobExecutionContext.getInputDir());
+            }
+            if (!jobExecutionContext.getOutputDir().equals(workingDirectory)) {
+                cluster.makeDirectory(jobExecutionContext.getOutputDir());
+            }
+
+            DataTransferDetails detail = new DataTransferDetails();
+            TransferStatus status = new TransferStatus();
+            status.setTransferState(TransferState.DIRECTORY_SETUP);
+            detail.setTransferStatus(status);
+            detail.setTransferDescription("Working directory = " + workingDirectory);
+
+            registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+        } catch (Exception e) {
+            // Log the root failure first: if persisting the FAILED status below throws as
+            // well, the exception raised there would otherwise hide it completely.
+            log.error("Error while setting up SSH job directories", e);
+
+            DataTransferDetails detail = new DataTransferDetails();
+            TransferStatus status = new TransferStatus();
+            status.setTransferState(TransferState.FAILED);
+            detail.setTransferStatus(status);
+            detail.setTransferDescription("Working directory = " + jobExecutionContext.getWorkingDir());
+            try {
+                registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+                StringWriter errors = new StringWriter();
+                e.printStackTrace(new PrintWriter(errors));
+                GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(),
+                        CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
+            } catch (Exception e1) {
+                throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+            }
+            throw new GFacHandlerException("Error executing the Handler: " + SSHDirectorySetupHandler.class, e);
+        }
+    }
+
+    @Override
+    public void initProperties(Properties properties) throws GFacHandlerException {
+        // This handler takes no configuration properties.
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java
new file mode 100644
index 0000000..277ff0e
--- /dev/null
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java
@@ -0,0 +1,198 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.ssh.handler;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.context.MessageContext;
+import org.apache.airavata.gfac.core.handler.AbstractHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.apache.airavata.model.appcatalog.appinterface.DataType;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Handler that stages URI-typed job inputs into the job's input directory through the
+ * {@link Cluster} taken from the host's security context, and rewrites the input
+ * parameters to point at the staged files.
+ */
+public class SSHInputHandler extends AbstractHandler {
+
+    private static final Logger log = LoggerFactory.getLogger(SSHInputHandler.class);
+
+    /** Maximum number of scpTo attempts made for a single input file. */
+    private static final int MAX_STAGE_ATTEMPTS = 3;
+
+    /** Pause between two scpTo attempts, in milliseconds. */
+    private static final long STAGE_RETRY_DELAY_MS = 2000L;
+
+    /**
+     * Stages every URI input of the job, records an UPLOAD transfer detail and the handler
+     * data after each staged file, and finally replaces the job's in-message context with
+     * one holding the updated parameters.
+     *
+     * Resuming from previously staged files is not implemented: the former
+     * "already transferred" branch could never run because its counter was never set.
+     *
+     * @param jobExecutionContext context of the job whose inputs are staged
+     * @throws GFacHandlerException if the security context cannot be created, staging fails,
+     *                              or the failure status cannot be persisted
+     */
+    @Override
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        DataTransferDetails detail = new DataTransferDetails();
+        detail.setTransferDescription("Input Data Staging");
+        TransferStatus status = new TransferStatus();
+        int index = 0;
+        StringBuffer data = new StringBuffer("|");
+        MessageContext inputNew = new MessageContext();
+
+        try {
+            String hostAddress = jobExecutionContext.getHostName();
+            if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
+                try {
+                    GFACSSHUtils.addSecurityContext(jobExecutionContext);
+                } catch (ApplicationSettingsException e) {
+                    log.error(e.getMessage(), e);
+                    try {
+                        StringWriter errors = new StringWriter();
+                        e.printStackTrace(new PrintWriter(errors));
+                        GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(),
+                                CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                    } catch (GFacException e1) {
+                        log.error(e1.getLocalizedMessage(), e1);
+                    }
+                    throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+                }
+            }
+
+            Cluster cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster();
+            if (cluster == null) {
+                throw new GFacException("Security context is not set properly");
+            }
+            log.info("Successfully retrieved the Security Context");
+            log.info("Invoking SCPInputHandler");
+            super.invoke(jobExecutionContext);
+
+            MessageContext input = jobExecutionContext.getInMessageContext();
+            Set<String> parameters = input.getParameters().keySet();
+            for (String paramName : parameters) {
+                InputDataObjectType inputParamType = (InputDataObjectType) input.getParameters().get(paramName);
+                String paramValue = inputParamType.getValue();
+                //TODO: Review this with type
+                if (inputParamType.getType() == DataType.URI) {
+                    String stageInputFile = stageInputFiles(cluster, jobExecutionContext, paramValue);
+                    inputParamType.setValue(stageInputFile);
+                    data.append(stageInputFile).append(",");
+
+                    status.setTransferState(TransferState.UPLOAD);
+                    detail.setTransferStatus(status);
+                    detail.setTransferDescription("Input Data Staged: " + stageInputFile);
+                    registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+
+                    // Saved handler data = number of files staged so far, followed by the
+                    // "|"-prefixed, comma-terminated list of staged paths.
+                    StringBuffer handlerData = new StringBuffer(data.toString()).insert(0, ++index);
+                    GFacUtils.saveHandlerData(jobExecutionContext, handlerData, this.getClass().getName());
+                }
+                // FIXME: what is the thrift model DataType equivalent for URIArray type?
+                // Inputs holding a list of URIs are not staged file by file here.
+                inputNew.getParameters().put(paramName, inputParamType);
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+            status.setTransferState(TransferState.FAILED);
+            detail.setTransferStatus(status);
+            try {
+                StringWriter errors = new StringWriter();
+                e.printStackTrace(new PrintWriter(errors));
+                GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(),
+                        CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
+                registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+            } catch (Exception e1) {
+                throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+            }
+            throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
+        }
+        jobExecutionContext.setInMessageContext(inputNew);
+    }
+
+    @Override
+    public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        // TODO: not implemented - recovery is currently a no-op for this handler.
+    }
+
+    /**
+     * Copies one input into the job's input directory and returns the target path.
+     * Values starting with "scp:" are handed to {@code scpThirdParty}; everything else is
+     * sent with {@code scpTo}, retried up to {@link #MAX_STAGE_ATTEMPTS} times.
+     *
+     * @param cluster             cluster used to perform the copy
+     * @param jobExecutionContext context supplying the input directory
+     * @param paramValue          source location; an optional "scp:" or "file..." prefix up
+     *                            to the first ':' is stripped before copying
+     * @return the target path, i.e. input directory + separator + source file name
+     * @throws GFacException if the copy fails or the thread is interrupted while waiting
+     */
+    private static String stageInputFiles(Cluster cluster, JobExecutionContext jobExecutionContext, String paramValue) throws IOException, GFacException {
+        String fileName = paramValue.substring(paramValue.lastIndexOf(File.separator) + 1);
+        try {
+            String targetFile = jobExecutionContext.getInputDir() + File.separator + fileName;
+            if (paramValue.startsWith("scp:")) {
+                cluster.scpThirdParty(paramValue.substring(paramValue.indexOf(":") + 1), targetFile);
+                return targetFile;
+            }
+
+            String sourcePath = paramValue;
+            if (sourcePath.startsWith("file")) {
+                sourcePath = sourcePath.substring(sourcePath.indexOf(":") + 1);
+            }
+            for (int attempt = 1; ; attempt++) {
+                try {
+                    cluster.scpTo(targetFile, sourcePath);
+                    return targetFile;
+                } catch (Exception e) {
+                    log.info(e.getLocalizedMessage());
+                    if (attempt >= MAX_STAGE_ATTEMPTS) {
+                        // Give up immediately; there is no point in sleeping before failing.
+                        throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
+                    }
+                    Thread.sleep(STAGE_RETRY_DELAY_MS);
+                }
+            }
+        } catch (GFacHandlerException e) {
+            // Already carries the staging message; do not wrap it a second time.
+            throw e;
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so callers can observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
+        } catch (Exception e) {
+            throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
+        }
+    }
+
+    @Override
+    public void initProperties(Properties properties) throws GFacHandlerException {
+        // This handler takes no configuration properties.
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
new file mode 100644
index 0000000..7c5538a
--- /dev/null
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
@@ -0,0 +1,256 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.ssh.handler;
+
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.AbstractHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.provider.GFacProviderException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.impl.OutputUtils;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.apache.airavata.model.appcatalog.appinterface.DataType;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TransferState;
+import org.apache.airavata.model.workspace.experiment.TransferStatus;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Handler that copies the job's standard output, standard error and output files from the
+ * cluster into a local output directory and registers the resulting experiment outputs.
+ */
+public class SSHOutputHandler extends AbstractHandler {
+    private static final Logger log = LoggerFactory.getLogger(SSHOutputHandler.class);
+
+    /**
+     * Downloads stdout/stderr and the job outputs, records the transfer details and stores
+     * the collected outputs under the experiment.
+     *
+     * @param jobExecutionContext context of the finished job
+     * @throws GFacHandlerException if the security context cannot be created, retrieving the
+     *                              results fails, or the failure status cannot be persisted
+     */
+    @Override
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        String hostAddress = jobExecutionContext.getHostName();
+        try {
+            if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
+                GFACSSHUtils.addSecurityContext(jobExecutionContext);
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+            try {
+                StringWriter errors = new StringWriter();
+                e.printStackTrace(new PrintWriter(errors));
+                GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(),
+                        CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+            } catch (GFacException e1) {
+                log.error(e1.getLocalizedMessage(), e1);
+            }
+            throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+        }
+
+        super.invoke(jobExecutionContext);
+        DataTransferDetails detail = new DataTransferDetails();
+        detail.setTransferDescription("Output data staging");
+        TransferStatus status = new TransferStatus();
+
+        try {
+            Cluster cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster();
+            if (cluster == null) {
+                throw new GFacProviderException("Security context is not set properly");
+            }
+            log.info("Successfully retrieved the Security Context");
+
+            // Get the Stdouts and StdErrs
+            String timeStampedExperimentID = GFacUtils.createUniqueNameWithDate(jobExecutionContext.getExperimentID());
+
+            String outputDataDir = ServerSettings.getSetting(Constants.OUTPUT_DATA_DIR, File.separator + "tmp");
+            //FIXME: AdvancedOutput is remote location and third party transfer should work to make this work
+            // (the task's AdvancedOutputDataHandling output directory is therefore not used here).
+            if (outputDataDir == null) {
+                outputDataDir = File.separator + "tmp";
+            }
+            outputDataDir = outputDataDir + File.separator + jobExecutionContext.getExperimentID()
+                    + "-" + jobExecutionContext.getTaskData().getTaskID();
+            (new File(outputDataDir)).mkdirs();
+
+            File localStdOutFile = new File(outputDataDir + File.separator + timeStampedExperimentID + "stdout");
+            File localStdErrFile = new File(outputDataDir + File.separator + timeStampedExperimentID + "stderr");
+
+            // Try up to three times to fetch a non-empty stdout; pause only after a failed copy/read.
+            int i = 0;
+            String stdOutStr = "";
+            while (stdOutStr.isEmpty()) {
+                try {
+                    cluster.scpFrom(jobExecutionContext.getStandardOutput(), localStdOutFile.getAbsolutePath());
+                    stdOutStr = GFacUtils.readFileToString(localStdOutFile.getAbsolutePath());
+                } catch (Exception e) {
+                    log.error(e.getLocalizedMessage());
+                    Thread.sleep(2000);
+                }
+                i++;
+                if (i == 3) break;
+            }
+            Thread.sleep(1000);
+            cluster.scpFrom(jobExecutionContext.getStandardError(), localStdErrFile.getAbsolutePath());
+            Thread.sleep(1000);
+
+            String stdErrStr = GFacUtils.readFileToString(localStdErrFile.getAbsolutePath());
+            status.setTransferState(TransferState.STDOUT_DOWNLOAD);
+            detail.setTransferStatus(status);
+            detail.setTransferDescription("STDOUT:" + localStdOutFile.getAbsolutePath());
+            registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+
+            status.setTransferState(TransferState.STDERROR_DOWNLOAD);
+            detail.setTransferStatus(status);
+            detail.setTransferDescription("STDERR:" + localStdErrFile.getAbsolutePath());
+            registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+
+            List<OutputDataObjectType> outputArray = new ArrayList<OutputDataObjectType>();
+            Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
+            Set<String> keys = output.keySet();
+            for (String paramName : keys) {
+                OutputDataObjectType actualParameter = (OutputDataObjectType) output.get(paramName);
+                if (DataType.URI != actualParameter.getType()) {
+                    OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray);
+                    continue;
+                }
+
+                // List the job's output directory, retrying up to three times while it is empty.
+                List<String> outputList = null;
+                int retry = 3;
+                while (retry > 0) {
+                    outputList = cluster.listDirectory(jobExecutionContext.getOutputDir());
+                    if (outputList.size() > 0) {
+                        break;
+                    }
+                    retry--;
+                    Thread.sleep(2000);
+                }
+
+                if (outputList.size() == 1 && !outputList.get(0).isEmpty()) {//FIXME: Ultrascan case
+                    // Exactly one file in the output directory: treat it as this parameter's value.
+                    String valueList = outputList.get(0);
+                    cluster.scpFrom(jobExecutionContext.getOutputDir() + File.separator + valueList, outputDataDir);
+                    String outputPath = outputDataDir + File.separator + valueList;
+                    jobExecutionContext.addOutputFile(outputPath);
+                    actualParameter.setValue(outputPath);
+                    outputArray.add(newOutput(paramName, outputPath, DataType.URI));
+                    continue;
+                }
+
+                // Zero or several files: fill the output values from stdout, then rebuild
+                // the output list from every declared parameter and stop processing.
+                OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray);
+                outputArray.clear();
+                for (String key : output.keySet()) {
+                    OutputDataObjectType actualParameter1 = (OutputDataObjectType) output.get(key);
+                    // Each branch must inspect the parameter being iterated (actualParameter1).
+                    // Testing the outer parameter, which is always URI here, made the
+                    // STDOUT/STDERR branches unreachable.
+                    if (DataType.URI == actualParameter1.getType()) {
+                        String downloadFile = actualParameter1.getValue();
+                        cluster.scpFrom(downloadFile, outputDataDir);
+                        String fileName = downloadFile.substring(downloadFile.lastIndexOf(File.separatorChar) + 1);
+                        String localFile = outputDataDir + File.separator + fileName;
+                        jobExecutionContext.addOutputFile(localFile);
+                        actualParameter1.setValue(localFile);
+                        outputArray.add(newOutput(key, localFile, DataType.URI));
+                    } else if (DataType.STDOUT == actualParameter1.getType()) {
+                        String localFile = outputDataDir + File.separator + localStdOutFile.getName();
+                        jobExecutionContext.addOutputFile(localFile);
+                        actualParameter1.setValue(localFile);
+                        outputArray.add(newOutput(key, localFile, DataType.STDOUT));
+                    } else if (DataType.STDERR == actualParameter1.getType()) {
+                        String localFile = outputDataDir + File.separator + localStdErrFile.getName();
+                        jobExecutionContext.addOutputFile(localFile);
+                        actualParameter1.setValue(localFile);
+                        outputArray.add(newOutput(key, localFile, DataType.STDERR));
+                    }
+                }
+                break;
+            }
+
+            if (outputArray.isEmpty()) {
+                log.error("Empty Output returned from the Application, Double check the application and ApplicationDescriptor output Parameter Names");
+                if (jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() == null) {
+                    throw new GFacHandlerException(
+                            "Empty Output returned from the Application, Double check the application"
+                                    + " and ApplicationDescriptor output Parameter Names");
+                }
+            }
+            jobExecutionContext.setStandardError(localStdErrFile.getAbsolutePath());
+            jobExecutionContext.setStandardOutput(localStdOutFile.getAbsolutePath());
+            jobExecutionContext.setOutputDir(outputDataDir);
+            status.setTransferState(TransferState.DOWNLOAD);
+            detail.setTransferStatus(status);
+            detail.setTransferDescription(outputDataDir);
+            registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+            registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID());
+
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // Restore the interrupt flag swallowed by the broad catch.
+                Thread.currentThread().interrupt();
+            }
+            // Log the root failure first: if persisting the FAILED status below throws as
+            // well, the exception raised there would otherwise hide it completely.
+            log.error("Error in retrieving results", e);
+            try {
+                status.setTransferState(TransferState.FAILED);
+                detail.setTransferStatus(status);
+                registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+                StringWriter errors = new StringWriter();
+                e.printStackTrace(new PrintWriter(errors));
+                GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(),
+                        CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
+            } catch (Exception e1) {
+                throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+            }
+            throw new GFacHandlerException("Error in retrieving results", e);
+        }
+    }
+
+    /** Builds an output object carrying the given name, value and type. */
+    private static OutputDataObjectType newOutput(String name, String value, DataType type) {
+        OutputDataObjectType dataObjectType = new OutputDataObjectType();
+        dataObjectType.setValue(value);
+        dataObjectType.setName(name);
+        dataObjectType.setType(type);
+        return dataObjectType;
+    }
+
+    @Override
+    public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        // TODO: not implemented - recovery is currently a no-op for this handler.
+    }
+
+    @Override
+    public void initProperties(Properties properties) throws GFacHandlerException {
+        // This handler takes no configuration properties.
+    }
+}

Reply via email to