Repository: airavata
Updated Branches:
  refs/heads/master 08cdad264 -> c62f74a5e


Added LocalRemoteCluster implementation and LocalCommandOutput reader


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/c62f74a5
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/c62f74a5
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/c62f74a5

Branch: refs/heads/master
Commit: c62f74a5ecaa9866f7337a3c25009dcf8ee52f35
Parents: 08cdad2
Author: Shameera Rathnayaka <[email protected]>
Authored: Mon Nov 9 13:14:06 2015 -0500
Committer: Shameera Rathnayaka <[email protected]>
Committed: Mon Nov 9 13:14:06 2015 -0500

----------------------------------------------------------------------
 .../gfac/core/cluster/RemoteCluster.java        |   4 +-
 .../airavata/gfac/impl/GFacEngineImpl.java      |  26 +--
 .../airavata/gfac/impl/HPCRemoteCluster.java    |   6 +-
 .../airavata/gfac/impl/LocalCommandOutput.java  |  60 ++++++
 .../airavata/gfac/impl/LocalRemoteCluster.java  | 182 +++++++++++++++++++
 .../gfac/impl/task/SCPDataStageTask.java        |   4 +-
 6 files changed, 257 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/c62f74a5/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
index 59e7ff5..9e4544c 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
@@ -51,7 +51,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable
         * @param remoteFile remote file location, this can be a directory too
         * @throws SSHApiException throws exception during error
         */
-       public void scpTo(String localFile, String remoteFile) throws 
SSHApiException;
+       public void copyTo(String localFile, String remoteFile) throws 
SSHApiException;
 
        /**
         * This will copy a remote file in path rFile to local file lFile
@@ -59,7 +59,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable
         * @param remoteFile      remote file path, this has to be a full 
qualified path
         * @param localFile This is the local file to copy, this can be a 
directory too
         */
