This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/master by this push:
     new 34f2e83  Adding a job verification task to avoid premature data transfers
34f2e83 is described below

commit 34f2e831b2e90f3024d188dad91124fbec033886
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Sun Jun 2 22:06:03 2019 -0400

    Adding a job verification task to avoid premature data transfers
---
 .../helix/impl/participant/GlobalParticipant.java  |   1 +
 .../impl/task/staging/JobVerificationTask.java     | 102 +++++++++++++++++++++
 .../helix/impl/workflow/PostWorkflowManager.java   |  11 +++
 3 files changed, 114 insertions(+)

diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
index c704a9c..f734f6b 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
@@ -36,6 +36,7 @@ public class GlobalParticipant extends HelixParticipant<AbstractTask> {
             "org.apache.airavata.helix.impl.task.env.EnvSetupTask",
             "org.apache.airavata.helix.impl.task.staging.InputDataStagingTask",
             
"org.apache.airavata.helix.impl.task.staging.OutputDataStagingTask",
+            "org.apache.airavata.helix.impl.task.staging.JobVerificationTask",
             "org.apache.airavata.helix.impl.task.completing.CompletingTask",
             
"org.apache.airavata.helix.impl.task.submission.ForkJobSubmissionTask",
             
"org.apache.airavata.helix.impl.task.submission.DefaultJobSubmissionTask",
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/JobVerificationTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/JobVerificationTask.java
new file mode 100644
index 0000000..b878392
--- /dev/null
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/JobVerificationTask.java
@@ -0,0 +1,102 @@
+package org.apache.airavata.helix.impl.task.staging;
+
+import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.agents.api.CommandOutput;
+import org.apache.airavata.helix.impl.task.AiravataTask;
+import org.apache.airavata.helix.impl.task.TaskContext;
+import org.apache.airavata.helix.impl.task.submission.config.JobFactory;
+import 
org.apache.airavata.helix.impl.task.submission.config.JobManagerConfiguration;
+import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.helix.task.TaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+@TaskDef(name = "Job Verification Task")
+public class JobVerificationTask extends AiravataTask {
+
+    private final static Logger logger = 
LoggerFactory.getLogger(JobVerificationTask.class);
+
+    @Override
+    public TaskResult onRun(TaskHelper taskHelper, TaskContext taskContext) {
+
+        try {
+            List<JobModel> jobs = 
getRegistryServiceClient().getJobs("processId", getProcessId());
+
+            logger.info("Fetching jobs for process " + getProcessId() + " to 
verify to saturation");
+
+            if (jobs == null || jobs.size() == 0) {
+                return onSuccess("Can not find running jobs for process " + 
getProcessId());
+            }
+
+            logger.info("Found " + jobs.size() + " jobs for process");
+
+            logger.info("Fetching job manager configuration for process " + 
getProcessId());
+
+            JobManagerConfiguration jobManagerConfiguration = 
JobFactory.getJobManagerConfiguration(
+                    JobFactory.getResourceJobManager(
+                            getRegistryServiceClient(),
+                            getTaskContext().getJobSubmissionProtocol(),
+                            
getTaskContext().getPreferredJobSubmissionInterface()));
+
+            AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
+                    getTaskContext().getGatewayId(),
+                    getTaskContext().getComputeResourceId(),
+                    getTaskContext().getJobSubmissionProtocol(),
+                    getTaskContext().getComputeResourceCredentialToken(),
+                    getTaskContext().getComputeResourceLoginUserName());
+
+            for (JobModel job : jobs) {
+
+                try {
+                    RawCommandInfo monitorCommand = 
jobManagerConfiguration.getMonitorCommand(job.getJobId());
+                    int retryDelaySeconds = 30;
+                    int nextWaitingTime;
+                    for (int i = 1; i <= 4; i++) {
+                        CommandOutput jobMonitorOutput = 
adaptor.executeCommand(monitorCommand.getRawCommand(), null);
+
+                        if (jobMonitorOutput.getExitCode() == 0) {
+                            JobStatus jobStatus = 
jobManagerConfiguration.getParser().parseJobStatus(job.getJobId(), 
jobMonitorOutput.getStdOut());
+                            logger.info("Status of job id " + job.getJobId() + 
" " + jobStatus.getJobState());
+                            if (jobStatus.getJobState() == JobState.ACTIVE ||
+                                    jobStatus.getJobState() == JobState.QUEUED 
||
+                                    jobStatus.getJobState() == 
JobState.SUBMITTED) {
+                                nextWaitingTime = retryDelaySeconds * i;
+                                logger.info("Waiting " + nextWaitingTime + " 
seconds until the job becomes saturated");
+                                Thread.sleep(nextWaitingTime);
+                            } else {
+                                logger.info("Job is in saturated state");
+                                break;
+                            }
+
+                        } else {
+                            logger.warn("Error while fetching the job " + 
job.getJobId() + " status. Std out " + jobMonitorOutput.getStdOut() +
+                                    ". Std err " + 
jobMonitorOutput.getStdError() + ". Job monitor command " + 
monitorCommand.getRawCommand());
+                            break;
+                        }
+                    }
+                } catch (Exception e) {
+                    logger.warn("Unknown error while fetching the job status 
but continuing..", e);
+                }
+            }
+
+            logger.info("Successfully completed job verification task");
+            return onSuccess("Successfully completed job verification task");
+
+        } catch (Exception e) {
+            logger.error("Unknown error while verifying jobs of process " + 
getProcessId() + " but continuing as this is non critical", e);
+            return onSuccess("Unknown error while verifying jobs of process " 
+ getProcessId() + " but continuing as this is non critical");
+        }
+    }
+
+    @Override
+    public void onCancel(TaskContext taskContext) {
+
+    }
+}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index 5bb9595..90a0862 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -27,6 +27,7 @@ import org.apache.airavata.helix.core.OutPort;
 import org.apache.airavata.helix.impl.task.*;
 import org.apache.airavata.helix.impl.task.completing.CompletingTask;
 import org.apache.airavata.helix.impl.task.staging.ArchiveTask;
+import org.apache.airavata.helix.impl.task.staging.JobVerificationTask;
 import org.apache.airavata.helix.impl.task.staging.OutputDataStagingTask;
 import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.status.ProcessState;
@@ -213,6 +214,16 @@ public class PostWorkflowManager extends WorkflowManager {
         String[] taskIds = taskDag.split(",");
         final List<AiravataTask> allTasks = new ArrayList<>();
 
+        JobVerificationTask jobVerificationTask = new JobVerificationTask();
+        jobVerificationTask.setGatewayId(experimentModel.getGatewayId());
+        jobVerificationTask.setExperimentId(experimentModel.getExperimentId());
+        jobVerificationTask.setProcessId(processModel.getProcessId());
+        jobVerificationTask.setTaskId("Job-Verification-Task-" + 
UUID.randomUUID().toString() +"-");
+        jobVerificationTask.setForceRunTask(forceRun);
+        jobVerificationTask.setSkipTaskStatusPublish(true);
+
+        allTasks.add(jobVerificationTask);
+
         boolean jobSubmissionFound = false;
 
         for (String taskId : taskIds) {

Reply via email to