This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8f7dc3d  Adding consistent job submission, fixing zookeeper connection issues and logging improvements
8f7dc3d is described below

commit 8f7dc3dc8889bd21cb00911d323a66721a960c81
Author: dimuthu <[email protected]>
AuthorDate: Tue Apr 3 15:53:24 2018 -0400

    Adding consistent job submission, fixing zookeeper connection issues and logging improvements
---
 .../task/cancel/RemoteJobCancellationTask.java     | 20 +------
 .../impl/task/cancel/WorkflowCancellationTask.java | 21 +++++--
 .../task/submission/DefaultJobSubmissionTask.java  | 10 +---
 .../impl/task/submission/JobSubmissionTask.java    | 69 +++++++++++++++-------
 .../helix/impl/workflow/PostWorkflowManager.java   | 12 ++--
 .../apache/airavata/helix/core/AbstractTask.java   | 23 ++++++++
 .../airavata/helix/workflow/WorkflowManager.java   |  2 +-
 7 files changed, 98 insertions(+), 59 deletions(-)

diff --git 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
index 20813b0..2302233 100644
--- 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
+++ 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
@@ -2,8 +2,6 @@ package org.apache.airavata.helix.impl.task.cancel;
 
 import org.apache.airavata.agents.api.AgentAdaptor;
 import org.apache.airavata.agents.api.CommandOutput;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.helix.impl.task.AiravataTask;
 import org.apache.airavata.helix.impl.task.TaskContext;
 import org.apache.airavata.helix.impl.task.submission.config.JobFactory;
@@ -11,10 +9,6 @@ import 
org.apache.airavata.helix.impl.task.submission.config.JobManagerConfigura
 import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
 import org.apache.airavata.helix.task.api.TaskHelper;
 import org.apache.airavata.helix.task.api.annotation.TaskDef;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.helix.HelixManager;
 import org.apache.helix.task.TaskResult;
 import org.slf4j.Logger;