-       public void scpFrom(String remoteFile, String localFile) throws 
SSHApiException;
+       public void copyFrom(String remoteFile, String localFile) throws 
SSHApiException;
 
        /**
         * This wil copy source remote file to target remote file.

http://git-wip-us.apache.org/repos/asf/airavata/blob/c62f74a5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 4b67ffd..6444eb4 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -343,21 +343,7 @@ public class GFacEngineImpl implements GFacEngine {
             taskContext.setTaskStatus(taskStatus);
             GFacUtils.saveAndPublishTaskStatus(taskContext);
 
-            if (taskStatus.getState() == TaskState.FAILED) {
-                log.error("expId: {}, processId: {}, taskId: {} type: {},:- 
Job submission task failed, " +
-                        "reason:" + " {}", 
taskContext.getParentProcessContext().getExperimentId(), taskContext
-                        .getParentProcessContext().getProcessId(), 
taskContext.getTaskId(), jobSubmissionTask.getType
-                        ().name(), taskStatus.getReason());
-                String errorMsg = new StringBuilder("expId: 
").append(processContext.getExperimentId()).append(", processId: ")
-                        .append(processContext.getProcessId()).append(", 
taskId: ").append(taskContext.getTaskId())
-                        .append(", type: 
").append(taskContext.getTaskType().name()).append(" :- Job submission task 
failed. Reason: ")
-                        .append(taskStatus.getReason()).toString();
-                ErrorModel errorModel = new ErrorModel();
-                errorModel.setUserFriendlyMessage("Job submission task 
failed");
-                errorModel.setActualErrorMessage(errorMsg);
-                GFacUtils.saveTaskError(taskContext, errorModel);
-                throw new GFacException("Job submission task failed");
-            }
+            checkFailures(taskContext, taskStatus, jobSubmissionTask);
             return false;
         } catch (TException e) {
             throw new GFacException(e);
@@ -419,13 +405,18 @@ public class GFacEngineImpl implements GFacEngine {
         taskContext.setTaskStatus(taskStatus);
         GFacUtils.saveAndPublishTaskStatus(taskContext);
 
+        checkFailures(taskContext, taskStatus, dMoveTask);
+        return false;
+    }
+
+    private void checkFailures(TaskContext taskContext, TaskStatus taskStatus, 
Task dMoveTask) throws GFacException {
         if (taskStatus.getState() == TaskState.FAILED) {
             log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input 
statging failed, " +
                     "reason:" + " {}", 
taskContext.getParentProcessContext().getExperimentId(), taskContext
                     .getParentProcessContext().getProcessId(), 
taskContext.getTaskId(), dMoveTask.getType
                     ().name(), taskStatus.getReason());
-            String errorMsg = new StringBuilder("expId: 
").append(processContext.getExperimentId()).append(", processId: ")
-                    .append(processContext.getProcessId()).append(", taskId: 
").append(taskContext.getTaskId())
+            String errorMsg = new StringBuilder("expId: 
").append(taskContext.getParentProcessContext().getExperimentId()).append(", 
processId: ")
+                    
.append(taskContext.getParentProcessContext().getProcessId()).append(", taskId: 
").append(taskContext.getTaskId())
                     .append(", type: 
").append(taskContext.getTaskType().name()).append(" :- Input staging failed. 
Reason: ")
                     .append(taskStatus.getReason()).toString();
             ErrorModel errorModel = new ErrorModel();
@@ -434,7 +425,6 @@ public class GFacEngineImpl implements GFacEngine {
             GFacUtils.saveTaskError(taskContext, errorModel);
             throw new GFacException("Error while staging input data");
         }
-        return false;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/airavata/blob/c62f74a5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
index 288c98c..3711f7c 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
@@ -83,7 +83,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
        @Override
        public JobSubmissionOutput submitBatchJob(String jobScriptFilePath, 
String workingDirectory) throws SSHApiException {
                JobSubmissionOutput jsoutput = new JobSubmissionOutput();
-               scpTo(jobScriptFilePath, workingDirectory); // scp script file 
to working directory
+               copyTo(jobScriptFilePath, workingDirectory); // scp script file 
to working directory
                RawCommandInfo submitCommand = 
jobManagerConfiguration.getSubmitCommand(workingDirectory, jobScriptFilePath);
                submitCommand.setRawCommand("cd " + workingDirectory + "; " + 
submitCommand.getRawCommand());
                StandardOutReader reader = new StandardOutReader();
@@ -97,7 +97,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
        }
 
        @Override
-       public void scpTo(String localFile, String remoteFile) throws 
SSHApiException {
+       public void copyTo(String localFile, String remoteFile) throws 
SSHApiException {
                int retry = 3;
                while (retry > 0) {
                        try {
@@ -128,7 +128,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
        }
 
        @Override
-       public void scpFrom(String remoteFile, String localFile) throws 
SSHApiException {
+       public void copyFrom(String remoteFile, String localFile) throws 
SSHApiException {
                int retry = 3;
                while(retry>0) {
                        try {

http://git-wip-us.apache.org/repos/asf/airavata/blob/c62f74a5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java
new file mode 100644
index 0000000..e9d683d
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java
@@ -0,0 +1,60 @@
+package org.apache.airavata.gfac.impl;
+
+import com.jcraft.jsch.Channel;
+import org.apache.airavata.gfac.core.cluster.CommandOutput;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+
+/**
+ * Created by syodage on 11/9/15.
+ */
+public class LocalCommandOutput implements CommandOutput {
+    private Process process;
+
+    @Override
+    public void onOutput(Channel channel) {
+
+    }
+
+    public void readOutputs(Process process) {
+        this.process = process;
+    }
+
+    public String getStandardOut() throws IOException {
+        BufferedReader stdInput = new BufferedReader(new 
InputStreamReader(process.getInputStream()));
+        StringBuilder sb = new StringBuilder();
+        String s = null;
+        while ((s = stdInput.readLine()) != null) {
+            sb.append(s);
+        }
+        return sb.toString();
+    }
+
+    public String getStandardErrorString() throws IOException {
+        BufferedReader stdError = new BufferedReader(new 
InputStreamReader(process.getErrorStream()));
+        StringBuilder sb = new StringBuilder();
+        String s = null;
+        while ((s = stdError.readLine()) != null) {
+            sb.append(s);
+        }
+        return sb.toString();
+    }
+
+    @Override
+    public OutputStream getStandardError() {
+        return null;
+    }
+
+    @Override
+    public void exitCode(int code) {
+
+    }
+
+    @Override
+    public int getExitCode() {
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/c62f74a5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
new file mode 100644
index 0000000..d1aa6e0
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
@@ -0,0 +1,182 @@
+package org.apache.airavata.gfac.impl;
+
+import com.jcraft.jsch.Session;
+import org.apache.airavata.gfac.core.JobManagerConfiguration;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.core.cluster.*;
+import org.apache.airavata.model.status.JobStatus;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.FileAttribute;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
+import java.util.*;
+
+/**
+ * Created by shameera on 11/9/15.
+ */
+public class LocalRemoteCluster extends AbstractRemoteCluster {
+
+    public LocalRemoteCluster(ServerInfo serverInfo, JobManagerConfiguration 
jobManagerConfiguration, AuthenticationInfo authenticationInfo) {
+        super(serverInfo, jobManagerConfiguration, authenticationInfo);
+    }
+
+    @Override
+    public JobSubmissionOutput submitBatchJob(String jobScriptFilePath, String 
workingDirectory) throws SSHApiException {
+        try {
+            JobSubmissionOutput jsoutput = new JobSubmissionOutput();
+            copyTo(jobScriptFilePath, workingDirectory); // scp script file to 
working directory
+            RawCommandInfo submitCommand = 
jobManagerConfiguration.getSubmitCommand(workingDirectory, jobScriptFilePath);
+            submitCommand.setRawCommand("cd " + workingDirectory + "; " + 
submitCommand.getRawCommand());
+            LocalCommandOutput localCommandOutput = new LocalCommandOutput();
+            executeCommand(submitCommand, localCommandOutput);
+            
jsoutput.setJobId(outputParser.parseJobSubmission(localCommandOutput.getStandardOut()));
+            jsoutput.setExitCode(localCommandOutput.getExitCode());
+            jsoutput.setStdOut(localCommandOutput.getStandardOut());
+            jsoutput.setStdErr(localCommandOutput.getStandardErrorString());
+            return jsoutput;
+        } catch (IOException e) {
+            throw new SSHApiException("Error while submitting local batch 
job", e);
+        }
+    }
+
+    @Override
+    public void copyTo(String localFile, String remoteFile) throws 
SSHApiException {
+        Path sourcePath = Paths.get(localFile);
+        Path targetPath = Paths.get(remoteFile);
+        try {
+            Files.copy(sourcePath, targetPath, 
StandardCopyOption.REPLACE_EXISTING);
+        } catch (IOException e) {
+            throw new SSHApiException("Error while copying sourceFile: " + 
sourcePath.toString()
+                    + ", to destinationFile: " + targetPath.toString(), e);
+        }
+    }
+
+    @Override
+    public void copyFrom(String remoteFile, String localFile) throws 
SSHApiException {
+        Path sourcePath = Paths.get(remoteFile);
+        Path targetPath = Paths.get(localFile);
+        try {
+            Files.copy(sourcePath, targetPath, 
StandardCopyOption.REPLACE_EXISTING);
+        } catch (IOException e) {
+            throw new SSHApiException("Error while copying sourceFile: " + 
sourcePath.toString()
+                    + ", to destinationFile: " + targetPath.toString(), e);
+        }
+    }
+
+    @Override
+    public void scpThirdParty(String sourceFile, String destinationFile, 
Session session, DIRECTION inOrOut) throws SSHApiException {
+        throw new UnsupportedOperationException("Scp third party is not 
support with LocalRemoteCluster");
+    }
+
+    @Override
+    public void makeDirectory(String directoryPath) throws SSHApiException {
+        Path dirPath = Paths.get(directoryPath);
+        Set<PosixFilePermission> perms = new HashSet<>();
+        // add permission as rw-r--r-- 644
+        perms.add(PosixFilePermission.OWNER_WRITE);
+        perms.add(PosixFilePermission.OWNER_READ);
+        perms.add(PosixFilePermission.GROUP_READ);
+        perms.add(PosixFilePermission.OTHERS_READ);
+        FileAttribute<Set<PosixFilePermission>> fileAttributes = 
PosixFilePermissions.asFileAttribute(perms);
+        try {
+            Files.createDirectory(dirPath, fileAttributes);
+        } catch (IOException e) {
+            throw new SSHApiException("Error making directory", e);
+        }
+
+    }
+
+    @Override
+    public JobStatus cancelJob(String jobID) throws SSHApiException {
+        JobStatus oldStatus = getJobStatus(jobID);
+        RawCommandInfo cancelCommand = 
jobManagerConfiguration.getCancelCommand(jobID);
+        execute(cancelCommand);
+        return oldStatus;
+    }
+
+
+    @Override
+    public JobStatus getJobStatus(String jobID) throws SSHApiException {
+        RawCommandInfo monitorCommand = 
jobManagerConfiguration.getMonitorCommand(jobID);
+        LocalCommandOutput localCommandOutput = new LocalCommandOutput();
+        try {
+            executeCommand(monitorCommand, localCommandOutput);
+            return outputParser.parseJobStatus(jobID, 
localCommandOutput.getStandardErrorString());
+        } catch (IOException e) {
+            throw new SSHApiException("Error while getting jobStatus", e);
+        }
+    }
+
+    @Override
+    public String getJobIdByJobName(String jobName, String userName) throws 
SSHApiException {
+        try {
+            RawCommandInfo jobIdMonitorCommand = 
jobManagerConfiguration.getJobIdMonitorCommand(jobName, userName);
+            LocalCommandOutput localCommandOutput = new LocalCommandOutput();
+            executeCommand(jobIdMonitorCommand, localCommandOutput);
+            return outputParser.parseJobId(jobName, 
localCommandOutput.getStandardOut());
+        } catch (IOException e) {
+            throw new SSHApiException("Error while getting jobId using 
JobName", e);
+        }
+    }
+
+    @Override
+    public void getJobStatuses(String userName, Map<String, JobStatus> 
jobStatusMap) throws SSHApiException {
+        try {
+            RawCommandInfo userBasedMonitorCommand = 
jobManagerConfiguration.getUserBasedMonitorCommand(userName);
+            LocalCommandOutput localCommandOutput = new LocalCommandOutput();
+            executeCommand(userBasedMonitorCommand, localCommandOutput);
+            outputParser.parseJobStatuses(userName, jobStatusMap, 
localCommandOutput.getStandardOut());
+        } catch (IOException e) {
+            throw new SSHApiException("Error while getting job statuses", e);
+        }
+    }
+
+    @Override
+    public List<String> listDirectory(String directoryPath) throws 
SSHApiException {
+        File directory = new File(directoryPath);
+        List<String> results = new ArrayList<>();
+        File[] files = directory.listFiles();
+        for (File file : files) {
+            results.add(file.getName());
+        }
+        return results;
+    }
+
+    @Override
+    public boolean execute(CommandInfo commandInfo) throws SSHApiException {
+        LocalCommandOutput localCommandOutput = new LocalCommandOutput();
+        try {
+            executeCommand(commandInfo, localCommandOutput);
+        } catch (IOException e) {
+            throw new SSHApiException("Error while executing command " + 
commandInfo.getCommand(), e);
+        }
+        return true;
+    }
+
+    private void executeCommand(CommandInfo commandInfo, LocalCommandOutput 
localCommandOutput) throws IOException {
+        Process process = Runtime.getRuntime().exec(commandInfo.getCommand());
+        localCommandOutput.readOutputs(process);
+    }
+
+    @Override
+    public Session getSession() throws SSHApiException {
+        return null;
+    }
+
+    @Override
+    public void disconnect() throws SSHApiException {
+
+    }
+
+    @Override
+    public ServerInfo getServerInfo() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/c62f74a5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
index 9ffa6f2..32ee31b 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
@@ -66,13 +66,13 @@ public class SCPDataStageTask implements Task {
                                        /**
                                         * copy local file to compute resource.
                                         */
-                                       
taskContext.getParentProcessContext().getRemoteCluster().scpTo(sourceURI.getPath(),
 destinationURI
+                                       
taskContext.getParentProcessContext().getRemoteCluster().copyTo(sourceURI.getPath(),
 destinationURI
                                                        .getPath());
                                } else if (processState == 
ProcessState.OUTPUT_DATA_STAGING) {
                                        /**
                                         * copy remote file from compute 
resource.
                                         */
-                                       
taskContext.getParentProcessContext().getRemoteCluster().scpFrom(sourceURI.getPath(),
 destinationURI
+                                       
taskContext.getParentProcessContext().getRemoteCluster().copyFrom(sourceURI.getPath(),
 destinationURI
                                                        .getPath());
                                }
                                status.setReason("Successfully staged data");

Reply via email to