This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch helix-integration
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit 573dbab1a29f1bf2f1fdb8c9cacdb7ad42b105ad
Author: dimuthu <dimuthu.upeks...@gmail.com>
AuthorDate: Fri Mar 2 13:16:49 2018 -0500

    Fixing bugs in pre workflow
---
 .../airavata/helix/agent/ssh/SshAgentAdaptor.java  |  4 +-
 .../apache/airavata/helix/core/AbstractTask.java   | 10 ++++
 .../airavata/helix/workflow/WorkflowManager.java   |  2 +-
 .../airavata/helix/impl/task/EnvSetupTask.java     |  2 +-
 .../airavata/helix/impl/task/TaskContext.java      | 68 +++++++++++++++++++++-
 .../impl/task/submission/GroovyMapBuilder.java     |  4 +-
 .../submission/task/DefaultJobSubmissionTask.java  |  6 +-
 .../task/submission/task/JobSubmissionTask.java    |  4 +-
 .../helix/impl/workflow/SimpleWorkflow.java        |  5 +-
 .../src/main/resources/application.properties      |  2 +-
 10 files changed, 92 insertions(+), 15 deletions(-)

diff --git 
a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
 
b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
index 2ad2415..5392ab5 100644
--- 
a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
+++ 
b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
@@ -132,13 +132,12 @@ public class SshAgentAdaptor implements AgentAdaptor {
         ChannelExec channelExec = null;
         try {
             channelExec = ((ChannelExec) session.openChannel("exec"));
-            channelExec.setCommand(command);
+            channelExec.setCommand("cd " + workingDirectory + "; " + command);
             channelExec.setInputStream(null);
             InputStream out = channelExec.getInputStream();
             InputStream err = channelExec.getErrStream();
             channelExec.connect();
 
-            commandOutput.setExitCode(channelExec.getExitStatus());
             commandOutput.readStdOutFromStream(out);
             commandOutput.readStdErrFromStream(err);
             return commandOutput;
@@ -150,6 +149,7 @@ public class SshAgentAdaptor implements AgentAdaptor {
             throw new AgentException(e);
         } finally {
             if (channelExec != null) {
+                commandOutput.setExitCode(channelExec.getExitStatus());
                 channelExec.disconnect();
             }
         }
diff --git 
a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
 
b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
index 04fa37f..5aca9cd 100644
--- 
a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
+++ 
b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
@@ -26,6 +26,8 @@ public abstract class AbstractTask extends UserContentStore 
implements Task {
     private TaskCallbackContext callbackContext;
     private TaskHelper taskHelper;
 
+    private int retryCount = 3;
+
     @Override
     public void init(HelixManager manager, String workflowName, String 
jobName, String taskName) {
         super.init(manager, workflowName, jobName, taskName);
@@ -105,4 +107,12 @@ public abstract class AbstractTask extends 
UserContentStore implements Task {
         this.taskHelper = taskHelper;
         return this;
     }
+
+    public int getRetryCount() {
+        return retryCount;
+    }
+
+    public void setRetryCount(int retryCount) {
+        this.retryCount = retryCount;
+    }
 }
diff --git 
a/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java
 
b/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java
index ab7e3c4..9ecafb9 100644
--- 
a/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java
+++ 
b/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java
@@ -61,7 +61,7 @@ public class WorkflowManager {
             JobConfig.Builder job = new JobConfig.Builder()
                     .addTaskConfigs(taskBuilds)
                     .setFailureThreshold(0)
-                    .setMaxAttemptsPerTask(3);
+                    .setMaxAttemptsPerTask(data.getRetryCount());
 
             if (!globalParticipant) {
                 job.setInstanceGroupTag(taskType);
diff --git 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
index eafa53d..ddba5f2 100644
--- 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
+++ 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
@@ -28,7 +28,7 @@ public class EnvSetupTask extends AiravataTask {
             logger.info("Creating directory " + 
getTaskContext().getWorkingDir() + " on compute resource " + 
getTaskContext().getComputeResourceId());
             adaptor.createDirectory(getTaskContext().getWorkingDir());
             publishTaskState(TaskState.COMPLETED);
-            return onSuccess("Successfully completed");
+            return onSuccess("Envi setup task successfully completed " + 
getTaskId());
         } catch (Exception e) {
             try {
                 publishTaskState(TaskState.FAILED);
diff --git 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
index 64a7de8..489a196 100644
--- 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
+++ 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
@@ -13,6 +13,8 @@ import 
org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescr
 import 
org.apache.airavata.model.appcatalog.userresourceprofile.UserComputeResourcePreference;
 import 
org.apache.airavata.model.appcatalog.userresourceprofile.UserResourceProfile;
 import 
org.apache.airavata.model.appcatalog.userresourceprofile.UserStoragePreference;
+import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.data.movement.DataMovementProtocol;
 import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.process.ProcessModel;
@@ -23,11 +25,13 @@ import org.apache.airavata.model.task.TaskModel;
 import org.apache.airavata.registry.cpi.AppCatalog;
 import org.apache.airavata.registry.cpi.AppCatalogException;
 import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.util.*;
 
 public class TaskContext {
@@ -436,8 +440,38 @@ public class TaskContext {
         this.resourceJobManager = resourceJobManager;
     }
 
-    public ResourceJobManager getResourceJobManager() {
-        return resourceJobManager;
+    public ResourceJobManager getResourceJobManager() throws Exception {
+
+        if (this.resourceJobManager == null) {
+            JobSubmissionInterface jsInterface = 
getPreferredJobSubmissionInterface();
+
+            if (jsInterface == null) {
+                throw new Exception("Job Submission interface cannot be empty 
at this point");
+            } else if (jsInterface.getJobSubmissionProtocol() == 
JobSubmissionProtocol.SSH) {
+                SSHJobSubmission sshJobSubmission = 
getAppCatalog().getComputeResource().getSSHJobSubmission
+                        (jsInterface.getJobSubmissionInterfaceId());
+                // context method.
+                resourceJobManager = sshJobSubmission.getResourceJobManager();
+            } else if (jsInterface.getJobSubmissionProtocol() == 
JobSubmissionProtocol.LOCAL) {
+                LOCALSubmission localSubmission = 
getAppCatalog().getComputeResource().getLocalJobSubmission
+                        (jsInterface.getJobSubmissionInterfaceId());
+                resourceJobManager = localSubmission.getResourceJobManager();
+            } else if (jsInterface.getJobSubmissionProtocol() == 
JobSubmissionProtocol.SSH_FORK) {
+                SSHJobSubmission sshJobSubmission = 
getAppCatalog().getComputeResource().getSSHJobSubmission
+                        (jsInterface.getJobSubmissionInterfaceId());
+                resourceJobManager = sshJobSubmission.getResourceJobManager();
+            } else if (jsInterface.getJobSubmissionProtocol() == 
JobSubmissionProtocol.CLOUD) {
+                return null;
+            } else {
+                throw new Exception("Unsupported JobSubmissionProtocol - " + 
jsInterface.getJobSubmissionProtocol()
+                        .name());
+            }
+
+            if (resourceJobManager == null) {
+                throw new Exception("Resource Job Manager is empty.");
+            }
+        }
+        return this.resourceJobManager;
     }
 
     public String getLocalWorkingDir() {
@@ -794,6 +828,36 @@ public class TaskContext {
                     
.getApplicationInterface(processModel.getApplicationInterfaceId()));
             
ctx.setComputeResourceDescription(appCatalog.getComputeResource().getComputeResource
                     (ctx.getComputeResourceId()));
+
+            List<OutputDataObjectType> applicationOutputs = 
ctx.getApplicationInterfaceDescription().getApplicationOutputs();
+            if (applicationOutputs != null && !applicationOutputs.isEmpty()) {
+                for (OutputDataObjectType outputDataObjectType : 
applicationOutputs) {
+                    if 
(outputDataObjectType.getType().equals(DataType.STDOUT)) {
+                        if (outputDataObjectType.getValue() == null || 
outputDataObjectType.getValue().equals("")) {
+                            String stdOut = 
(ctx.getWorkingDir().endsWith(File.separator) ? ctx.getWorkingDir() : 
ctx.getWorkingDir() + File.separator)
+                                    + 
ctx.getApplicationInterfaceDescription().getApplicationName() + ".stdout";
+                            outputDataObjectType.setValue(stdOut);
+                            ctx.setStdoutLocation(stdOut);
+                        } else {
+                            
ctx.setStdoutLocation(outputDataObjectType.getValue());
+                        }
+                    }
+                    if 
(outputDataObjectType.getType().equals(DataType.STDERR)) {
+                        if (outputDataObjectType.getValue() == null || 
outputDataObjectType.getValue().equals("")) {
+                            String stderrLocation = 
(ctx.getWorkingDir().endsWith(File.separator) ? ctx.getWorkingDir() : 
ctx.getWorkingDir() + File.separator)
+                                    + 
ctx.getApplicationInterfaceDescription().getApplicationName() + ".stderr";
+                            outputDataObjectType.setValue(stderrLocation);
+                            ctx.setStderrLocation(stderrLocation);
+                        } else {
+                            
ctx.setStderrLocation(outputDataObjectType.getValue());
+                        }
+                    }
+                }
+            }
+
+            // TODO move this to some where else as this is not the correct 
place to do so
+            experimentCatalog.update(ExperimentCatalogModelType.PROCESS, 
processModel, processId);
+            processModel.setProcessOutputs(applicationOutputs);
             return ctx;
         }
 
diff --git 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java
 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java
index e4267ce..2119755 100644
--- 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java
+++ 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java
@@ -64,7 +64,7 @@ public class GroovyMapBuilder {
         mapData.setInputs(inputValues);
 
         List<String> inputValuesAll = 
getProcessInputValues(taskContext.getProcessModel().getProcessInputs(), false);
-        
inputValues.addAll(getProcessOutputValues(taskContext.getProcessModel().getProcessOutputs(),
 false));
+        
inputValuesAll.addAll(getProcessOutputValues(taskContext.getProcessModel().getProcessOutputs(),
 false));
         mapData.setInputsAll(inputValuesAll);
 
         mapData.setUserName(taskContext.getComputeResourceLoginUserName());
@@ -103,7 +103,7 @@ public class GroovyMapBuilder {
                 mapData.setQueueName(scheduling.getQueueName());
             }
             if (totalNodeCount > 0) {
-                mapData.setNodes(totalCPUCount);
+                mapData.setNodes(totalNodeCount);
             }
             if (totalCPUCount > 0) {
                 int ppn = totalCPUCount / totalNodeCount;
diff --git 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
index c85e18b..e21f200 100644
--- 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
+++ 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
@@ -46,6 +46,7 @@ public class DefaultJobSubmissionTask extends 
JobSubmissionTask {
             
jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
             jobModel.setTaskId(getTaskId());
             jobModel.setJobName(mapData.getJobName());
+            jobModel.setJobDescription("Sample description");
 
             if (mapData != null) {
                 
//jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
@@ -71,10 +72,11 @@ public class DefaultJobSubmissionTask extends 
JobSubmissionTask {
                         statusList.add(new JobStatus(JobState.FAILED));
                         
statusList.get(0).setReason(submissionOutput.getFailureReason());
                         jobModel.setJobStatuses(statusList);
-                        jobModel.setJobDescription("Sample description");
                         saveJobModel(jobModel);
                         logger.error("expId: " + getExperimentId() + ", 
processid: " + getProcessId()+ ", taskId: " +
-                                getTaskId() + " :- Job submission failed for 
job name " + jobModel.getJobName());
+                                getTaskId() + " :- Job submission failed for 
job name " + jobModel.getJobName()
+                                + ". Exit code : " + 
submissionOutput.getExitCode() + ", Submission failed : "
+                                + submissionOutput.isJobSubmissionFailed());
 
                         ErrorModel errorModel = new ErrorModel();
                         
errorModel.setUserFriendlyMessage(submissionOutput.getFailureReason());
diff --git 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
index b517af1..ac314e9 100644
--- 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
+++ 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
@@ -181,9 +181,9 @@ public abstract class JobSubmissionTask extends 
AiravataTask {
             MessageContext msgCtx = new MessageContext(jobStatusChangeEvent, 
MessageType.JOB, AiravataUtils.getId
                     (MessageType.JOB.name()), getGatewayId());
             msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
-            getStatusPublisher().publish(msgCtx);
+            //getStatusPublisher().publish(msgCtx);
         } catch (Exception e) {
-            throw new Exception("Error persisting job status" + 
e.getLocalizedMessage(), e);
+            throw new Exception("Error persisting job status " + 
e.getLocalizedMessage(), e);
         }
     }
 
diff --git 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
index 63921db..abd36e1 100644
--- 
a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
+++ 
b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java
@@ -27,7 +27,7 @@ public class SimpleWorkflow {
 
     public static void main(String[] args) throws Exception {
 
-        String processId = "PROCESS_438a87cc-2dec-4edc-bfeb-31128df91bb6";
+        String processId = "PROCESS_5b252ad9-d630-4cf9-80e3-0c30c55d1001";
         AppCatalog appCatalog = RegistryFactory.getAppCatalog();
         ExperimentCatalog experimentCatalog = 
RegistryFactory.getDefaultExpCatalog();
 
@@ -51,10 +51,11 @@ public class SimpleWorkflow {
                     airavataTask = new EnvSetupTask();
                 } else if (taskModel.getTaskType() == 
TaskTypes.JOB_SUBMISSION) {
                     airavataTask = new DefaultJobSubmissionTask();
+                    airavataTask.setRetryCount(1);
                     jobSubmissionFound = true;
                 } else if (taskModel.getTaskType() == TaskTypes.DATA_STAGING) {
                     if (jobSubmissionFound) {
-                        airavataTask = new OutputDataStagingTask();
+                        //airavataTask = new OutputDataStagingTask();
                     } else {
                         airavataTask = new InputDataStagingTask();
                     }
diff --git a/modules/helix-spectator/src/main/resources/application.properties 
b/modules/helix-spectator/src/main/resources/application.properties
index a9b0969..b4b8048 100644
--- a/modules/helix-spectator/src/main/resources/application.properties
+++ b/modules/helix-spectator/src/main/resources/application.properties
@@ -1,3 +1,3 @@
 zookeeper.connection.url=localhost:2199
 helix.cluster.name=AiravataDemoCluster
-participant.name=all-p2
\ No newline at end of file
+participant.name=all-p3
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
dimuthu...@apache.org.

Reply via email to