This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/master by this push:
new 34f2e83 Adding a job verification task to avoid premature data transfers
34f2e83 is described below
commit 34f2e831b2e90f3024d188dad91124fbec033886
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Sun Jun 2 22:06:03 2019 -0400
Adding a job verification task to avoid premature data transfers
---
.../helix/impl/participant/GlobalParticipant.java | 1 +
.../impl/task/staging/JobVerificationTask.java | 102 +++++++++++++++++++++
.../helix/impl/workflow/PostWorkflowManager.java | 11 +++
3 files changed, 114 insertions(+)
diff --git
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
index c704a9c..f734f6b 100644
---
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
+++
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
@@ -36,6 +36,7 @@ public class GlobalParticipant extends
HelixParticipant<AbstractTask> {
"org.apache.airavata.helix.impl.task.env.EnvSetupTask",
"org.apache.airavata.helix.impl.task.staging.InputDataStagingTask",
"org.apache.airavata.helix.impl.task.staging.OutputDataStagingTask",
+ "org.apache.airavata.helix.impl.task.staging.JobVerificationTask",
"org.apache.airavata.helix.impl.task.completing.CompletingTask",
"org.apache.airavata.helix.impl.task.submission.ForkJobSubmissionTask",
"org.apache.airavata.helix.impl.task.submission.DefaultJobSubmissionTask",
diff --git
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/JobVerificationTask.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/JobVerificationTask.java
new file mode 100644
index 0000000..b878392
--- /dev/null
+++
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/JobVerificationTask.java
@@ -0,0 +1,102 @@
+package org.apache.airavata.helix.impl.task.staging;
+
+import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.agents.api.CommandOutput;
+import org.apache.airavata.helix.impl.task.AiravataTask;
+import org.apache.airavata.helix.impl.task.TaskContext;
+import org.apache.airavata.helix.impl.task.submission.config.JobFactory;
+import
org.apache.airavata.helix.impl.task.submission.config.JobManagerConfiguration;
+import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.helix.task.TaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+@TaskDef(name = "Job Verification Task")
+public class JobVerificationTask extends AiravataTask {
+
+ private final static Logger logger =
LoggerFactory.getLogger(JobVerificationTask.class);
+
+ @Override
+ public TaskResult onRun(TaskHelper taskHelper, TaskContext taskContext) {
+
+ try {
+ List<JobModel> jobs =
getRegistryServiceClient().getJobs("processId", getProcessId());
+
+ logger.info("Fetching jobs for process " + getProcessId() + " to
verify to saturation");
+
+ if (jobs == null || jobs.size() == 0) {
+ return onSuccess("Can not find running jobs for process " +
getProcessId());
+ }
+
+ logger.info("Found " + jobs.size() + " jobs for process");
+
+ logger.info("Fetching job manager configuration for process " +
getProcessId());
+
+ JobManagerConfiguration jobManagerConfiguration =
JobFactory.getJobManagerConfiguration(
+ JobFactory.getResourceJobManager(
+ getRegistryServiceClient(),
+ getTaskContext().getJobSubmissionProtocol(),
+
getTaskContext().getPreferredJobSubmissionInterface()));
+
+ AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
+ getTaskContext().getGatewayId(),
+ getTaskContext().getComputeResourceId(),
+ getTaskContext().getJobSubmissionProtocol(),
+ getTaskContext().getComputeResourceCredentialToken(),
+ getTaskContext().getComputeResourceLoginUserName());
+
+ for (JobModel job : jobs) {
+
+ try {
+ RawCommandInfo monitorCommand =
jobManagerConfiguration.getMonitorCommand(job.getJobId());
+ int retryDelaySeconds = 30;
+ int nextWaitingTime;
+ for (int i = 1; i <= 4; i++) {
+ CommandOutput jobMonitorOutput =
adaptor.executeCommand(monitorCommand.getRawCommand(), null);
+
+ if (jobMonitorOutput.getExitCode() == 0) {
+ JobStatus jobStatus =
jobManagerConfiguration.getParser().parseJobStatus(job.getJobId(),
jobMonitorOutput.getStdOut());
+ logger.info("Status of job id " + job.getJobId() +
" " + jobStatus.getJobState());
+ if (jobStatus.getJobState() == JobState.ACTIVE ||
+ jobStatus.getJobState() == JobState.QUEUED
||
+ jobStatus.getJobState() ==
JobState.SUBMITTED) {
+ nextWaitingTime = retryDelaySeconds * i;
+ logger.info("Waiting " + nextWaitingTime + "
seconds until the job becomes saturated");
+ Thread.sleep(nextWaitingTime);
+ } else {
+ logger.info("Job is in saturated state");
+ break;
+ }
+
+ } else {
+ logger.warn("Error while fetching the job " +
job.getJobId() + " status. Std out " + jobMonitorOutput.getStdOut() +
+ ". Std err " +
jobMonitorOutput.getStdError() + ". Job monitor command " +
monitorCommand.getRawCommand());
+ break;
+ }
+ }
+ } catch (Exception e) {
+ logger.warn("Unknown error while fetching the job status
but continuing..", e);
+ }
+ }
+
+ logger.info("Successfully completed job verification task");
+ return onSuccess("Successfully completed job verification task");
+
+ } catch (Exception e) {
+ logger.error("Unknown error while verifying jobs of process " +
getProcessId() + " but continuing as this is non critical", e);
+ return onSuccess("Unknown error while verifying jobs of process "
+ getProcessId() + " but continuing as this is non critical");
+ }
+ }
+
+ @Override
+ public void onCancel(TaskContext taskContext) {
+
+ }
+}
diff --git
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index 5bb9595..90a0862 100644
---
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -27,6 +27,7 @@ import org.apache.airavata.helix.core.OutPort;
import org.apache.airavata.helix.impl.task.*;
import org.apache.airavata.helix.impl.task.completing.CompletingTask;
import org.apache.airavata.helix.impl.task.staging.ArchiveTask;
+import org.apache.airavata.helix.impl.task.staging.JobVerificationTask;
import org.apache.airavata.helix.impl.task.staging.OutputDataStagingTask;
import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.status.ProcessState;
@@ -213,6 +214,16 @@ public class PostWorkflowManager extends WorkflowManager {
String[] taskIds = taskDag.split(",");
final List<AiravataTask> allTasks = new ArrayList<>();
+ JobVerificationTask jobVerificationTask = new JobVerificationTask();
+ jobVerificationTask.setGatewayId(experimentModel.getGatewayId());
+ jobVerificationTask.setExperimentId(experimentModel.getExperimentId());
+ jobVerificationTask.setProcessId(processModel.getProcessId());
+ jobVerificationTask.setTaskId("Job-Verification-Task-" +
UUID.randomUUID().toString() +"-");
+ jobVerificationTask.setForceRunTask(forceRun);
+ jobVerificationTask.setSkipTaskStatusPublish(true);
+
+ allTasks.add(jobVerificationTask);
+
boolean jobSubmissionFound = false;
for (String taskId : taskIds) {