This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch helix-integration
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit 7350b253564a91e4a6ebcd8df3673c69f3d1ba92
Author: dimuthu <dimuthu.upeks...@gmail.com>
AuthorDate: Thu Mar 1 12:27:31 2018 -0500

    Implementing DataStaging tasks
---
 .../apache/airavata/agents/api/AgentAdaptor.java   |   6 +-
 .../agents/api/StorageResourceAdaptor.java         |   7 +
 .../helix/agent/local/LocalAgentAdaptor.java       |  25 ++-
 .../airavata/helix/agent/ssh/SshAgentAdaptor.java  | 128 ++++++++++++++-
 .../agent/storage/StorageResourceAdaptorImpl.java  |  85 ++++++++++
 .../apache/airavata/helix/task/api/TaskHelper.java |   1 +
 .../helix/task/api/support/AdaptorSupport.java     |   5 +-
 .../helix/core/support/AdaptorSupportImpl.java     |   8 +
 .../helix/core/support/TaskHelperImpl.java         |   1 +
 .../helix/impl/participant/GlobalParticipant.java  |   3 +-
 .../airavata/helix/impl/task/DataStagingTask.java  |  90 +++++++++--
 .../helix/impl/task/InputDataStagingTask.java      | 117 ++++++++++++++
 .../helix/impl/task/OutputDataStagingTask.java     | 171 +++++++++++++++++++++
 .../airavata/helix/impl/task/TaskContext.java      |   6 +-
 .../helix/impl/task/TaskOnFailException.java       |  28 ++++
 .../impl/task/submission/GroovyMapBuilder.java     |   3 +-
 .../task/submission/task/JobSubmissionTask.java    |   2 +-
 .../helix/impl/workflow/SimpleWorkflow.java        |  17 +-
 18 files changed, 673 insertions(+), 30 deletions(-)

diff --git 
a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java
 
b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java
index 2d295de..2948dc1 100644
--- 
a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java
+++ 
b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java
@@ -17,7 +17,11 @@ public interface AgentAdaptor {
 
     public void createDirectory(String path) throws AgentException;
 
-    public void copyFile(String sourceFile, String destinationFile) throws 
AgentException;
+    public void copyFileTo(String localFile, String remoteFile) throws 
AgentException;
+
+    public void copyFileFrom(String remoteFile, String localFile) throws 
AgentException;
 
     public List<String> listDirectory(String path) throws AgentException;
+
+    public List<String> getFileNameFromExtension(String fileName, String 
parentPath) throws AgentException;
 }
diff --git 
a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java
 
b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java
new file mode 100644
index 0000000..9c5d471
--- /dev/null
+++ 
b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java
@@ -0,0 +1,7 @@
+package org.apache.airavata.agents.api;
+
+public interface StorageResourceAdaptor {
+    public void init(String storageResourceId, String gatewayId, String 
loginUser, String token) throws AgentException;
+    public void uploadFile(String sourceFile, String destFile) throws 
AgentException;
+    public void downloadFile(String sourceFile, String destFile) throws 
AgentException;
+}
diff --git 
a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/local/LocalAgentAdaptor.java
 
b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/local/LocalAgentAdaptor.java
index af507bf..7a56526 100644
--- 
a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/local/LocalAgentAdaptor.java
+++ 
b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/local/LocalAgentAdaptor.java
@@ -3,9 +3,7 @@ package org.apache.airavata.helix.agent.local;
 import org.apache.airavata.agents.api.AgentAdaptor;
 import org.apache.airavata.agents.api.AgentException;
 import org.apache.airavata.agents.api.CommandOutput;
-import org.apache.airavata.agents.api.JobSubmissionOutput;
 
