Added Local batch job submission support and renamed task implementtions to 
have generic names


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/dc1be312
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/dc1be312
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/dc1be312

Branch: refs/heads/master
Commit: dc1be3126409992f6de94d76e0fc4834540e9e9c
Parents: c62f74a
Author: Shameera Rathnayaka <[email protected]>
Authored: Mon Nov 9 15:25:52 2015 -0500
Committer: Shameera Rathnayaka <[email protected]>
Committed: Mon Nov 9 15:25:52 2015 -0500

----------------------------------------------------------------------
 .../server/src/main/resources/gfac-config.yaml  |  11 +-
 .../org/apache/airavata/gfac/impl/Factory.java  |  16 +-
 .../airavata/gfac/impl/GFacEngineImpl.java      |   4 +-
 .../airavata/gfac/impl/LocalCommandOutput.java  |   2 +-
 .../impl/task/AdvancedSCPDataStageTask.java     | 344 -----------------
 .../airavata/gfac/impl/task/DataStageTask.java  | 126 +++++++
 .../impl/task/DefaultJobSubmissionTask.java     | 286 +++++++++++++++
 .../gfac/impl/task/EnvironmentSetupTask.java    |  74 ++++
 .../gfac/impl/task/ForkJobSubmissionTask.java   | 184 ++++++++++
 .../gfac/impl/task/SCPDataStageTask.java        | 365 +++++++++++++++----
 .../gfac/impl/task/SSHEnvironmentSetupTask.java |  74 ----
 .../impl/task/SSHForkJobSubmissionTask.java     | 184 ----------
 .../gfac/impl/task/SSHJobSubmissionTask.java    | 286 ---------------
 13 files changed, 980 insertions(+), 976 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/configuration/server/src/main/resources/gfac-config.yaml
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/gfac-config.yaml 
b/modules/configuration/server/src/main/resources/gfac-config.yaml
index 8dafe09..dff4062 100644
--- a/modules/configuration/server/src/main/resources/gfac-config.yaml
+++ b/modules/configuration/server/src/main/resources/gfac-config.yaml
@@ -20,7 +20,7 @@
 
 jobSubmitters:
   - submissionProtocol: SSH
-    taskClass: org.apache.airavata.gfac.impl.task.SSHJobSubmissionTask
+    taskClass: org.apache.airavata.gfac.impl.task.DefaultJobSubmissionTask
 #   properties:
 #     - userName: airavata
 #       passPhrase: airavata
@@ -29,10 +29,11 @@ jobSubmitters:
 #       hostName: remote.client.hostName
 
   - submissionProtocol: SSH_FORK
-    taskClass: org.apache.airavata.gfac.impl.task.SSHForkJobSubmissionTask
+    taskClass: org.apache.airavata.gfac.impl.task.ForkJobSubmissionTask
 
   - submissionProtocol: LOCAL
-    taskClass: org.apache.airavata.gfac.impl.task.LocalJobSubmissionTask
+    taskClass: org.apache.airavata.gfac.impl.task.DefaultJobSubmissionTask
+
 
 # Following job subbmitters are not yet implemented.
 
@@ -47,14 +48,14 @@ commonTasks:
 
 fileTransferTasks:
   - transferProtocol: SCP
-    taskClass: org.apache.airavata.gfac.impl.task.SCPDataStageTask
+    taskClass: org.apache.airavata.gfac.impl.task.DataStageTask
 
 # If your client doen't run the same instance where airavata server is running 
then you need to comment above
 # SCPDataStageTask and uncomment AdvancedSCPDataStageTask. To work with 
AdvancedSCPDataStageTask, you either need to
 # provide ssh keys or password.
 
 #  - transferProtocol: SCP
-#    taskClass: org.apache.airavata.gfac.impl.task.AdvancedSCPDataStageTask
+#    taskClass: org.apache.airavata.gfac.impl.task.SCPDataStageTask
 #    properties:
 #     - userName: airavata
 #       passPhrase: airavata