@@ -27,19 +21,9 @@ public class RemoteJobCancellationTask extends AiravataTask {
 
     private final static Logger logger = 
LoggerFactory.getLogger(RemoteJobCancellationTask.class);
 
-    private CuratorFramework curatorClient = null;
-
     @Override
     public void init(HelixManager manager, String workflowName, String 
jobName, String taskName) {
         super.init(manager, workflowName, jobName, taskName);
-        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
-        try {
-            this.curatorClient = 
CuratorFrameworkFactory.newClient(ServerSettings.getZookeeperConnection(), 
retryPolicy);
-            this.curatorClient.start();
-        } catch (ApplicationSettingsException e) {
-            logger.error("Failed to create curator client ", e);
-            throw new RuntimeException(e);
-        }
     }
 
     @Override
@@ -108,8 +92,8 @@ public class RemoteJobCancellationTask extends AiravataTask {
 
     private List<String> getJobsOfProcess(String processId) throws Exception {
         String path = "/registry/" + processId + "/jobs";
-        if (this.curatorClient.checkExists().forPath(path) != null) {
-            return this.curatorClient.getChildren().forPath(path);
+        if (getCuratorClient().checkExists().forPath(path) != null) {
+            return getCuratorClient().getChildren().forPath(path);
         } else {
             return null;
         }
diff --git 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java
 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java
index 0513327..95b256c 100644
--- 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java
+++ 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java
@@ -20,6 +20,7 @@ public class WorkflowCancellationTask extends AbstractTask {
     private final static Logger logger = 
LoggerFactory.getLogger(WorkflowCancellationTask.class);
 
     private TaskDriver taskDriver;
+    private HelixManager helixManager;
 
     @TaskParam(name = "Cancelling Workflow")
     private String cancellingWorkflowName;
@@ -33,16 +34,15 @@ public class WorkflowCancellationTask extends AbstractTask {
 
         try {
 
-            HelixManager helixManager = 
HelixManagerFactory.getZKHelixManager(ServerSettings.getSetting("helix.cluster.name"),
 taskName,
+            helixManager = 
HelixManagerFactory.getZKHelixManager(ServerSettings.getSetting("helix.cluster.name"),
 taskName,
                     InstanceType.SPECTATOR, 
ServerSettings.getZookeeperConnection());
             helixManager.connect();
             Runtime.getRuntime().addShutdownHook(
-                    new Thread() {
-                        @Override
-                        public void run() {
+                    new Thread(() -> {
+                        if (helixManager.isConnected()) {
                             helixManager.disconnect();
                         }
-                    }
+                    })
             );
             taskDriver = new TaskDriver(helixManager);
         } catch (Exception e) {
@@ -74,6 +74,17 @@ public class WorkflowCancellationTask extends AbstractTask {
         } catch (Exception e) {
             logger.error("Failed to stop workflow " + cancellingWorkflowName, 
e);
             return onFail("Failed to stop workflow " + cancellingWorkflowName 
+ ": " + e.getMessage(), true);
+        } finally {
+
+            try {
+                if (helixManager != null) {
+                    if (helixManager.isConnected()) {
+                        helixManager.disconnect();
+                    }
+                }
+            } catch (Exception e) {
+                logger.warn("Failed to disconnect helix manager", e);
+            }
         }
     }
 
diff --git 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java
 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java
index a9e2915..a85abfa 100644
--- 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java
+++ 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java
@@ -89,8 +89,7 @@ public class DefaultJobSubmissionTask extends 
JobSubmissionTask {
                     
statusList.get(0).setReason(submissionOutput.getFailureReason());
                     jobModel.setJobStatuses(statusList);
                     saveJobModel(jobModel);
-                    logger.error("expId: " + getExperimentId() + ", processid: 
" + getProcessId()+ ", taskId: " +
-                            getTaskId() + " :- Job submission failed for job 
name " + jobModel.getJobName()
+                    logger.error("Job submission failed for job name " + 
jobModel.getJobName()
                             + ". Exit code : " + 
submissionOutput.getExitCode() + ", Submission failed : "
                             + submissionOutput.isJobSubmissionFailed());
 
@@ -100,19 +99,16 @@ public class DefaultJobSubmissionTask extends 
JobSubmissionTask {
                             false, null);
 
                 } else {
-
                     String msg;
                     saveJobModel(jobModel);
                     ErrorModel errorModel = new ErrorModel();
                     if (submissionOutput.getExitCode() != Integer.MIN_VALUE) {
-                        msg = "expId:" + getExperimentId() + ", processId:" + 
getProcessId() + ", taskId: " + getTaskId() +
-                                " return non zero exit code:" + 
submissionOutput.getExitCode() + "  for JobName:" + jobModel.getJobName() +
+                        msg = "Returned non zero exit code:" + 
submissionOutput.getExitCode() + "  for JobName:" + jobModel.getJobName() +
                                 ", with failure reason : " + 
submissionOutput.getFailureReason()
                                 + " Hence changing job state to Failed." ;
                         
errorModel.setActualErrorMessage(submissionOutput.getFailureReason());
                     } else {
-                        msg = "expId:" + getExperimentId() + ", processId:" + 
getProcessId() + ", taskId: " + getTaskId() +
-                                " doesn't  return valid job submission exit 
code for JobName:" + jobModel.getJobName() +
+                        msg = "Didn't return valid job submission exit code 
for JobName:" + jobModel.getJobName() +
                                 ", with failure reason : stdout ->" + 
submissionOutput.getStdOut() +
                                 " stderr -> " + submissionOutput.getStdErr() + 
" Hence changing job state to Failed." ;
                         errorModel.setActualErrorMessage(msg);
diff --git 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
index 16869bb..d63c459 100644
--- 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
+++ 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
@@ -20,6 +20,7 @@
 package org.apache.airavata.helix.impl.task.submission;
 
 import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.agents.api.AgentException;
 import org.apache.airavata.agents.api.CommandOutput;
 import org.apache.airavata.agents.api.JobSubmissionOutput;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
@@ -38,10 +39,6 @@ import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.airavata.model.status.JobStatus;
 import org.apache.airavata.registry.cpi.*;
 import org.apache.commons.io.FileUtils;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.helix.HelixManager;
 import org.apache.zookeeper.CreateMode;
 import org.slf4j.Logger;
@@ -55,24 +52,9 @@ public abstract class JobSubmissionTask extends AiravataTask 
{
 
     private final static Logger logger = 
LoggerFactory.getLogger(JobSubmissionTask.class);
 
-    private CuratorFramework curatorClient = null;
-
     @Override
     public void init(HelixManager manager, String workflowName, String 
jobName, String taskName) {
         super.init(manager, workflowName, jobName, taskName);
-        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
-        try {
-            this.curatorClient = 
CuratorFrameworkFactory.newClient(ServerSettings.getZookeeperConnection(), 
retryPolicy);
-            this.curatorClient.start();
-        } catch (ApplicationSettingsException e) {
-            logger.error("Failed to create curator client ", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    @SuppressWarnings("WeakerAccess")
-    public CuratorFramework getCuratorClient() {
-        return curatorClient;
     }
 
     // TODO perform exception handling
@@ -106,6 +88,7 @@ public abstract class JobSubmissionTask extends AiravataTask 
{
         addMonitoringCommands(groovyMapData);
 
         String scriptAsString = 
groovyMapData.getAsString(jobManagerConfiguration.getJobDescriptionTemplateName());
+        logger.info("Generated job submission script : " + scriptAsString);
 
         int number = new SecureRandom().nextInt();
         number = (number < 0 ? -number : number);
@@ -117,13 +100,15 @@ public abstract class JobSubmissionTask extends 
AiravataTask {
         logger.info("Copying file form " + tempJobFile.getAbsolutePath() + " 
to remote path " + workingDirectory +
                 " of compute resource " + 
getTaskContext().getComputeResourceId());
         agentAdaptor.copyFileTo(tempJobFile.getAbsolutePath(), 
workingDirectory);
-        // TODO transfer file
+
         RawCommandInfo submitCommand = 
jobManagerConfiguration.getSubmitCommand(workingDirectory, 
tempJobFile.getPath());
 
-        logger.debug("Submit command for process id " + getProcessId() + " : " 
+ submitCommand.getRawCommand());
+        logger.info("Submit command for process id " + getProcessId() + " : " 
+ submitCommand.getRawCommand());
         logger.debug("Working directory for process id " + getProcessId() + " 
: " + workingDirectory);
 
-        CommandOutput commandOutput = 
agentAdaptor.executeCommand(submitCommand.getRawCommand(), workingDirectory);
+        CommandOutput commandOutput = 
submitCommandWithRecording(submitCommand, agentAdaptor, groovyMapData, 
workingDirectory);
+        logger.info("Job " + groovyMapData.getJobName() + " submitted to 
compute resource");
+        logger.info("Submission stdout: " + commandOutput.getStdOut() + ", 
stderr: " + commandOutput.getStdError());
 
         JobSubmissionOutput jsoutput = new JobSubmissionOutput();
         jsoutput.setDescription(scriptAsString);
@@ -147,6 +132,46 @@ public abstract class JobSubmissionTask extends 
AiravataTask {
         return jsoutput;
     }
 
+    /**
+     * This will write the standard output of the command to a file inside the 
working directory of the process and
+     * if the agent does not receive the output through first invocation, it 
retries by looking into the output file.
+     *
+     * @param submitCommand command to submit
+     * @param agentAdaptor agent adaptor to communicate with compute resource
+     * @param groovyMapData metadata object of the job
+     * @param workingDirectory working directory for the process
+     * @return {@link CommandOutput} of the submitted command
+     * @throws AgentException if agent failed to communicate with the compute 
host
+     */
+    private CommandOutput submitCommandWithRecording(RawCommandInfo 
submitCommand, AgentAdaptor agentAdaptor,
+                                                     GroovyMapData 
groovyMapData, String workingDirectory) throws AgentException {
+
+        String modifiedCommand =  submitCommand.getCommand() + " | tee " + 
getJobCommandRecordingFile(groovyMapData);
+        logger.info("Modified the submit command to support recording : " + 
modifiedCommand);
+
+        CommandOutput commandOutput = 
agentAdaptor.executeCommand(modifiedCommand, workingDirectory);
+
+        if (commandOutput.getStdOut() == null || 
"".equals(commandOutput.getStdOut())) {
+            logger.warn("command submission returned empty response so reading 
recording file at " + getJobCommandRecordingFile(groovyMapData));
+            CommandOutput recordingFileReadCommandOutput = 
agentAdaptor.executeCommand("cat " + getJobCommandRecordingFile(groovyMapData),
+                    groovyMapData.getWorkingDirectory());
+            if (recordingFileReadCommandOutput.getStdOut() != null && 
!"".equals(recordingFileReadCommandOutput.getStdOut())) {
+                logger.info("Received non empty output form recording file : " 
+ recordingFileReadCommandOutput.getStdOut());
+                return recordingFileReadCommandOutput;
+            } else {
+                return commandOutput;
+            }
+        } else {
+            return commandOutput;
+        }
+    }
+
+    private String getJobCommandRecordingFile(GroovyMapData mapData) {
+        return (mapData.getWorkingDirectory().endsWith(File.separator) ?
+                mapData.getWorkingDirectory() : mapData.getWorkingDirectory() 
+ File.separator) +
+                mapData.getJobName();
+    }
+
     @SuppressWarnings("WeakerAccess")
     public File getLocalDataDir() {
         String outputPath = ServerSettings.getLocalDataLocation();
diff --git 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index e68b526..5fb2221 100644
--- 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -72,11 +72,16 @@ public class PostWorkflowManager {
 
     private CuratorFramework curatorClient = null;
     private Publisher statusPublisher;
+    private WorkflowManager workflowManager;
 
-    private void init() throws ApplicationSettingsException {
+    private void init() throws Exception {
         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
         this.curatorClient = 
CuratorFrameworkFactory.newClient(ServerSettings.getZookeeperConnection(), 
retryPolicy);
         this.curatorClient.start();
+        workflowManager = new WorkflowManager(
+                ServerSettings.getSetting("helix.cluster.name"),
+                ServerSettings.getSetting("post.workflow.manager.name"),
+                ServerSettings.getZookeeperConnection());
     }
 
     private Consumer<String, JobStatusResult> createConsumer() throws 
ApplicationSettingsException {
@@ -276,11 +281,6 @@ public class PostWorkflowManager {
                         }
                         allTasks.add(completingTask);
 
-                        WorkflowManager workflowManager = new WorkflowManager(
-                                
ServerSettings.getSetting("helix.cluster.name"),
-                                
ServerSettings.getSetting("post.workflow.manager.name"),
-                                ServerSettings.getZookeeperConnection());
-
                         String workflowName = 
workflowManager.launchWorkflow(processId + "-POST-" + 
UUID.randomUUID().toString(),
                                 new ArrayList<>(allTasks), true, false);
                         try {
diff --git 
a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
 
b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
index b7304d7..fa9a205 100644
--- 
a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
+++ 
b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
@@ -19,10 +19,16 @@
  */
 package org.apache.airavata.helix.core;
 
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.helix.core.util.TaskUtil;
 import org.apache.airavata.helix.task.api.TaskHelper;
 import org.apache.airavata.helix.task.api.annotation.TaskOutPort;
 import org.apache.airavata.helix.task.api.annotation.TaskParam;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.helix.HelixManager;
 import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
@@ -44,6 +50,8 @@ public abstract class AbstractTask extends UserContentStore 
implements Task {
     private static final String NEXT_JOB = "next-job";
     private static final String WORKFLOW_STARTED = "workflow-started";
 
+    private static CuratorFramework curatorClient = null;
+
     @TaskParam(name = "taskId")
     private String taskId;
 
@@ -160,4 +168,19 @@ public abstract class AbstractTask extends 
UserContentStore implements Task {
     public void setNextTask(OutPort nextTask) {
         this.nextTask = nextTask;
     }
+
+    protected synchronized CuratorFramework getCuratorClient() {
+
+        if (curatorClient == null) {
+            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+            try {
+                this.curatorClient = 
CuratorFrameworkFactory.newClient(ServerSettings.getZookeeperConnection(), 
retryPolicy);
+                this.curatorClient.start();
+            } catch (ApplicationSettingsException e) {
+                logger.error("Failed to create curator client ", e);
+                throw new RuntimeException(e);
+            }
+        }
+        return curatorClient;
+    }
 }
diff --git 
a/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java
 
b/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java
index df558f2..3197a47 100644
--- 
a/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java
+++ 
b/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java
@@ -65,7 +65,7 @@ public class WorkflowManager {
         taskDriver = new TaskDriver(helixManager);
     }
 
-    public String launchWorkflow(String processId, List<AbstractTask> tasks, 
boolean globalParticipant, boolean monitor) throws Exception {
+    public synchronized String launchWorkflow(String processId, 
List<AbstractTask> tasks, boolean globalParticipant, boolean monitor) throws 
Exception {
 
         String workflowName = WORKFLOW_PREFIX + processId;
         logger.info("Launching workflow " + workflowName + " for process " + 
processId);

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to