-import java.io.File;
 import java.util.List;
 
 public class LocalAgentAdaptor implements AgentAdaptor {
@@ -13,31 +11,42 @@ public class LocalAgentAdaptor implements AgentAdaptor {
 
 
     public void init(Object agentPams) throws AgentException {
-
+        throw new AgentException("Operation not implemented");
     }
 
     @Override
     public void init(String computeResource, String gatewayId, String userId, 
String token) throws AgentException {
-
+        throw new AgentException("Operation not implemented");
     }
 
     @Override
     public CommandOutput executeCommand(String command, String 
workingDirectory) throws AgentException {
-        return null;
+        throw new AgentException("Operation not implemented");
     }
 
     @Override
     public void createDirectory(String path) throws AgentException {
-
+        throw new AgentException("Operation not implemented");
     }
 
     @Override
-    public void copyFile(String sourceFile, String destinationFile) throws 
AgentException {
+    public void copyFileTo(String localFile, String remoteFile) throws 
AgentException {
+        throw new AgentException("Operation not implemented");
+    }
 
+    @Override
+    public void copyFileFrom(String remoteFile, String localFile) throws 
AgentException {
+        throw new AgentException("Operation not implemented");
     }
 
+
     @Override
     public List<String> listDirectory(String path) throws AgentException {
-        return null;
+        throw new AgentException("Operation not implemented");
+    }
+
+    @Override
+    public List<String> getFileNameFromExtension(String fileName, String 
parentPath) throws AgentException {
+        throw new AgentException("Operation not implemented");
     }
 }
diff --git 
a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
 
b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
index ef8d580..2ad2415 100644
--- 
a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
+++ 
b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
@@ -190,7 +190,7 @@ public class SshAgentAdaptor implements AgentAdaptor {
         }
     }
 
-    public void copyFile(String localFile, String remoteFile) throws 
AgentException {
+    public void copyFileTo(String localFile, String remoteFile) throws 
AgentException {
         FileInputStream fis = null;
         String prefix = null;
         if (new File(localFile).isDirectory()) {
@@ -296,6 +296,127 @@ public class SshAgentAdaptor implements AgentAdaptor {
         }
     }
 
+    // TODO: a missing remote file does not currently result in an exception
+    public void copyFileFrom(String remoteFile, String localFile) throws 
AgentException {
+        FileOutputStream fos = null;
+        ChannelExec channelExec = null;
+        try {
+            String prefix = null;
+            if (new File(localFile).isDirectory()) {
+                prefix = localFile + File.separator;
+            }
+
+            StandardOutReader stdOutReader = new StandardOutReader();
+
+            // exec 'scp -f remotefile' remotely
+            String command = "scp -f " + remoteFile;
+            channelExec = (ChannelExec)session.openChannel("exec");
+            channelExec.setCommand(command);
+
+            //channelExec.setErrStream(stdOutReader.getStandardError());
+            // get I/O streams for remote scp
+            OutputStream out = channelExec.getOutputStream();
+            InputStream in = channelExec.getInputStream();
+            InputStream err = channelExec.getErrStream();
+
+            if (!channelExec.isClosed()){
+                channelExec.connect();
+            }
+
+            byte[] buf = new byte[1024];
+
+            // send '\0'
+            buf[0] = 0;
+            out.write(buf, 0, 1);
+            out.flush();
+
+            while (true) {
+                int c = checkAck(in);
+                if (c != 'C') {
+                    break;
+                }
+
+                // read '0644 '
+                in.read(buf, 0, 5);
+
+                long filesize = 0L;
+                while (true) {
+                    if (in.read(buf, 0, 1) < 0) {
+                        // error
+                        break;
+                    }
+                    if (buf[0] == ' ') break;
+                    filesize = filesize * 10L + (long) (buf[0] - '0');
+                }
+
+                String file = null;
+                for (int i = 0; ; i++) {
+                    in.read(buf, i, 1);
+                    if (buf[i] == (byte) 0x0a) {
+                        file = new String(buf, 0, i);
+                        break;
+                    }
+                }
+
+                //System.out.println("filesize="+filesize+", file="+file);
+
+                // send '\0'
+                buf[0] = 0;
+                out.write(buf, 0, 1);
+                out.flush();
+
+                // read the file content from the remote stream into the local file
+                fos = new FileOutputStream(prefix == null ? localFile : prefix 
+ file);
+                int foo;
+                while (true) {
+                    if (buf.length < filesize) foo = buf.length;
+                    else foo = (int) filesize;
+                    foo = in.read(buf, 0, foo);
+                    if (foo < 0) {
+                        // error
+                        break;
+                    }
+                    fos.write(buf, 0, foo);
+                    filesize -= foo;
+                    if (filesize == 0L) break;
+                }
+                fos.close();
+                fos = null;
+
+                if (checkAck(in) != 0) {
+                    String error = "Error transfering the file content";
+                    //log.error(error);
+                    throw new AgentException(error);
+                }
+
+                // send '\0'
+                buf[0] = 0;
+                out.write(buf, 0, 1);
+                out.flush();
+            }
+
+
+            stdOutReader.readStdErrFromStream(err);
+            if (stdOutReader.getStdError().contains("scp:")) {
+                throw new AgentException(stdOutReader.getStdError());
+            }
+
+        } catch (Exception e) {
+            //log.error(e.getMessage(), e);
+            throw new AgentException(e);
+        } finally {
+            try {
+                if (fos != null) fos.close();
+            } catch (Exception ee) {
+            }
+
+            if (channelExec != null) {
+                channelExec.disconnect();
+            }
+
+        }
+    }
+
     @Override
     public List<String> listDirectory(String path) throws AgentException {
         String command = "ls " + path;
@@ -333,6 +454,11 @@ public class SshAgentAdaptor implements AgentAdaptor {
         }
     }
 
+    @Override
+    public List<String> getFileNameFromExtension(String fileName, String 
parentPath) throws AgentException {
+        throw new AgentException("Operation not implemented");
+    }
+
     private static class DefaultUserInfo implements UserInfo, 
UIKeyboardInteractive {
 
         private String userName;
diff --git 
a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java
 
b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java
new file mode 100644
index 0000000..537f17d
--- /dev/null
+++ 
b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java
@@ -0,0 +1,85 @@
+package org.apache.airavata.helix.agent.storage;
+
+import com.jcraft.jsch.Session;
+import org.apache.airavata.agents.api.AgentException;
+import org.apache.airavata.agents.api.StorageResourceAdaptor;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.DBUtil;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.credential.store.credential.Credential;
+import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
+import org.apache.airavata.credential.store.store.CredentialStoreException;
+import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
+import org.apache.airavata.helix.agent.ssh.SshAdaptorParams;
+import org.apache.airavata.helix.agent.ssh.SshAgentAdaptor;
+import 
org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
+import 
org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.AppCatalog;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+public class StorageResourceAdaptorImpl extends SshAgentAdaptor implements 
StorageResourceAdaptor  {
+
+    private static final Logger logger = 
LogManager.getLogger(SshAgentAdaptor.class);
+
+    private Session session = null;
+    private AppCatalog appCatalog;
+
+    @Override
+    public void init(String storageResourceId, String gatewayId, String 
loginUser, String token) throws AgentException {
+
+        try {
+            logger.info("Initializing Storage Resource Adaptor for storage 
resource : "+ storageResourceId + ", gateway : " +
+                    gatewayId +", user " + loginUser + ", token : " + token);
+            this.appCatalog = RegistryFactory.getAppCatalog();
+            StorageResourceDescription storageResource = 
appCatalog.getStorageResource().getStorageResource(storageResourceId);
+            String hostName = storageResource.getHostName();
+
+            String jdbcUrl = ServerSettings.getCredentialStoreDBURL();
+            String jdbcUsr = ServerSettings.getCredentialStoreDBUser();
+            String jdbcPass = ServerSettings.getCredentialStoreDBPassword();
+            String driver = ServerSettings.getCredentialStoreDBDriver();
+            CredentialReaderImpl credentialReader = new 
CredentialReaderImpl(new DBUtil(jdbcUrl, jdbcUsr, jdbcPass, driver));
+
+            logger.info("Fetching credentials for cred store token " + token);
+
+            Credential credential = credentialReader.getCredential(gatewayId, 
token);
+
+            if (credential instanceof SSHCredential) {
+                SSHCredential sshCredential = 
SSHCredential.class.cast(credential);
+                SshAdaptorParams adaptorParams = new SshAdaptorParams();
+                adaptorParams.setHostName(storageResource.getHostName());
+                adaptorParams.setUserName(loginUser);
+                adaptorParams.setPassphrase(sshCredential.getPassphrase());
+                adaptorParams.setPrivateKey(sshCredential.getPrivateKey());
+                adaptorParams.setPublicKey(sshCredential.getPublicKey());
+                adaptorParams.setStrictHostKeyChecking(false);
+                init(adaptorParams);
+            }
+
+        } catch (AppCatalogException e) {
+            e.printStackTrace();
+        } catch (InstantiationException e) {
+            e.printStackTrace();
+        } catch (IllegalAccessException e) {
+            e.printStackTrace();
+        } catch (CredentialStoreException e) {
+            e.printStackTrace();
+        } catch (ClassNotFoundException e) {
+            e.printStackTrace();
+        } catch (ApplicationSettingsException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void uploadFile(String sourceFile, String destFile) throws 
AgentException {
+        super.copyFileTo(sourceFile, destFile);
+    }
+
+    @Override
+    public void downloadFile(String sourceFile, String destFile) throws 
AgentException {
+        super.copyFileFrom(sourceFile, destFile);
+    }
+}
diff --git 
a/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/TaskHelper.java
 
b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/TaskHelper.java
index 07de06e..4550a66 100644
--- 
a/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/TaskHelper.java
+++ 
b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/TaskHelper.java
@@ -1,5 +1,6 @@
 package org.apache.airavata.helix.task.api;
 
+import org.apache.airavata.agents.api.StorageResourceAdaptor;
 import org.apache.airavata.helix.task.api.support.AdaptorSupport;
 
 /**
diff --git 
a/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/support/AdaptorSupport.java
 
b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/support/AdaptorSupport.java
index 4b6e11e..456fdee 100644
--- 
a/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/support/AdaptorSupport.java
+++ 
b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/support/AdaptorSupport.java
@@ -1,8 +1,6 @@
 package org.apache.airavata.helix.task.api.support;
 
-import org.apache.airavata.agents.api.AgentAdaptor;
-import org.apache.airavata.agents.api.CommandOutput;
-import org.apache.airavata.agents.api.JobSubmissionOutput;
+import org.apache.airavata.agents.api.*;
 
 import java.io.File;
 
@@ -16,5 +14,6 @@ public interface AdaptorSupport {
     public void initializeAdaptor();
 
     public AgentAdaptor fetchAdaptor(String gatewayId, String computeResource, 
String protocol, String authToken, String userId) throws Exception;
+    public StorageResourceAdaptor fetchStorageAdaptor(String gatewayId, String 
storageResourceId, String protocol,  String authToken, String userId) throws 
AgentException;
 
 }
diff --git 
a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java
 
b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java
index a98b8f0..c264012 100644
--- 
a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java
+++ 
b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java
@@ -2,6 +2,7 @@ package org.apache.airavata.helix.core.support;
 
 import org.apache.airavata.agents.api.*;
 import org.apache.airavata.helix.agent.ssh.SshAgentAdaptor;
+import org.apache.airavata.helix.agent.storage.StorageResourceAdaptorImpl;
 import org.apache.airavata.helix.task.api.support.AdaptorSupport;
 
 import java.io.File;
@@ -35,4 +36,11 @@ public class AdaptorSupportImpl implements AdaptorSupport {
         agentAdaptor.init(computeResource, gatewayId, userId, authToken);
         return agentAdaptor;
     }
+
+    @Override
+    public StorageResourceAdaptor fetchStorageAdaptor(String gatewayId, String 
storageResourceId, String protocol, String authToken, String userId) throws 
AgentException {
+        StorageResourceAdaptor storageResourceAdaptor = new 
StorageResourceAdaptorImpl();
+        storageResourceAdaptor.init(storageResourceId, gatewayId, userId, 
authToken);
+        return storageResourceAdaptor;
+    }
 }
diff --git 
a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/TaskHelperImpl.java
 
b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/TaskHelperImpl.java
index 77fc5ce..2987ebd 100644
--- 
a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/TaskHelperImpl.java
+++ 
b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/TaskHelperImpl.java
@@ -1,5 +1,6 @@
 package org.apache.airavata.helix.core.support;
 
+import org.apache.airavata.agents.api.StorageResourceAdaptor;
 import org.apache.airavata.helix.task.api.TaskHelper;
 
 /**
diff --git 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
index f0e166b..984b277 100644
--- 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
+++ 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
@@ -16,7 +16,8 @@ public class GlobalParticipant extends HelixParticipant {
 
     private String[] taskClasses = {
         "org.apache.airavata.helix.impl.task.EnvSetupTask",
-        "org.apache.airavata.helix.impl.task.DataStagingTask",
+        "org.apache.airavata.helix.impl.task.InputDataStagingTask",
+        "org.apache.airavata.helix.impl.task.OutputDataStagingTask",
         
"org.apache.airavata.helix.impl.task.submission.task.ForkJobSubmissionTask",
         
"org.apache.airavata.helix.impl.task.submission.task.DefaultJobSubmissionTask",
         
"org.apache.airavata.helix.impl.task.submission.task.LocalJobSubmissionTask"
diff --git 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/DataStagingTask.java
 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/DataStagingTask.java
index 346aa73..594cbc9 100644
--- 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/DataStagingTask.java
+++ 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/DataStagingTask.java
@@ -1,19 +1,89 @@
 package org.apache.airavata.helix.impl.task;
 
-import org.apache.airavata.helix.task.api.TaskHelper;
-import org.apache.airavata.helix.task.api.annotation.TaskDef;
-import org.apache.helix.task.TaskResult;
+import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.agents.api.AgentException;
+import org.apache.airavata.agents.api.StorageResourceAdaptor;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.helix.task.api.support.AdaptorSupport;
+import 
org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.commons.io.FileUtils;
 
-@TaskDef(name = "Data Staging Task")
-public class DataStagingTask extends AiravataTask {
+import java.io.File;
+import java.io.IOException;
 
-    @Override
-    public TaskResult onRun(TaskHelper taskHelper) {
-        return null;
+public abstract class DataStagingTask extends AiravataTask {
+
+    protected DataStagingTaskModel getDataStagingTaskModel() throws 
TaskOnFailException {
+        try {
+            Object subTaskModel = getTaskContext().getSubTaskModel();
+            if (subTaskModel != null) {
+                return DataStagingTaskModel.class.cast(subTaskModel);
+            } else {
+                throw new TaskOnFailException("Data staging task model can not 
be null for task " + getTaskId(), true, null);
+            }
+        } catch (Exception e) {
+            throw new TaskOnFailException("Failed while obtaining data staging 
task model for task " + getTaskId(), true, e);
+        }
+    }
+
+    protected StorageResourceDescription getStorageResource() throws 
TaskOnFailException {
+        try {
+            StorageResourceDescription storageResource = 
getTaskContext().getStorageResource();
+            if (storageResource == null) {
+                throw new TaskOnFailException("Storage resource can not be 
null for task " + getTaskId(), true, null);
+            }
+            return storageResource;
+        } catch (AppCatalogException e) {
+            throw new TaskOnFailException("Failed to fetch the storage 
resource for task " + getTaskId(), true, e);
+        }
     }
 
-    @Override
-    public void onCancel() {
+    protected StorageResourceAdaptor getStorageAdaptor(AdaptorSupport 
adaptorSupport) throws TaskOnFailException {
+        try {
+            StorageResourceAdaptor storageResourceAdaptor = 
adaptorSupport.fetchStorageAdaptor(
+                    getGatewayId(),
+                    getTaskContext().getStorageResourceId(),
+                    "SSH",
+                    getTaskContext().getStorageResourceCredentialToken(),
+                    getTaskContext().getStorageResourceLoginUserName());
+
+            if (storageResourceAdaptor == null) {
+                throw new TaskOnFailException("Storage resource adaptor for " 
+ getTaskContext().getStorageResourceId() + " can not be null", true, null);
+            }
+            return storageResourceAdaptor;
+        } catch (AgentException e) {
+            throw new TaskOnFailException("Failed to obtain adaptor for 
storage resource " + getTaskContext().getStorageResourceId() +
+                    " in task " + getTaskId(), true, e);
+        }
+    }
+
+    protected AgentAdaptor getComputeResourceAdaptor(AdaptorSupport 
adaptorSupport) throws TaskOnFailException {
+        try {
+            return adaptorSupport.fetchAdaptor(
+                    getTaskContext().getGatewayId(),
+                    getTaskContext().getComputeResourceId(),
+                    getTaskContext().getJobSubmissionProtocol().name(),
+                    getTaskContext().getComputeResourceCredentialToken(),
+                    getTaskContext().getComputeResourceLoginUserName());
+        } catch (Exception e) {
+            throw new TaskOnFailException("Failed to obtain adaptor for 
compute resource " + getTaskContext().getComputeResourceId() +
+                    " in task " + getTaskId(), true, e);
+        }
+    }
 
+    protected String getLocalDataPath(String fileName) throws 
TaskOnFailException {
+        String localDataPath = ServerSettings.getLocalDataLocation();
+        localDataPath = (localDataPath.endsWith(File.separator) ? 
localDataPath : localDataPath + File.separator);
+        localDataPath = (localDataPath.endsWith(File.separator) ? 
localDataPath : localDataPath + File.separator) +
+                getProcessId() + File.separator + "temp_inputs" + 
File.separator;
+        try {
+            FileUtils.forceMkdir(new File(localDataPath));
+        } catch (IOException e) {
+            throw new TaskOnFailException("Failed build directories " + 
localDataPath, true, e);
+        }
+        localDataPath = localDataPath + fileName;
+        return localDataPath;
     }
 }
diff --git 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/InputDataStagingTask.java
 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/InputDataStagingTask.java
new file mode 100644
index 0000000..30eeec0
--- /dev/null
+++ 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/InputDataStagingTask.java
@@ -0,0 +1,117 @@
+package org.apache.airavata.helix.impl.task;
+
+import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.agents.api.AgentException;
+import org.apache.airavata.agents.api.StorageResourceAdaptor;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import 
org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.task.TaskResult;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+@TaskDef(name = "Input Data Staging Task")
+public class InputDataStagingTask extends DataStagingTask {
+
+    private static final Logger logger = 
LogManager.getLogger(InputDataStagingTask.class);
+
+    @Override
+    public TaskResult onRun(TaskHelper taskHelper) {
+        logger.info("Starting Input Data Staging Task " + getTaskId());
+
+        try {
+            // Get and validate data staging task model
+            DataStagingTaskModel dataStagingTaskModel = 
getDataStagingTaskModel();
+
+            // Fetch and validate input data type from data staging task model
+            InputDataObjectType processInput = 
dataStagingTaskModel.getProcessInput();
+            if (processInput != null && processInput.getValue() == null) {
+                String message = "expId: " + getExperimentId() + ", processId: 
" + getProcessId() + ", taskId: " + getTaskId() +
+                        ":- Couldn't stage file " + processInput.getName() + " 
, file name shouldn't be null. ";
+                logger.error(message);
+                if (processInput.isIsRequired()) {
+                    message += "File name is null, but this input's isRequired 
bit is not set";
+                } else {
+                    message += "File name is null";
+                }
+                logger.error(message);
+                throw new TaskOnFailException(message, true, null);
+            }
+
+            // Fetch and validate storage resource
+            StorageResourceDescription storageResource = getStorageResource();
+
+            // Fetch and validate source and destination URIs
+            URI sourceURI;
+            URI destinationURI;
+            String sourceFileName;
+            try {
+                sourceURI = new URI(dataStagingTaskModel.getSource());
+                destinationURI = new 
URI(dataStagingTaskModel.getDestination());
+
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Source file " + sourceURI.getPath() + ", 
destination uri " + destinationURI.getPath() + " for task " + getTaskId());
+                }
+
+                sourceFileName = 
sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 
1,
+                        sourceURI.getPath().length());
+            } catch (URISyntaxException e) {
+                throw new TaskOnFailException("Failed to obtain source URI for 
input data staging task " + getTaskId(), true, e);
+            }
+
+            // Fetch and validate storage adaptor
+            StorageResourceAdaptor storageResourceAdaptor = 
getStorageAdaptor(taskHelper.getAdaptorSupport());
+
+            // Fetch and validate compute resource adaptor
+            AgentAdaptor adaptor = 
getComputeResourceAdaptor(taskHelper.getAdaptorSupport());
+
+            String localSourceFilePath = getLocalDataPath(sourceFileName);
+            // Downloading input file from the storage resource
+            try {
+                logger.info("Downloading input file " + sourceURI.getPath() + 
" to the local path " + localSourceFilePath);
+                storageResourceAdaptor.downloadFile(sourceURI.getPath(), 
localSourceFilePath);
+                logger.info("Input file downloaded to " + localSourceFilePath);
+            } catch (AgentException e) {
+                throw new TaskOnFailException("Failed downloading input file " 
+ sourceFileName + " to the local path " + localSourceFilePath, true, e);
+            }
+
+            // Uploading input file to the compute resource
+            try {
+                logger.info("Uploading the input file to " + 
destinationURI.getPath() + " from local path " + localSourceFilePath);
+                adaptor.copyFileTo(localSourceFilePath, 
destinationURI.getPath());
+                logger.info("Output file uploaded to " + 
destinationURI.getPath());
+            } catch (AgentException e) {
+                throw new TaskOnFailException("Failed uploading the input file 
to " + destinationURI.getPath() + " from local path " + localSourceFilePath, 
true, e);
+            }
+
+            return onSuccess("Input data staging task " + getTaskId() + " 
successfully completed");
+
+        } catch (TaskOnFailException e) {
+            if (e.getError() != null) {
+                logger.error(e.getReason(), e.getError());
+            } else {
+                logger.error(e.getReason());
+            }
+            return onFail(e.getReason(), e.isCritical(), e.getError());
+
+        }catch (Exception e) {
+            logger.error("Unknown error while executing input data staging 
task " + getTaskId(), e);
+            return onFail("Unknown error while executing input data staging 
task " + getTaskId(), false,  e);
+        }
+    }
+
+    @Override
+    public void onCancel() {
+
+    }
+}
diff --git 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java
 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java
new file mode 100644
index 0000000..d2280d0
--- /dev/null
+++ 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java
@@ -0,0 +1,218 @@
+package org.apache.airavata.helix.impl.task;
+
+import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.agents.api.AgentException;
+import org.apache.airavata.agents.api.StorageResourceAdaptor;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.apache.airavata.registry.cpi.ExpCatChildDataType;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.helix.task.TaskResult;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Helix task that stages job outputs: each file is copied from the compute resource to a local
+ * path and then uploaded through the storage resource adaptor. A source file name containing
+ * {@code *} is treated as a wildcard and every file reported by the compute adaptor is staged.
+ */
+@TaskDef(name = "Output Data Staging Task")
+public class OutputDataStagingTask extends DataStagingTask {
+
+    private static final Logger logger = LogManager.getLogger(OutputDataStagingTask.class);
+
+    /**
+     * Stages the output file(s) described by the data staging task model.
+     *
+     * @param taskHelper provides the adaptor support used to look up the storage and compute adaptors
+     * @return the value of {@code onSuccess} once every file has been transferred, otherwise the
+     *         value of {@code onFail} describing the failure
+     */
+    @Override
+    public TaskResult onRun(TaskHelper taskHelper) {
+
+        try {
+            // Get and validate data staging task model
+            DataStagingTaskModel dataStagingTaskModel = getDataStagingTaskModel();
+
+            // Fetch and validate output data type from data staging task model
+            OutputDataObjectType processOutput = dataStagingTaskModel.getProcessOutput();
+            if (processOutput != null && processOutput.getValue() == null) {
+                String message = "expId: " + getExperimentId() + ", processId: " + getProcessId()
+                        + ", taskId: " + getTaskId() + ":- Couldn't stage file " + processOutput.getName()
+                        + " , file name shouldn't be null. ";
+                logger.error(message);
+                if (processOutput.isIsRequired()) {
+                    // The original text claimed the bit was "not set" inside the isRequired branch
+                    message += "File name is null, but this output's isRequired bit is set";
+                } else {
+                    message += "File name is null";
+                }
+                throw new TaskOnFailException(message, true, null);
+            }
+
+            // Fetch and validate storage resource
+            StorageResourceDescription storageResource = getStorageResource();
+
+            // Fetch and validate source and destination URLS
+            URI sourceURI;
+            URI destinationURI;
+            String sourceFileName;
+            try {
+                sourceURI = new URI(dataStagingTaskModel.getSource());
+                destinationURI = new URI(dataStagingTaskModel.getDestination());
+
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Source file " + sourceURI.getPath() + ", destination uri "
+                            + destinationURI.getPath() + " for task " + getTaskId());
+                }
+
+                sourceFileName = sourceURI.getPath().substring(
+                        sourceURI.getPath().lastIndexOf(File.separator) + 1);
+            } catch (URISyntaxException e) {
+                throw new TaskOnFailException("Failed to obtain source URI for output data staging task "
+                        + getTaskId(), true, e);
+            }
+
+            // Fetch and validate storage adaptor
+            StorageResourceAdaptor storageResourceAdaptor = getStorageAdaptor(taskHelper.getAdaptorSupport());
+
+            // Fetch and validate compute resource adaptor
+            AgentAdaptor adaptor = getComputeResourceAdaptor(taskHelper.getAdaptorSupport());
+
+            if (sourceFileName.contains("*")) {
+                // if file is declared as a wild card
+                logger.info("Handling output files with " + sourceFileName + " extension for task " + getTaskId());
+
+                if (processOutput == null) {
+                    // Fail with a clear reason instead of a NullPointerException at setName() below
+                    throw new TaskOnFailException("No process output is available to register files matching "
+                            + sourceFileName + " for task " + getTaskId(), false, null);
+                }
+
+                String destParentPath = (new File(destinationURI.getPath())).getParentFile().getPath();
+                String sourceParentPath = (new File(sourceURI.getPath())).getParentFile().getPath();
+
+                logger.debug("Destination parent path " + destParentPath + ", source parent path " + sourceParentPath);
+                List<String> fileNames;
+                try {
+                    fileNames = adaptor.getFileNameFromExtension(sourceFileName, sourceParentPath);
+
+                    if (logger.isTraceEnabled()) {
+                        fileNames.forEach(fileName -> logger.trace("File found : " + fileName));
+                    }
+
+                } catch (AgentException e) {
+                    throw new TaskOnFailException("Failed to fetch the file list from extension " + sourceFileName, true, e);
+                }
+
+                for (String matchedFileName : fileNames) {
+                    if (matchedFileName == null || matchedFileName.isEmpty()) {
+                        // A blank entry names no file, so there is nothing to register or transfer
+                        continue;
+                    }
+
+                    // Stage the matched file itself rather than the wildcard pattern held by sourceURI.
+                    // NOTE(review): assumes the adaptor returns names relative to sourceParentPath, as the
+                    // destination path construction already did - confirm against the adaptor.
+                    URI matchedSourceURI = new URI(joinPath(sourceParentPath, matchedFileName));
+                    URI matchedDestinationURI = new URI(joinPath(destParentPath, matchedFileName));
+
+                    //Wildcard support is only enabled for output data staging
+                    processOutput.setName(matchedFileName);
+
+                    try {
+                        getTaskContext().getExperimentCatalog().add(ExpCatChildDataType.EXPERIMENT_OUTPUT,
+                                Arrays.asList(processOutput), getExperimentId());
+                        getTaskContext().getExperimentCatalog().add(ExpCatChildDataType.PROCESS_OUTPUT,
+                                Arrays.asList(processOutput), getProcessId());
+                    } catch (RegistryException e) {
+                        throw new TaskOnFailException("Failed to update experiment or process outputs for task "
+                                + getTaskId(), true, e);
+                    }
+
+                    logger.info("Transferring file " + matchedFileName);
+                    transferFile(matchedSourceURI, matchedDestinationURI, matchedFileName,
+                            adaptor, storageResourceAdaptor);
+                }
+
+            } else {
+                // Single output file: move it from the compute resource to the storage resource
+                transferFile(sourceURI, destinationURI, sourceFileName, adaptor, storageResourceAdaptor);
+            }
+
+            // Both branches report success here; the wildcard branch used to fall through to "return null"
+            return onSuccess("Output data staging task " + getTaskId() + " successfully completed");
+
+        } catch (TaskOnFailException e) {
+            if (e.getError() != null) {
+                logger.error(e.getReason(), e.getError());
+            } else {
+                logger.error(e.getReason());
+            }
+            return onFail(e.getReason(), e.isCritical(), e.getError());
+
+        } catch (Exception e) {
+            logger.error("Unknown error while executing output data staging task " + getTaskId(), e);
+            return onFail("Unknown error while executing output data staging task " + getTaskId(), false,  e);
+        }
+    }
+
+    /**
+     * Joins a directory path and a file name, inserting {@link File#separator} only when the
+     * directory path does not already end with it.
+     */
+    private static String joinPath(String parentPath, String fileName) {
+        return parentPath.endsWith(File.separator)
+                ? parentPath + fileName
+                : parentPath + File.separator + fileName;
+    }
+
+    /**
+     * Copies one file from the compute resource to a local path and uploads it with the storage adaptor.
+     *
+     * @param sourceURI URI whose path is read from the compute resource
+     * @param destinationURI URI whose path is the upload target
+     * @param fileName name used to derive the intermediate local path
+     * @param adaptor compute resource adaptor used for the download
+     * @param storageResourceAdaptor storage adaptor used for the upload
+     * @throws TaskOnFailException marked critical, if the download or the upload fails
+     */
+    private void transferFile(URI sourceURI, URI destinationURI, String fileName, AgentAdaptor adaptor,
+                              StorageResourceAdaptor storageResourceAdaptor) throws TaskOnFailException {
+        String localSourceFilePath = getLocalDataPath(fileName);
+
+        try {
+            logger.info("Downloading output file " + sourceURI.getPath() + " to the local path " + localSourceFilePath);
+            adaptor.copyFileFrom(sourceURI.getPath(), localSourceFilePath);
+            logger.info("Output file downloaded to " + localSourceFilePath);
+        } catch (AgentException e) {
+            throw new TaskOnFailException("Failed downloading output file " + sourceURI.getPath() + " to the local path " +
+                    localSourceFilePath, true, e);
+        }
+
+        // Uploading the output file to the storage resource
+        try {
+            logger.info("Uploading the output file to " + destinationURI.getPath() + " from local path " + localSourceFilePath);
+            storageResourceAdaptor.uploadFile(localSourceFilePath, destinationURI.getPath());
+            logger.info("Output file uploaded to " + destinationURI.getPath());
+        } catch (AgentException e) {
+            throw new TaskOnFailException("Failed uploading the output file to " + destinationURI.getPath() + " from local path " +
+                    localSourceFilePath, true, e);
+        }
+    }
+
+    @Override
+    public void onCancel() {
+        // No cancellation handling is implemented for this task
+    }
+}
diff --git 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
index f33d8a1..64a7de8 100644
--- 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
+++ 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
@@ -511,7 +511,11 @@ public class TaskContext {
         this.currentExecutingTaskModel = currentExecutingTaskModel;
     }
 
-    public StorageResourceDescription getStorageResource() {
+    public StorageResourceDescription getStorageResource() throws 
AppCatalogException {
+        if (storageResource == null) {
+            this.storageResource = appCatalog.getStorageResource()
+                    .getStorageResource(processModel.getStorageResourceId());
+        }
         return storageResource;
     }
 
diff --git 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskOnFailException.java
 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskOnFailException.java
new file mode 100644
index 0000000..196a219
--- /dev/null
+++ 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskOnFailException.java
@@ -0,0 +1,38 @@
+package org.apache.airavata.helix.impl.task;
+
+/**
+ * Checked exception carrying a failure reason, a critical flag and an optional underlying error.
+ */
+public class TaskOnFailException extends Exception {
+
+    private final String failureReason;
+    private final boolean criticalFailure;
+    private final Throwable underlyingError;
+
+    /**
+     * @param reason text stored as both the exception message and the reason
+     * @param critical flag returned by {@link #isCritical()}
+     * @param e error stored as both the exception cause and the error; may be {@code null}
+     */
+    public TaskOnFailException(String reason, boolean critical, Throwable e) {
+        super(reason, e);
+        failureReason = reason;
+        criticalFailure = critical;
+        underlyingError = e;
+    }
+
+    /** Returns the reason passed to the constructor. */
+    public String getReason() {
+        return failureReason;
+    }
+
+    /** Returns the critical flag passed to the constructor. */
+    public boolean isCritical() {
+        return criticalFailure;
+    }
+
+    /** Returns the error passed to the constructor. */
+    public Throwable getError() {
+        return underlyingError;
+    }
+}
diff --git 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java
 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java
index 16e8114..e4267ce 100644
--- 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java
+++ 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java
@@ -67,8 +67,7 @@ public class GroovyMapBuilder {
         
inputValues.addAll(getProcessOutputValues(taskContext.getProcessModel().getProcessOutputs(),
 false));
         mapData.setInputsAll(inputValuesAll);
 
-        //mapData.setUserName(taskContext.geJo)
-
+        mapData.setUserName(taskContext.getComputeResourceLoginUserName());
         mapData.setShellName("/bin/bash");
 
         if (taskContext != null) {
diff --git 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
index 1a024a7..b517af1 100644
--- 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
+++ 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
@@ -60,7 +60,7 @@ public abstract class JobSubmissionTask extends AiravataTask {
 
         logger.info("Copying file form " + tempJobFile.getAbsolutePath() + " 
to remote path " + workingDirectory +
                 " of compute resource " + 
getTaskContext().getComputeResourceId());
-        agentAdaptor.copyFile(tempJobFile.getAbsolutePath(), workingDirectory);
+        agentAdaptor.copyFileTo(tempJobFile.getAbsolutePath(), 
workingDirectory);
         // TODO transfer file
         RawCommandInfo submitCommand = 
jobManagerConfiguration.getSubmitCommand(workingDirectory, 
tempJobFile.getPath());
 
diff --git 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
index 99db2c4..63921db 100644
--- 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
+++ 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
@@ -4,6 +4,8 @@ import org.apache.airavata.helix.core.AbstractTask;
 import org.apache.airavata.helix.core.OutPort;
 import org.apache.airavata.helix.impl.task.AiravataTask;
 import org.apache.airavata.helix.impl.task.EnvSetupTask;
+import org.apache.airavata.helix.impl.task.InputDataStagingTask;
+import org.apache.airavata.helix.impl.task.OutputDataStagingTask;
 import 
org.apache.airavata.helix.impl.task.submission.task.DefaultJobSubmissionTask;
 import org.apache.airavata.helix.workflow.WorkflowManager;
 import org.apache.airavata.model.experiment.ExperimentModel;
@@ -37,14 +39,25 @@ public class SimpleWorkflow {
         String[] taskIds = taskDag.split(",");
         final List<AiravataTask> allTasks = new ArrayList<>();
 
+        boolean jobSubmissionFound = false;
+
         for (String taskId : taskIds) {
             Optional<TaskModel> model = taskList.stream().filter(taskModel -> 
taskModel.getTaskId().equals(taskId)).findFirst();
-            model.ifPresent(taskModel -> {
+
+            if (model.isPresent()) {
+                TaskModel taskModel = model.get();
                 AiravataTask airavataTask = null;
                 if (taskModel.getTaskType() == TaskTypes.ENV_SETUP) {
                     airavataTask = new EnvSetupTask();
                 } else if (taskModel.getTaskType() == 
TaskTypes.JOB_SUBMISSION) {
                     airavataTask = new DefaultJobSubmissionTask();
+                    jobSubmissionFound = true;
+                } else if (taskModel.getTaskType() == TaskTypes.DATA_STAGING) {
+                    if (jobSubmissionFound) {
+                        airavataTask = new OutputDataStagingTask();
+                    } else {
+                        airavataTask = new InputDataStagingTask();
+                    }
                 }
 
                 if (airavataTask != null) {
@@ -57,7 +70,7 @@ public class SimpleWorkflow {
                     }
                     allTasks.add(airavataTask);
                 }
-            });
+            }
         }
 
 /*        DefaultJobSubmissionTask defaultJobSubmissionTask = new 
DefaultJobSubmissionTask();

-- 
To stop receiving notification emails like this one, please contact
dimuthu...@apache.org.

Reply via email to