http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index e2966c5..4dc63e6 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -207,11 +207,17 @@ public abstract class Factory {
         String computeResourceId = processContext.getComputeResourceId();
         String key = processContext.getJobSubmissionProtocol().toString() + 
":" + computeResourceId;
                RemoteCluster remoteCluster = remoteClusterMap.get(key);
-               if (remoteCluster == null) {
-                       JobManagerConfiguration jobManagerConfiguration = 
getJobManagerConfiguration(processContext.getResourceJobManager());
-                       AuthenticationInfo authenticationInfo = 
getSSHKeyAuthentication();
-                       remoteCluster = new 
HPCRemoteCluster(processContext.getServerInfo(), jobManagerConfiguration, 
authenticationInfo);
-                       remoteClusterMap.put(key, remoteCluster);
+               JobSubmissionProtocol jobSubmissionProtocol = 
processContext.getJobSubmissionProtocol();
+        if (remoteCluster == null) {
+            JobManagerConfiguration jobManagerConfiguration = 
getJobManagerConfiguration(processContext.getResourceJobManager());
+            if (jobSubmissionProtocol == JobSubmissionProtocol.LOCAL) {
+                remoteCluster = new 
LocalRemoteCluster(processContext.getServerInfo(), jobManagerConfiguration, 
null);
+            } else if (jobSubmissionProtocol == JobSubmissionProtocol.SSH ||
+                    jobSubmissionProtocol == JobSubmissionProtocol.SSH_FORK) {
+                AuthenticationInfo authenticationInfo = 
getSSHKeyAuthentication();
+                remoteCluster = new 
HPCRemoteCluster(processContext.getServerInfo(), jobManagerConfiguration, 
authenticationInfo);
+            }
+            remoteClusterMap.put(key, remoteCluster);
                }
                return remoteCluster;
        }

http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 6444eb4..d386f66 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -35,7 +35,7 @@ import org.apache.airavata.gfac.core.monitor.JobMonitor;
 import org.apache.airavata.gfac.core.task.JobSubmissionTask;
 import org.apache.airavata.gfac.core.task.Task;
 import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.task.SSHEnvironmentSetupTask;
+import org.apache.airavata.gfac.impl.task.EnvironmentSetupTask;
 import 
org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
 import org.apache.airavata.model.appcatalog.computeresource.*;
 import 
org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
@@ -356,7 +356,7 @@ public class GFacEngineImpl implements GFacEngine {
             EnvironmentSetupTaskModel subTaskModel = 
(EnvironmentSetupTaskModel) taskContext.getSubTaskModel();
             Task envSetupTask = null;
             if (subTaskModel.getProtocol() == SecurityProtocol.SSH_KEYS) {
-                envSetupTask = new SSHEnvironmentSetupTask();
+                envSetupTask = new EnvironmentSetupTask();
             } else {
                 throw new GFacException("Unsupported security protocol, 
Airavata doesn't support " +
                         subTaskModel.getProtocol().name() + " protocol yet.");

http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java
index e9d683d..4d98423 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java
@@ -55,6 +55,6 @@ public class LocalCommandOutput implements CommandOutput {
 
     @Override
     public int getExitCode() {
-        return 0;
+        return process.exitValue();
     }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
deleted file mode 100644
index 6c4e14e..0000000
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.airavata.gfac.impl.task;
-
-import com.jcraft.jsch.JSchException;
-import com.jcraft.jsch.Session;
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.credential.store.credential.Credential;
-import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
-import org.apache.airavata.credential.store.store.CredentialReader;
-import org.apache.airavata.credential.store.store.CredentialStoreException;
-import org.apache.airavata.gfac.core.GFacException;
-import org.apache.airavata.gfac.core.GFacUtils;
-import org.apache.airavata.gfac.core.SSHApiException;
-import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
-import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication;
-import org.apache.airavata.gfac.core.authentication.SSHPasswordAuthentication;
-import org.apache.airavata.gfac.core.cluster.CommandInfo;
-import org.apache.airavata.gfac.core.cluster.RawCommandInfo;
-import org.apache.airavata.gfac.core.cluster.RemoteCluster;
-import org.apache.airavata.gfac.core.cluster.ServerInfo;
-import org.apache.airavata.gfac.core.context.TaskContext;
-import org.apache.airavata.gfac.core.task.Task;
-import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.Factory;
-import org.apache.airavata.gfac.impl.SSHUtils;
-import org.apache.airavata.model.application.io.InputDataObjectType;
-import org.apache.airavata.model.application.io.OutputDataObjectType;
-import org.apache.airavata.model.commons.ErrorModel;
-import org.apache.airavata.model.status.ProcessState;
-import org.apache.airavata.model.status.TaskState;
-import org.apache.airavata.model.status.TaskStatus;
-import org.apache.airavata.model.task.DataStagingTaskModel;
-import org.apache.airavata.model.task.TaskTypes;
-import org.apache.airavata.registry.cpi.ExperimentCatalog;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This will be used for both Input file staging and output file staging, 
hence if you do any changes to a part of logic
- * in this class please consider that will works with both input and output 
cases.
- */
-public class AdvancedSCPDataStageTask implements Task {
-    private static final Logger log = 
LoggerFactory.getLogger(AdvancedSCPDataStageTask.class);
-    private static final int DEFAULT_SSH_PORT = 22;
-    private String password;
-    private String publicKeyPath;
-    private String passPhrase;
-    private String privateKeyPath;
-    private String userName;
-    private String hostName;
-    private String inputPath;
-
-    @Override
-    public void init(Map<String, String> propertyMap) throws TaskException {
-        inputPath = propertyMap.get("inputPath");
-        hostName = propertyMap.get("hostName");
-        userName = propertyMap.get("userName");
-    }
-
-    @Override
-    public TaskStatus execute(TaskContext taskContext) {
-        TaskStatus status = new TaskStatus(TaskState.EXECUTING);
-        AuthenticationInfo authenticationInfo = null;
-        DataStagingTaskModel subTaskModel = null;
-        String localDataDir = null;
-        ProcessState processState = 
taskContext.getParentProcessContext().getProcessState();
-        try {
-            subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel
-                    (taskContext.getTaskModel());
-            if (processState == ProcessState.OUTPUT_DATA_STAGING) {
-                OutputDataObjectType processOutput = 
taskContext.getProcessOutput();
-                if (processOutput != null && processOutput.getValue() == null) 
{
-                    log.error("expId: {}, processId:{}, taskId: {}:- Couldn't 
stage file {} , file name shouldn't be null",
-                            taskContext.getExperimentId(), 
taskContext.getProcessId(), taskContext.getTaskId(),
-                            processOutput.getName());
-                    status = new TaskStatus(TaskState.FAILED);
-                    if (processOutput.isIsRequired()) {
-                        status.setReason("File name is null, but this output's 
isRequired bit is not set");
-                    } else {
-                        status.setReason("File name is null");
-                    }
-                    return status;
-                }
-            } else if (processState == ProcessState.INPUT_DATA_STAGING) {
-                InputDataObjectType processInput = 
taskContext.getProcessInput();
-                if (processInput != null && processInput.getValue() == null) {
-                    log.error("expId: {}, processId:{}, taskId: {}:- Couldn't 
stage file {} , file name shouldn't be null",
-                            taskContext.getExperimentId(), 
taskContext.getProcessId(), taskContext.getTaskId(),
-                            processInput.getName());
-                    status = new TaskStatus(TaskState.FAILED);
-                    if (processInput.isIsRequired()) {
-                        status.setReason("File name is null, but this input's 
isRequired bit is not set");
-                    } else {
-                        status.setReason("File name is null");
-                    }
-                    return status;
-                }
-            } else {
-                status.setState(TaskState.FAILED);
-                status.setReason("Invalid task invocation, Support " + 
ProcessState.INPUT_DATA_STAGING.name() + " and " +
-                        "" + ProcessState.OUTPUT_DATA_STAGING.name() + " 
process phases. found " + processState.name());
-                return status;
-            }
-
-            // use rsync instead of scp if source and destination host and 
user name is same.
-            URI sourceURI = new URI(subTaskModel.getSource());
-            String fileName = 
sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 
1,
-                    sourceURI.getPath().length());
-            URI destinationURI = null;
-            if (subTaskModel.getDestination().startsWith("dummy")) {
-                destinationURI = getDestinationURI(taskContext, fileName);
-                subTaskModel.setDestination(destinationURI.toString());
-            } else {
-                destinationURI = new URI(subTaskModel.getDestination());
-            }
-
-            if (sourceURI.getHost().equalsIgnoreCase(destinationURI.getHost())
-                    && 
sourceURI.getUserInfo().equalsIgnoreCase(destinationURI.getUserInfo())) {
-                localDataCopy(taskContext, sourceURI, destinationURI);
-                status.setState(TaskState.COMPLETED);
-                status.setReason("Locally copied file using 'cp' command ");
-                return status;
-            }
-
-
-            String tokenId = 
taskContext.getParentProcessContext().getTokenId();
-            CredentialReader credentialReader = 
GFacUtils.getCredentialReader();
-            Credential credential = 
credentialReader.getCredential(taskContext.getParentProcessContext().getGatewayId(),
 tokenId);
-            if (credential instanceof SSHCredential) {
-                SSHCredential sshCredential = (SSHCredential) credential;
-                byte[] publicKey = sshCredential.getPublicKey();
-                publicKeyPath = writeFileToDisk(publicKey);
-                byte[] privateKey = sshCredential.getPrivateKey();
-                privateKeyPath = writeFileToDisk(privateKey);
-                passPhrase = sshCredential.getPassphrase();
-//                userName = sshCredential.getPortalUserName(); // this might 
not same as login user name
-                authenticationInfo = getSSHKeyAuthentication();
-            } else {
-                String msg = "Provided credential store token is not valid. 
Please provide the correct credential store token";
-                log.error(msg);
-                status.setState(TaskState.FAILED);
-                status.setReason(msg);
-                ErrorModel errorModel = new ErrorModel();
-                errorModel.setActualErrorMessage(msg);
-                errorModel.setUserFriendlyMessage(msg);
-                taskContext.getTaskModel().setTaskError(errorModel);
-                return status;
-            }
-            status = new TaskStatus(TaskState.COMPLETED);
-
-            ServerInfo serverInfo = new ServerInfo(userName, hostName, 
DEFAULT_SSH_PORT);
-            Session sshSession = Factory.getSSHSession(authenticationInfo, 
serverInfo);
-            if (processState == ProcessState.INPUT_DATA_STAGING) {
-                inputDataStaging(taskContext, sshSession, sourceURI, 
destinationURI);
-                status.setReason("Successfully staged input data");
-            } else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
-                String targetPath = destinationURI.getPath().substring(0, 
destinationURI.getPath().lastIndexOf('/'));
-                SSHUtils.makeDirectory(targetPath, sshSession);
-                // TODO - save updated subtask model with new destination
-                outputDataStaging(taskContext, sshSession, sourceURI, 
destinationURI);
-                status.setReason("Successfully staged output data");
-            }
-        } catch (TException e) {
-            String msg = "Couldn't create subTask model thrift model";
-            log.error(msg, e);
-            status.setState(TaskState.FAILED);
-            status.setReason(msg);
-            ErrorModel errorModel = new ErrorModel();
-            errorModel.setActualErrorMessage(e.getMessage());
-            errorModel.setUserFriendlyMessage(msg);
-            taskContext.getTaskModel().setTaskError(errorModel);
-            return status;
-        } catch (ApplicationSettingsException | FileNotFoundException | 
CredentialStoreException | IllegalAccessException | InstantiationException e) {
-            String msg = "Failed while reading credentials";
-            log.error(msg, e);
-            status.setState(TaskState.FAILED);
-            status.setReason(msg);
-            ErrorModel errorModel = new ErrorModel();
-            errorModel.setActualErrorMessage(e.getMessage());
-            errorModel.setUserFriendlyMessage(msg);
-            taskContext.getTaskModel().setTaskError(errorModel);
-        } catch (URISyntaxException e) {
-            String msg = "Sorce or destination uri is not correct source : " + 
subTaskModel.getSource() + ", " +
-                    "destination : " + subTaskModel.getDestination();
-            log.error(msg, e);
-            status.setState(TaskState.FAILED);
-            status.setReason(msg);
-            ErrorModel errorModel = new ErrorModel();
-            errorModel.setActualErrorMessage(e.getMessage());
-            errorModel.setUserFriendlyMessage(msg);
-            taskContext.getTaskModel().setTaskError(errorModel);
-        } catch (SSHApiException e) {
-            String msg = "Failed to do scp with compute resource";
-            log.error(msg, e);
-            status.setState(TaskState.FAILED);
-            status.setReason(msg);
-            ErrorModel errorModel = new ErrorModel();
-            errorModel.setActualErrorMessage(e.getMessage());
-            errorModel.setUserFriendlyMessage(msg);
-            taskContext.getTaskModel().setTaskError(errorModel);
-        } catch (AiravataException e) {
-            String msg = "Error while creating ssh session with client";
-            log.error(msg, e);
-            status.setState(TaskState.FAILED);
-            status.setReason(msg);
-            ErrorModel errorModel = new ErrorModel();
-            errorModel.setActualErrorMessage(e.getMessage());
-            errorModel.setUserFriendlyMessage(msg);
-            taskContext.getTaskModel().setTaskError(errorModel);
-        } catch (JSchException | IOException e) {
-            String msg = "Failed to do scp with client";
-            log.error(msg, e);
-            status.setState(TaskState.FAILED);
-            status.setReason(msg);
-            ErrorModel errorModel = new ErrorModel();
-            errorModel.setActualErrorMessage(e.getMessage());
-            errorModel.setUserFriendlyMessage(msg);
-            taskContext.getTaskModel().setTaskError(errorModel);
-        } catch (GFacException e) {
-            String msg = "Failed update experiment and process inputs and 
outputs";
-            log.error(msg, e);
-            status.setState(TaskState.FAILED);
-            status.setReason(msg);
-            ErrorModel errorModel = new ErrorModel();
-            errorModel.setActualErrorMessage(e.getMessage());
-            errorModel.setUserFriendlyMessage(msg);
-            taskContext.getTaskModel().setTaskError(errorModel);
-        }
-        return status;
-    }
-
-    private void localDataCopy(TaskContext taskContext, URI sourceURI, URI 
destinationURI) throws SSHApiException {
-        StringBuilder sb = new StringBuilder("rsync -cr ");
-        sb.append(sourceURI.getPath()).append(" 
").append(destinationURI.getPath());
-        CommandInfo commandInfo = new RawCommandInfo(sb.toString());
-        
taskContext.getParentProcessContext().getRemoteCluster().execute(commandInfo);
-    }
-
-    private void inputDataStaging(TaskContext taskContext, Session sshSession, 
URI sourceURI, URI
-            destinationURI) throws SSHApiException, IOException, JSchException 
{
-        /**
-         * scp third party file transfer 'to' compute resource.
-         */
-        
taskContext.getParentProcessContext().getRemoteCluster().scpThirdParty(sourceURI.getPath(),
-                destinationURI.getPath(), sshSession, 
RemoteCluster.DIRECTION.TO);
-    }
-
-    private void outputDataStaging(TaskContext taskContext, Session 
sshSession, URI sourceURI, URI destinationURI)
-            throws SSHApiException, AiravataException, IOException, 
JSchException, GFacException {
-
-        /**
-         * scp third party file transfer 'from' comute resource.
-         */
-        
taskContext.getParentProcessContext().getRemoteCluster().scpThirdParty(sourceURI.getPath(),
-                destinationURI.getPath(), sshSession, 
RemoteCluster.DIRECTION.FROM);
-        // update output locations
-        GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), 
taskContext.getProcessOutput().getName(), destinationURI.getPath());
-        GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), 
taskContext.getProcessOutput().getName(), destinationURI.getPath());
-
-    }
-
-    @Override
-    public TaskStatus recover(TaskContext taskContext) {
-        TaskState state = taskContext.getTaskStatus().getState();
-        if (state == TaskState.EXECUTING || state == TaskState.CREATED) {
-            return execute(taskContext);
-        } else {
-            // files already transferred or failed
-            return taskContext.getTaskStatus();
-        }
-    }
-
-    @Override
-    public TaskTypes getType() {
-        return TaskTypes.DATA_STAGING;
-    }
-
-    private SSHPasswordAuthentication getSSHPasswordAuthentication() {
-        return new SSHPasswordAuthentication(userName, password);
-    }
-
-    private SSHKeyAuthentication getSSHKeyAuthentication() {
-        SSHKeyAuthentication sshKA = new SSHKeyAuthentication();
-        sshKA.setUserName(userName);
-        sshKA.setPassphrase(passPhrase);
-        sshKA.setPrivateKeyFilePath(privateKeyPath);
-        sshKA.setPublicKeyFilePath(publicKeyPath);
-        sshKA.setStrictHostKeyChecking("no");
-        return sshKA;
-    }
-
-    private String writeFileToDisk(byte[] data) {
-        File temp = null;
-        try {
-            temp = File.createTempFile("id_rsa", "");
-            //write it
-            FileOutputStream bw = new FileOutputStream(temp);
-            bw.write(data);
-            bw.close();
-        } catch (IOException e) {
-            log.error(e.getMessage(), e);
-        }
-        return temp.getAbsolutePath();
-    }
-
-    public URI getDestinationURI(TaskContext taskContext, String fileName) 
throws URISyntaxException {
-        String filePath = (inputPath.endsWith(File.separator) ? inputPath : 
inputPath + File.separator) +
-                taskContext.getParentProcessContext().getProcessId() + 
File.separator + fileName;
-        return new URI("SCP", hostName, filePath, null);
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
new file mode 100644
index 0000000..ab9d562
--- /dev/null
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
@@ -0,0 +1,126 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.Task;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+public class DataStageTask implements Task {
+       private static final Logger log = 
LoggerFactory.getLogger(DataStageTask.class);
+
+       @Override
+       public void init(Map<String, String> propertyMap) throws TaskException {
+
+       }
+
+       @Override
+       public TaskStatus execute(TaskContext taskContext) {
+               TaskStatus status = new TaskStatus(TaskState.COMPLETED);
+               if (taskContext.getTaskModel().getTaskType() != 
TaskTypes.DATA_STAGING) {
+                       status.setState(TaskState.FAILED);
+                       status.setReason("Invalid task call, expected " + 
TaskTypes.DATA_STAGING.toString() + " but found "
+                                       + 
taskContext.getTaskModel().getTaskType().toString());
+               } else {
+                       try {
+                               DataStagingTaskModel subTaskModel = 
(DataStagingTaskModel) ThriftUtils.getSubTaskModel(taskContext
+                                               .getTaskModel());
+                               URI sourceURI = new 
URI(subTaskModel.getSource());
+                               URI destinationURI = new 
URI(subTaskModel.getDestination());
+
+                               ProcessState processState = 
taskContext.getParentProcessContext().getProcessState();
+                               if (processState == 
ProcessState.INPUT_DATA_STAGING) {
+                                       /**
+                                        * copy local file to compute resource.
+                                        */
+                                       
taskContext.getParentProcessContext().getRemoteCluster().copyTo(sourceURI.getPath(),
 destinationURI
+                                                       .getPath());
+                               } else if (processState == 
ProcessState.OUTPUT_DATA_STAGING) {
+                                       /**
+                                        * copy remote file from compute 
resource.
+                                        */
+                                       
taskContext.getParentProcessContext().getRemoteCluster().copyFrom(sourceURI.getPath(),
 destinationURI
+                                                       .getPath());
+                               }
+                               status.setReason("Successfully staged data");
+                       } catch (SSHApiException e) {
+                               String msg = "Scp attempt failed";
+                               log.error(msg, e);
+                               status.setState(TaskState.FAILED);
+                               status.setReason(msg);
+                               ErrorModel errorModel = new ErrorModel();
+                               
errorModel.setActualErrorMessage(e.getMessage());
+                               errorModel.setUserFriendlyMessage(msg);
+                               
taskContext.getTaskModel().setTaskError(errorModel);
+                       } catch (TException e) {
+                               String msg = "Invalid task invocation";
+                               log.error(msg, e);
+                               status.setState(TaskState.FAILED);
+                               status.setReason(msg);
+                               ErrorModel errorModel = new ErrorModel();
+                               
errorModel.setActualErrorMessage(e.getMessage());
+                               errorModel.setUserFriendlyMessage(msg);
+                               
taskContext.getTaskModel().setTaskError(errorModel);
+                       } catch (URISyntaxException e) {
+                               String msg = "source or destination is not a 
valid URI";
+                               log.error(msg, e);
+                               status.setState(TaskState.FAILED);
+                               status.setReason(msg);
+                               ErrorModel errorModel = new ErrorModel();
+                               
errorModel.setActualErrorMessage(e.getMessage());
+                               errorModel.setUserFriendlyMessage(msg);
+                               
taskContext.getTaskModel().setTaskError(errorModel);
+                       }
+               }
+               return status;
+       }
+
+       @Override
+       public TaskStatus recover(TaskContext taskContext) {
+        TaskState state = taskContext.getTaskStatus().getState();
+        if (state == TaskState.EXECUTING || state == TaskState.CREATED) {
+            return execute(taskContext);
+        } else {
+            // files already transferred or failed
+            return taskContext.getTaskStatus();
+        }
+       }
+
+       @Override
+       public TaskTypes getType() {
+               return TaskTypes.DATA_STAGING;
+       }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
new file mode 100644
index 0000000..020880d
--- /dev/null
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
@@ -0,0 +1,286 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.gfac.core.*;
+import org.apache.airavata.gfac.core.cluster.JobSubmissionOutput;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.JobSubmissionTask;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+public class DefaultJobSubmissionTask implements JobSubmissionTask {
+    private static final Logger log = 
LoggerFactory.getLogger(DefaultJobSubmissionTask.class);
+    @Override
+    public void init(Map<String, String> propertyMap) throws TaskException {
+
+    }
+
+    @Override
+    public TaskStatus execute(TaskContext taskContext){
+           TaskStatus taskStatus = new TaskStatus(TaskState.COMPLETED); // set 
to completed.
+           try {
+                   ProcessContext processContext = 
taskContext.getParentProcessContext();
+                   JobModel jobModel = processContext.getJobModel();
+                   jobModel.setTaskId(taskContext.getTaskId());
+                   RemoteCluster remoteCluster = 
processContext.getRemoteCluster();
+                   JobDescriptor jobDescriptor = 
GFacUtils.createJobDescriptor(processContext,taskContext);
+                   jobModel.setJobName(jobDescriptor.getJobName());
+                   ResourceJobManager resourceJobManager = 
GFacUtils.getResourceJobManager(processContext);
+                   JobManagerConfiguration jConfig = null;
+                   if (resourceJobManager != null) {
+                           jConfig = 
Factory.getJobManagerConfiguration(resourceJobManager);
+                   }
+                   JobStatus jobStatus = new JobStatus();
+                   File jobFile = GFacUtils.createJobFile(taskContext, 
jobDescriptor, jConfig);
+                   if (jobFile != null && jobFile.exists()) {
+                           
jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
+                           JobSubmissionOutput jobSubmissionOutput = 
remoteCluster.submitBatchJob(jobFile.getPath(),
+                                           processContext.getWorkingDir());
+                           
jobModel.setExitCode(jobSubmissionOutput.getExitCode());
+                           jobModel.setStdErr(jobSubmissionOutput.getStdErr());
+                           jobModel.setStdOut(jobSubmissionOutput.getStdOut());
+                           String jobId = jobSubmissionOutput.getJobId();
+                           if (jobId != null && !jobId.isEmpty()) {
+                                   jobModel.setJobId(jobId);
+                                   GFacUtils.saveJobModel(processContext, 
jobModel);
+                                   jobStatus.setJobState(JobState.SUBMITTED);
+                                   jobStatus.setReason("Successfully Submitted 
to " + taskContext.getParentProcessContext()
+                                                   
.getComputeResourceDescription().getHostName());
+                    
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                                   jobModel.setJobStatus(jobStatus);
+                                   
GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+                                   if 
(verifyJobSubmissionByJobId(remoteCluster, jobId)) {
+                                           
jobStatus.setJobState(JobState.QUEUED);
+                                           jobStatus.setReason("Verification 
step succeeded");
+                        
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                                           jobModel.setJobStatus(jobStatus);
+                                           
GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+                                   }
+                                   taskStatus = new 
TaskStatus(TaskState.COMPLETED);
+                                   taskStatus.setReason("Submitted job to 
compute resource");
+                    
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                           } else {
+                                   int verificationTryCount = 0;
+                                   while (verificationTryCount++ < 3) {
+                                           String verifyJobId = 
verifyJobSubmission(remoteCluster, jobModel);
+                                           if (verifyJobId != null && 
!verifyJobId.isEmpty()) {
+                                                   // JobStatus either changed 
from SUBMITTED to QUEUED or directly to QUEUED
+                                                   jobId = verifyJobId;
+                                                   jobModel.setJobId(jobId);
+                                                   
GFacUtils.saveJobModel(processContext,jobModel);
+                                                   
jobStatus.setJobState(JobState.QUEUED);
+                                                   
jobStatus.setReason("Verification step succeeded");
+                            
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                                                   
jobModel.setJobStatus(jobStatus);
+                                                   
GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+                                                   
taskStatus.setState(TaskState.COMPLETED);
+                                                   
taskStatus.setReason("Submitted job to compute resource");
+                            
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                                                   break;
+                                           }
+                                           log.info("Verify step return 
invalid jobId, retry verification step in {} secs", verificationTryCount * 10);
+                                           Thread.sleep(verificationTryCount * 
10000);
+                                   }
+                           }
+
+                           if (jobId == null || jobId.isEmpty()) {
+                                   String msg = "expId:" + 
processContext.getProcessModel().getExperimentId() + " Couldn't find " +
+                                                   "remote jobId for JobName:" 
+ jobModel.getJobName() + ", both submit and verify steps " +
+                                                   "doesn't return a valid 
JobId. " + "Hence changing experiment state to Failed";
+                                   log.error(msg);
+                    ErrorModel errorModel = new ErrorModel();
+                    errorModel.setUserFriendlyMessage(msg);
+                    errorModel.setActualErrorMessage(msg);
+                                   
GFacUtils.saveExperimentError(processContext, errorModel);
+                    GFacUtils.saveProcessError(processContext, errorModel);
+                    GFacUtils.saveTaskError(taskContext, errorModel);
+                                   taskStatus.setState(TaskState.FAILED);
+                                   taskStatus.setReason("Couldn't find job id 
in both submitted and verified steps");
+                    
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                           }else {
+                    GFacUtils.saveJobModel(processContext, jobModel);
+                }
+                   } else {
+                           taskStatus.setState(TaskState.FAILED);
+                           if (jobFile == null) {
+                                   taskStatus.setReason("JobFile is null");
+                           } else {
+                                   taskStatus.setReason("Job file doesn't 
exist");
+                           }
+                   }
+
+           } catch (AppCatalogException e) {
+                   String msg = "Error while instantiating app catalog";
+                   log.error(msg, e);
+                   taskStatus.setState(TaskState.FAILED);
+                   taskStatus.setReason(msg);
+            
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                   ErrorModel errorModel = new ErrorModel();
+                   errorModel.setActualErrorMessage(e.getMessage());
+                   errorModel.setUserFriendlyMessage(msg);
+                   taskContext.getTaskModel().setTaskError(errorModel);
+           } catch (ApplicationSettingsException e) {
+                   String msg = "Error occurred while creating job descriptor";
+                   log.error(msg, e);
+                   taskStatus.setState(TaskState.FAILED);
+                   taskStatus.setReason(msg);
+            
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                   ErrorModel errorModel = new ErrorModel();
+                   errorModel.setActualErrorMessage(e.getMessage());
+                   errorModel.setUserFriendlyMessage(msg);
+                   taskContext.getTaskModel().setTaskError(errorModel);
+           } catch (GFacException e) {
+                   String msg = "Error occurred while creating job descriptor";
+                   log.error(msg, e);
+                   taskStatus.setState(TaskState.FAILED);
+                   taskStatus.setReason(msg);
+            
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                   ErrorModel errorModel = new ErrorModel();
+                   errorModel.setActualErrorMessage(e.getMessage());
+                   errorModel.setUserFriendlyMessage(msg);
+                   taskContext.getTaskModel().setTaskError(errorModel);
+           } catch (SSHApiException e) {
+                   String msg = "Error occurred while submitting the job";
+                   log.error(msg, e);
+                   taskStatus.setState(TaskState.FAILED);
+                   taskStatus.setReason(msg);
+            
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                   ErrorModel errorModel = new ErrorModel();
+                   errorModel.setActualErrorMessage(e.getMessage());
+                   errorModel.setUserFriendlyMessage(msg);
+                   taskContext.getTaskModel().setTaskError(errorModel);
+           } catch (IOException e) {
+                   String msg = "Error while reading the content of the job 
file";
+                   log.error(msg, e);
+                   taskStatus.setState(TaskState.FAILED);
+                   taskStatus.setReason(msg);
+            
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                   ErrorModel errorModel = new ErrorModel();
+                   errorModel.setActualErrorMessage(e.getMessage());
+                   errorModel.setUserFriendlyMessage(msg);
+                   taskContext.getTaskModel().setTaskError(errorModel);
+           } catch (InterruptedException e) {
+                   String msg = "Error occurred while verifying the job 
submission";
+                   log.error(msg, e);
+                   taskStatus.setState(TaskState.FAILED);
+                   taskStatus.setReason(msg);
+            
taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                   ErrorModel errorModel = new ErrorModel();
+                   errorModel.setActualErrorMessage(e.getMessage());
+                   errorModel.setUserFriendlyMessage(msg);
+                   taskContext.getTaskModel().setTaskError(errorModel);
+           }
+
+           taskContext.setTaskStatus(taskStatus);
+           try {
+                   GFacUtils.saveAndPublishTaskStatus(taskContext);
+           } catch (GFacException e) {
+                   log.error("Error while saving task status", e);
+           }
+           return taskStatus;
+    }
+
+    private boolean verifyJobSubmissionByJobId(RemoteCluster remoteCluster, 
String jobID) throws SSHApiException {
+        JobStatus status = remoteCluster.getJobStatus(jobID);
+        return status != null &&  status.getJobState() != JobState.UNKNOWN;
+    }
+
+    private String verifyJobSubmission(RemoteCluster remoteCluster, JobModel 
jobDetails) {
+        String jobName = jobDetails.getJobName();
+        String jobId = null;
+        try {
+            jobId  = remoteCluster.getJobIdByJobName(jobName, 
remoteCluster.getServerInfo().getUserName());
+        } catch (SSHApiException e) {
+            log.error("Error while verifying JobId from JobName");
+        }
+        return jobId;
+    }
+
+
+    @Override
+    public TaskStatus recover(TaskContext taskContext) {
+            ProcessContext processContext = 
taskContext.getParentProcessContext();
+            JobModel jobModel = processContext.getJobModel();
+            // original job failed before submitting
+            if (jobModel == null || jobModel.getJobId() == null ){
+                return execute(taskContext);
+            }else {
+                   // job is already submitted and monitor should handle the 
recovery
+                   return new TaskStatus(TaskState.COMPLETED);
+            }
+    }
+
+       @Override
+       public TaskTypes getType() {
+               return TaskTypes.JOB_SUBMISSION;
+       }
+
+       @Override
+       public JobStatus cancel(TaskContext taskcontext) throws TaskException {
+               ProcessContext processContext = 
taskcontext.getParentProcessContext();
+               RemoteCluster remoteCluster = processContext.getRemoteCluster();
+               JobModel jobModel = processContext.getJobModel();
+               int retryCount = 0;
+               if (jobModel != null) {
+                       try {
+                               JobStatus oldJobStatus = 
remoteCluster.getJobStatus(jobModel.getJobId());
+                               while (oldJobStatus == null && retryCount <= 5) 
{
+                                       retryCount++;
+                                       Thread.sleep(retryCount * 1000);
+                                       oldJobStatus = 
remoteCluster.getJobStatus(jobModel.getJobId());
+                               }
+                               if (oldJobStatus != null) {
+                                       oldJobStatus = 
remoteCluster.cancelJob(jobModel.getJobId());
+                                       return oldJobStatus;
+                               } else {
+                                       throw new TaskException("Cancel 
operation failed, Job status couldn't find in resource, JobId " +
+                                                       jobModel.getJobId());
+                               }
+                       } catch (SSHApiException | InterruptedException e) {
+                               throw new TaskException("Error while cancelling 
job " + jobModel.getJobId(), e);
+                       }
+               } else {
+                       throw new TaskException("Couldn't complete cancel 
operation, JobModel is null in ProcessContext.");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
new file mode 100644
index 0000000..fff130c
--- /dev/null
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.Task;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class EnvironmentSetupTask implements Task {
+
+       private static final Logger log = 
LoggerFactory.getLogger(EnvironmentSetupTask.class);
+       @Override
+       public void init(Map<String, String> propertyMap) throws TaskException {
+
+       }
+
+       @Override
+       public TaskStatus execute(TaskContext taskContext) {
+               TaskStatus status = new TaskStatus(TaskState.COMPLETED);
+               try {
+                       RemoteCluster remoteCluster = 
taskContext.getParentProcessContext().getRemoteCluster();
+                       
remoteCluster.makeDirectory(taskContext.getParentProcessContext().getWorkingDir());
+                       status.setReason("Successfully created environment");
+               } catch (SSHApiException e) {
+                       String msg = "Error while environment setup";
+                       log.error(msg, e);
+                       status.setState(TaskState.FAILED);
+                       status.setReason(msg);
+                       ErrorModel errorModel = new ErrorModel();
+                       errorModel.setActualErrorMessage(e.getMessage());
+                       errorModel.setUserFriendlyMessage(msg);
+                       taskContext.getTaskModel().setTaskError(errorModel);
+               }
+               return status;
+       }
+
+       @Override
+       public TaskStatus recover(TaskContext taskContext) {
+               return execute(taskContext);
+       }
+
+       @Override
+       public TaskTypes getType() {
+               return TaskTypes.ENV_SETUP;
+       }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
new file mode 100644
index 0000000..ed75fef
--- /dev/null
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
@@ -0,0 +1,184 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.gfac.core.*;
+import org.apache.airavata.gfac.core.cluster.JobSubmissionOutput;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.JobSubmissionTask;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+public class ForkJobSubmissionTask implements JobSubmissionTask {
+    private static final Logger log = 
LoggerFactory.getLogger(ForkJobSubmissionTask.class);
+    @Override
+    public void init(Map<String, String> propertyMap) throws TaskException {
+
+    }
+
+    @Override
+    public TaskStatus execute(TaskContext taskContext) {
+        TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
+        try {
+            ProcessContext processContext = 
taskContext.getParentProcessContext();
+            JobModel jobModel = processContext.getJobModel();
+            jobModel.setTaskId(taskContext.getTaskId());
+            RemoteCluster remoteCluster = processContext.getRemoteCluster();
+            JobDescriptor jobDescriptor = 
GFacUtils.createJobDescriptor(processContext, taskContext);
+            jobModel.setJobName(jobDescriptor.getJobName());
+            ResourceJobManager resourceJobManager = 
GFacUtils.getResourceJobManager(processContext);
+            JobManagerConfiguration jConfig = null;
+            if (resourceJobManager != null) {
+                jConfig = 
Factory.getJobManagerConfiguration(resourceJobManager);
+            }
+            JobStatus jobStatus = new JobStatus();
+               File jobFile = GFacUtils.createJobFile(taskContext, 
jobDescriptor, jConfig);
+               if (jobFile != null && jobFile.exists()) {
+                
jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
+                   JobSubmissionOutput jobSubmissionOutput = 
remoteCluster.submitBatchJob(jobFile.getPath(),
+                                   processContext.getWorkingDir());
+                   jobModel.setExitCode(jobSubmissionOutput.getExitCode());
+                   jobModel.setStdErr(jobSubmissionOutput.getStdErr());
+                   jobModel.setStdOut(jobSubmissionOutput.getStdOut());
+                   String jobId = jobSubmissionOutput.getJobId();
+                   if (jobId != null && !jobId.isEmpty()) {
+                    jobModel.setJobId(jobId);
+                    GFacUtils.saveJobModel(processContext, jobModel);
+                    jobStatus.setJobState(JobState.SUBMITTED);
+                    jobStatus.setReason("Successfully Submitted to " + 
taskContext.getParentProcessContext()
+                            .getComputeResourceDescription().getHostName());
+                    
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                    jobModel.setJobStatus(jobStatus);
+                    
GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+                    taskStatus = new TaskStatus(TaskState.COMPLETED);
+                    taskStatus.setReason("Submitted job to compute resource");
+                }
+                if (jobId == null || jobId.isEmpty()) {
+                    String msg = "expId:" + 
processContext.getProcessModel().getExperimentId() + " Couldn't find " +
+                            "remote jobId for JobName:" + 
jobModel.getJobName() + ", both submit and verify steps " +
+                            "doesn't return a valid JobId. " + "Hence changing 
experiment state to Failed";
+                    log.error(msg);
+                    ErrorModel errorModel = new ErrorModel();
+                    errorModel.setActualErrorMessage(msg);
+                    
errorModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+                    GFacUtils.saveExperimentError(processContext, errorModel);
+                    GFacUtils.saveProcessError(processContext, errorModel);
+                    GFacUtils.saveTaskError(taskContext, errorModel);
+                    taskStatus.setState(TaskState.FAILED);
+                    taskStatus.setReason("Couldn't find job id in both 
submitted and verified steps");
+                }else {
+                    GFacUtils.saveJobModel(processContext, jobModel);
+                }
+            } else {
+                taskStatus.setState(TaskState.FAILED);
+                if (jobFile == null) {
+                    taskStatus.setReason("JobFile is null");
+                } else {
+                    taskStatus.setReason("Job file doesn't exist");
+                }
+            }
+        } catch (ApplicationSettingsException e) {
+            String msg = "Error occurred while creating job descriptor";
+            log.error(msg, e);
+            taskStatus.setState(TaskState.FAILED);
+            taskStatus.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+        } catch (AppCatalogException e) {
+            String msg = "Error while instantiating app catalog";
+            log.error(msg, e);
+            taskStatus.setState(TaskState.FAILED);
+            taskStatus.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+        } catch (GFacException e) {
+            String msg = "Error occurred while creating job descriptor";
+            log.error(msg, e);
+            taskStatus.setState(TaskState.FAILED);
+            taskStatus.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+        } catch (SSHApiException e) {
+            String msg = "Error occurred while submitting the job";
+            log.error(msg, e);
+            taskStatus.setState(TaskState.FAILED);
+            taskStatus.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+        } catch (IOException e) {
+            String msg = "Error while reading the content of the job file";
+            log.error(msg, e);
+            taskStatus.setState(TaskState.FAILED);
+            taskStatus.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+        }
+        return taskStatus;
+    }
+
+    @Override
+    public TaskStatus recover(TaskContext taskContext) {
+        //TODO implement recovery scenario instead of calling execute.
+        return execute(taskContext);
+    }
+
+       @Override
+       public TaskTypes getType() {
+               return TaskTypes.JOB_SUBMISSION;
+       }
+
+       @Override
+       public JobStatus cancel(TaskContext taskcontext) {
+               // TODO - implement cancel with SSH Fork
+               return null;
+       }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
index 32ee31b..678ded1 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
@@ -20,11 +20,32 @@
  */
 package org.apache.airavata.gfac.impl.task;
 
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.credential.store.credential.Credential;
+import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.credential.store.store.CredentialStoreException;
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication;
+import org.apache.airavata.gfac.core.authentication.SSHPasswordAuthentication;
+import org.apache.airavata.gfac.core.cluster.CommandInfo;
+import org.apache.airavata.gfac.core.cluster.RawCommandInfo;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.cluster.ServerInfo;
 import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.Task;
 import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.airavata.gfac.impl.SSHUtils;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.status.TaskState;
@@ -35,81 +56,240 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Map;
 
+/**
+ * This will be used for both Input file staging and output file staging, 
hence if you do any changes to a part of logic
+ * in this class please consider that will works with both input and output 
cases.
+ */
 public class SCPDataStageTask implements Task {
-       private static final Logger log = 
LoggerFactory.getLogger(SCPDataStageTask.class);
-
-       @Override
-       public void init(Map<String, String> propertyMap) throws TaskException {
-
-       }
-
-       @Override
-       public TaskStatus execute(TaskContext taskContext) {
-               TaskStatus status = new TaskStatus(TaskState.COMPLETED);
-               if (taskContext.getTaskModel().getTaskType() != 
TaskTypes.DATA_STAGING) {
-                       status.setState(TaskState.FAILED);
-                       status.setReason("Invalid task call, expected " + 
TaskTypes.DATA_STAGING.toString() + " but found "
-                                       + 
taskContext.getTaskModel().getTaskType().toString());
-               } else {
-                       try {
-                               DataStagingTaskModel subTaskModel = 
(DataStagingTaskModel) ThriftUtils.getSubTaskModel(taskContext
-                                               .getTaskModel());
-                               URI sourceURI = new 
URI(subTaskModel.getSource());
-                               URI destinationURI = new 
URI(subTaskModel.getDestination());
-
-                               ProcessState processState = 
taskContext.getParentProcessContext().getProcessState();
-                               if (processState == 
ProcessState.INPUT_DATA_STAGING) {
-                                       /**
-                                        * copy local file to compute resource.
-                                        */
-                                       
taskContext.getParentProcessContext().getRemoteCluster().copyTo(sourceURI.getPath(),
 destinationURI
-                                                       .getPath());
-                               } else if (processState == 
ProcessState.OUTPUT_DATA_STAGING) {
-                                       /**
-                                        * copy remote file from compute 
resource.
-                                        */
-                                       
taskContext.getParentProcessContext().getRemoteCluster().copyFrom(sourceURI.getPath(),
 destinationURI
-                                                       .getPath());
-                               }
-                               status.setReason("Successfully staged data");
-                       } catch (SSHApiException e) {
-                               String msg = "Scp attempt failed";
-                               log.error(msg, e);
-                               status.setState(TaskState.FAILED);
-                               status.setReason(msg);
-                               ErrorModel errorModel = new ErrorModel();
-                               
errorModel.setActualErrorMessage(e.getMessage());
-                               errorModel.setUserFriendlyMessage(msg);
-                               
taskContext.getTaskModel().setTaskError(errorModel);
-                       } catch (TException e) {
-                               String msg = "Invalid task invocation";
-                               log.error(msg, e);
-                               status.setState(TaskState.FAILED);
-                               status.setReason(msg);
-                               ErrorModel errorModel = new ErrorModel();
-                               
errorModel.setActualErrorMessage(e.getMessage());
-                               errorModel.setUserFriendlyMessage(msg);
-                               
taskContext.getTaskModel().setTaskError(errorModel);
-                       } catch (URISyntaxException e) {
-                               String msg = "source or destination is not a 
valid URI";
-                               log.error(msg, e);
-                               status.setState(TaskState.FAILED);
-                               status.setReason(msg);
-                               ErrorModel errorModel = new ErrorModel();
-                               
errorModel.setActualErrorMessage(e.getMessage());
-                               errorModel.setUserFriendlyMessage(msg);
-                               
taskContext.getTaskModel().setTaskError(errorModel);
-                       }
-               }
-               return status;
-       }
-
-       @Override
-       public TaskStatus recover(TaskContext taskContext) {
+    private static final Logger log = 
LoggerFactory.getLogger(SCPDataStageTask.class);
+    private static final int DEFAULT_SSH_PORT = 22;
+    private String password;
+    private String publicKeyPath;
+    private String passPhrase;
+    private String privateKeyPath;
+    private String userName;
+    private String hostName;
+    private String inputPath;
+
+    @Override
+    public void init(Map<String, String> propertyMap) throws TaskException {
+        inputPath = propertyMap.get("inputPath");
+        hostName = propertyMap.get("hostName");
+        userName = propertyMap.get("userName");
+    }
+
+    @Override
+    public TaskStatus execute(TaskContext taskContext) {
+        TaskStatus status = new TaskStatus(TaskState.EXECUTING);
+        AuthenticationInfo authenticationInfo = null;
+        DataStagingTaskModel subTaskModel = null;
+        String localDataDir = null;
+        ProcessState processState = 
taskContext.getParentProcessContext().getProcessState();
+        try {
+            subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel
+                    (taskContext.getTaskModel());
+            if (processState == ProcessState.OUTPUT_DATA_STAGING) {
+                OutputDataObjectType processOutput = 
taskContext.getProcessOutput();
+                if (processOutput != null && processOutput.getValue() == null) 
{
+                    log.error("expId: {}, processId:{}, taskId: {}:- Couldn't 
stage file {} , file name shouldn't be null",
+                            taskContext.getExperimentId(), 
taskContext.getProcessId(), taskContext.getTaskId(),
+                            processOutput.getName());
+                    status = new TaskStatus(TaskState.FAILED);
+                    if (processOutput.isIsRequired()) {
+                        status.setReason("File name is null, but this output's 
isRequired bit is not set");
+                    } else {
+                        status.setReason("File name is null");
+                    }
+                    return status;
+                }
+            } else if (processState == ProcessState.INPUT_DATA_STAGING) {
+                InputDataObjectType processInput = 
taskContext.getProcessInput();
+                if (processInput != null && processInput.getValue() == null) {
+                    log.error("expId: {}, processId:{}, taskId: {}:- Couldn't 
stage file {} , file name shouldn't be null",
+                            taskContext.getExperimentId(), 
taskContext.getProcessId(), taskContext.getTaskId(),
+                            processInput.getName());
+                    status = new TaskStatus(TaskState.FAILED);
+                    if (processInput.isIsRequired()) {
+                        status.setReason("File name is null, but this input's 
isRequired bit is not set");
+                    } else {
+                        status.setReason("File name is null");
+                    }
+                    return status;
+                }
+            } else {
+                status.setState(TaskState.FAILED);
+                status.setReason("Invalid task invocation, Support " + 
ProcessState.INPUT_DATA_STAGING.name() + " and " +
+                        "" + ProcessState.OUTPUT_DATA_STAGING.name() + " 
process phases. found " + processState.name());
+                return status;
+            }
+
+            // use rsync instead of scp if source and destination host and 
user name is same.
+            URI sourceURI = new URI(subTaskModel.getSource());
+            String fileName = 
sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 
1,
+                    sourceURI.getPath().length());
+            URI destinationURI = null;
+            if (subTaskModel.getDestination().startsWith("dummy")) {
+                destinationURI = getDestinationURI(taskContext, fileName);
+                subTaskModel.setDestination(destinationURI.toString());
+            } else {
+                destinationURI = new URI(subTaskModel.getDestination());
+            }
+
+            if (sourceURI.getHost().equalsIgnoreCase(destinationURI.getHost())
+                    && 
sourceURI.getUserInfo().equalsIgnoreCase(destinationURI.getUserInfo())) {
+                localDataCopy(taskContext, sourceURI, destinationURI);
+                status.setState(TaskState.COMPLETED);
+                status.setReason("Locally copied file using 'cp' command ");
+                return status;
+            }
+
+
+            String tokenId = 
taskContext.getParentProcessContext().getTokenId();
+            CredentialReader credentialReader = 
GFacUtils.getCredentialReader();
+            Credential credential = 
credentialReader.getCredential(taskContext.getParentProcessContext().getGatewayId(),
 tokenId);
+            if (credential instanceof SSHCredential) {
+                SSHCredential sshCredential = (SSHCredential) credential;
+                byte[] publicKey = sshCredential.getPublicKey();
+                publicKeyPath = writeFileToDisk(publicKey);
+                byte[] privateKey = sshCredential.getPrivateKey();
+                privateKeyPath = writeFileToDisk(privateKey);
+                passPhrase = sshCredential.getPassphrase();
+//                userName = sshCredential.getPortalUserName(); // this might 
not same as login user name
+                authenticationInfo = getSSHKeyAuthentication();
+            } else {
+                String msg = "Provided credential store token is not valid. 
Please provide the correct credential store token";
+                log.error(msg);
+                status.setState(TaskState.FAILED);
+                status.setReason(msg);
+                ErrorModel errorModel = new ErrorModel();
+                errorModel.setActualErrorMessage(msg);
+                errorModel.setUserFriendlyMessage(msg);
+                taskContext.getTaskModel().setTaskError(errorModel);
+                return status;
+            }
+            status = new TaskStatus(TaskState.COMPLETED);
+
+            ServerInfo serverInfo = new ServerInfo(userName, hostName, 
DEFAULT_SSH_PORT);
+            Session sshSession = Factory.getSSHSession(authenticationInfo, 
serverInfo);
+            if (processState == ProcessState.INPUT_DATA_STAGING) {
+                inputDataStaging(taskContext, sshSession, sourceURI, 
destinationURI);
+                status.setReason("Successfully staged input data");
+            } else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
+                String targetPath = destinationURI.getPath().substring(0, 
destinationURI.getPath().lastIndexOf('/'));
+                SSHUtils.makeDirectory(targetPath, sshSession);
+                // TODO - save updated subtask model with new destination
+                outputDataStaging(taskContext, sshSession, sourceURI, 
destinationURI);
+                status.setReason("Successfully staged output data");
+            }
+        } catch (TException e) {
+            String msg = "Couldn't create subTask model thrift model";
+            log.error(msg, e);
+            status.setState(TaskState.FAILED);
+            status.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+            return status;
+        } catch (ApplicationSettingsException | FileNotFoundException | 
CredentialStoreException | IllegalAccessException | InstantiationException e) {
+            String msg = "Failed while reading credentials";
+            log.error(msg, e);
+            status.setState(TaskState.FAILED);
+            status.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+        } catch (URISyntaxException e) {
+            String msg = "Sorce or destination uri is not correct source : " + 
subTaskModel.getSource() + ", " +
+                    "destination : " + subTaskModel.getDestination();
+            log.error(msg, e);
+            status.setState(TaskState.FAILED);
+            status.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+        } catch (SSHApiException e) {
+            String msg = "Failed to do scp with compute resource";
+            log.error(msg, e);
+            status.setState(TaskState.FAILED);
+            status.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+        } catch (AiravataException e) {
+            String msg = "Error while creating ssh session with client";
+            log.error(msg, e);
+            status.setState(TaskState.FAILED);
+            status.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+        } catch (JSchException | IOException e) {
+            String msg = "Failed to do scp with client";
+            log.error(msg, e);
+            status.setState(TaskState.FAILED);
+            status.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+        } catch (GFacException e) {
+            String msg = "Failed update experiment and process inputs and 
outputs";
+            log.error(msg, e);
+            status.setState(TaskState.FAILED);
+            status.setReason(msg);
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setActualErrorMessage(e.getMessage());
+            errorModel.setUserFriendlyMessage(msg);
+            taskContext.getTaskModel().setTaskError(errorModel);
+        }
+        return status;
+    }
+
+    private void localDataCopy(TaskContext taskContext, URI sourceURI, URI 
destinationURI) throws SSHApiException {
+        StringBuilder sb = new StringBuilder("rsync -cr ");
+        sb.append(sourceURI.getPath()).append(" 
").append(destinationURI.getPath());
+        CommandInfo commandInfo = new RawCommandInfo(sb.toString());
+        
taskContext.getParentProcessContext().getRemoteCluster().execute(commandInfo);
+    }
+
+    private void inputDataStaging(TaskContext taskContext, Session sshSession, 
URI sourceURI, URI
+            destinationURI) throws SSHApiException, IOException, JSchException 
{
+        /**
+         * scp third party file transfer 'to' compute resource.
+         */
+        
taskContext.getParentProcessContext().getRemoteCluster().scpThirdParty(sourceURI.getPath(),
+                destinationURI.getPath(), sshSession, 
RemoteCluster.DIRECTION.TO);
+    }
+
+    private void outputDataStaging(TaskContext taskContext, Session 
sshSession, URI sourceURI, URI destinationURI)
+            throws SSHApiException, AiravataException, IOException, 
JSchException, GFacException {
+
+        /**
+         * scp third party file transfer 'from' comute resource.
+         */
+        
taskContext.getParentProcessContext().getRemoteCluster().scpThirdParty(sourceURI.getPath(),
+                destinationURI.getPath(), sshSession, 
RemoteCluster.DIRECTION.FROM);
+        // update output locations
+        GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), 
taskContext.getProcessOutput().getName(), destinationURI.getPath());
+        GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), 
taskContext.getProcessOutput().getName(), destinationURI.getPath());
+
+    }
+
+    @Override
+    public TaskStatus recover(TaskContext taskContext) {
         TaskState state = taskContext.getTaskStatus().getState();
         if (state == TaskState.EXECUTING || state == TaskState.CREATED) {
             return execute(taskContext);
@@ -117,10 +297,45 @@ public class SCPDataStageTask implements Task {
             // files already transferred or failed
             return taskContext.getTaskStatus();
         }
-       }
+    }
+
+    @Override
+    public TaskTypes getType() {
+        return TaskTypes.DATA_STAGING;
+    }
+
+    private SSHPasswordAuthentication getSSHPasswordAuthentication() {
+        return new SSHPasswordAuthentication(userName, password);
+    }
+
+    private SSHKeyAuthentication getSSHKeyAuthentication() {
+        SSHKeyAuthentication sshKA = new SSHKeyAuthentication();
+        sshKA.setUserName(userName);
+        sshKA.setPassphrase(passPhrase);
+        sshKA.setPrivateKeyFilePath(privateKeyPath);
+        sshKA.setPublicKeyFilePath(publicKeyPath);
+        sshKA.setStrictHostKeyChecking("no");
+        return sshKA;
+    }
+
+    private String writeFileToDisk(byte[] data) {
+        File temp = null;
+        try {
+            temp = File.createTempFile("id_rsa", "");
+            //write it
+            FileOutputStream bw = new FileOutputStream(temp);
+            bw.write(data);
+            bw.close();
+        } catch (IOException e) {
+            log.error(e.getMessage(), e);
+        }
+        return temp.getAbsolutePath();
+    }
+
+    public URI getDestinationURI(TaskContext taskContext, String fileName) 
throws URISyntaxException {
+        String filePath = (inputPath.endsWith(File.separator) ? inputPath : 
inputPath + File.separator) +
+                taskContext.getParentProcessContext().getProcessId() + 
File.separator + fileName;
+        return new URI("SCP", hostName, filePath, null);
 
-       @Override
-       public TaskTypes getType() {
-               return TaskTypes.DATA_STAGING;
-       }
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/dc1be312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
deleted file mode 100644
index d28ae3f..0000000
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.airavata.gfac.impl.task;
-
-import org.apache.airavata.gfac.core.SSHApiException;
-import org.apache.airavata.gfac.core.cluster.RemoteCluster;
-import org.apache.airavata.gfac.core.context.TaskContext;
-import org.apache.airavata.gfac.core.task.Task;
-import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.model.commons.ErrorModel;
-import org.apache.airavata.model.status.TaskState;
-import org.apache.airavata.model.status.TaskStatus;
-import org.apache.airavata.model.task.TaskTypes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-public class SSHEnvironmentSetupTask implements Task {
-
-       private static final Logger log = 
LoggerFactory.getLogger(SSHEnvironmentSetupTask.class);
-       @Override
-       public void init(Map<String, String> propertyMap) throws TaskException {
-
-       }
-
-       @Override
-       public TaskStatus execute(TaskContext taskContext) {
-               TaskStatus status = new TaskStatus(TaskState.COMPLETED);
-               try {
-                       RemoteCluster remoteCluster = 
taskContext.getParentProcessContext().getRemoteCluster();
-                       
remoteCluster.makeDirectory(taskContext.getParentProcessContext().getWorkingDir());
-                       status.setReason("Successfully created environment");
-               } catch (SSHApiException e) {
-                       String msg = "Error while environment setup";
-                       log.error(msg, e);
-                       status.setState(TaskState.FAILED);
-                       status.setReason(msg);
-                       ErrorModel errorModel = new ErrorModel();
-                       errorModel.setActualErrorMessage(e.getMessage());
-                       errorModel.setUserFriendlyMessage(msg);
-                       taskContext.getTaskModel().setTaskError(errorModel);
-               }
-               return status;
-       }
-
-       @Override
-       public TaskStatus recover(TaskContext taskContext) {
-               return execute(taskContext);
-       }
-
-       @Override
-       public TaskTypes getType() {
-               return TaskTypes.ENV_SETUP;
-       }
-}

Reply via email to