handling race conditions with multip
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/af39d44b Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/af39d44b Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/af39d44b Branch: refs/heads/ansible-testing-0.17 Commit: af39d44b2d0bef340399e45f8c7e27dd7b1a6bc1 Parents: 3c453ae Author: scnakandala <[email protected]> Authored: Sat Mar 4 20:58:41 2017 -0500 Committer: scnakandala <[email protected]> Committed: Sat Mar 4 20:58:41 2017 -0500 ---------------------------------------------------------------------- .../server/src/main/resources/gfac-config.yaml | 5 + .../gfac/core/monitor/JobStatusResult.java | 9 ++ .../gfac/monitor/email/EmailBasedMonitor.java | 103 +++++++++++++------ .../email/parser/AiravataCustomMailParser.java | 8 ++ 4 files changed, 96 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/af39d44b/modules/configuration/server/src/main/resources/gfac-config.yaml ---------------------------------------------------------------------- diff --git a/modules/configuration/server/src/main/resources/gfac-config.yaml b/modules/configuration/server/src/main/resources/gfac-config.yaml index cea54a6..edb7922 100644 --- a/modules/configuration/server/src/main/resources/gfac-config.yaml +++ b/modules/configuration/server/src/main/resources/gfac-config.yaml @@ -113,3 +113,8 @@ resources: - jobManagerType: FORK commandOutputParser: org.apache.airavata.gfac.impl.job.ForkOutputParser + + - jobManagerType: AIRAVATA + emailParser: org.apache.airavata.gfac.monitor.email.parser.AiravataCustomMailParser + resourceEmailAddresses: + - [email protected] http://git-wip-us.apache.org/repos/asf/airavata/blob/af39d44b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobStatusResult.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobStatusResult.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobStatusResult.java index 3e43b49..9c80c9d 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobStatusResult.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobStatusResult.java @@ -25,6 +25,7 @@ import org.apache.airavata.model.status.JobState; public class JobStatusResult { private JobState state; private String jobId; + private boolean authoritative = true; public String getJobName() { return jobName; @@ -51,5 +52,13 @@ public class JobStatusResult { public void setJobId(String jobId) { this.jobId = jobId; } + + public boolean isAuthoritative() { + return authoritative; + } + + public void setAuthoritative(boolean authoritative) { + this.authoritative = authoritative; + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/af39d44b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java index bbcd635..02dfa00 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java @@ -337,42 +337,87 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{ ProcessContext parentProcessContext = taskContext.getParentProcessContext(); JobModel jobModel = parentProcessContext.getJobModel(); String jobDetails = "JobName : " + jobStatusResult.getJobName() + ", JobId : " + jobStatusResult.getJobId(); + + JobState currentState = null; + List<JobStatus> jobStatusList = jobModel.getJobStatuses(); + if (jobStatusList != null && jobStatusList.size() > 0) { + JobStatus lastStatus = jobStatusList.get(0); + for (JobStatus temp : jobStatusList) { + if (temp.getTimeOfStateChange() >= lastStatus.getTimeOfStateChange()) { + lastStatus = temp; + } + } + currentState = lastStatus.getJobState(); + } + // TODO - Handle all other valid JobStates + // FIXME - What if non-authoritative email comes later (getting accumulated in the email account) if (resultState == JobState.COMPLETE) { - jobMonitorMap.remove(jobStatusResult.getJobId()); - jobStatus.setJobState(JobState.COMPLETE); - jobStatus.setReason("Complete email received"); - jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - runOutflowTasks = true; - log.info("[EJM]: Job Complete email received , removed job from job monitoring. " + jobDetails); + if (jobStatusResult.isAuthoritative()) { + if (currentState != null && currentState == JobState.COMPLETE) { + jobMonitorMap.remove(jobStatusResult.getJobId()); + runOutflowTasks = false; + log.info("[EJM]: Job Complete email received , removed job from job monitoring. " + jobDetails); + } else { + jobMonitorMap.remove(jobStatusResult.getJobId()); + runOutflowTasks = true; + jobStatus.setJobState(JobState.COMPLETE); + jobStatus.setReason("Complete email received"); + jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + log.info("[EJM]: Job Complete email received , removed job from job monitoring. " + jobDetails); + } + } else { + runOutflowTasks = true; + jobStatus.setJobState(JobState.COMPLETE); + jobStatus.setReason("Complete email received"); + jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + log.info("[EJM]: Non Authoritative Job Complete email received. " + jobDetails); + } }else if (resultState == JobState.QUEUED) { - // nothing special thing to do, update the status change to rabbit mq at the end of this method. - jobStatus.setJobState(JobState.QUEUED); - jobStatus.setReason("Queue email received"); - jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - log.info("[EJM]: Job Queued email received, " + jobDetails); + //It is possible that we will get an early complete message from custom Airavata emails instead from the + //scheduler + if (currentState != JobState.COMPLETE) { + // nothing special thing to do, update the status change to rabbit mq at the end of this method. + jobStatus.setJobState(JobState.QUEUED); + jobStatus.setReason("Queue email received"); + jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + log.info("[EJM]: Job Queued email received, " + jobDetails); + } }else if (resultState == JobState.ACTIVE) { - // nothing special thing to do, update the status change to rabbit mq at the end of this method. - jobStatus.setJobState(JobState.ACTIVE); - jobStatus.setReason("Active email received"); - jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - log.info("[EJM]: Job Active email received, " + jobDetails); + //It is possible that we will get an early complete message from custom Airavata emails instead from the + //scheduler + if (currentState != JobState.COMPLETE) { + // nothing special thing to do, update the status change to rabbit mq at the end of this method. + jobStatus.setJobState(JobState.ACTIVE); + jobStatus.setReason("Active email received"); + jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + log.info("[EJM]: Job Active email received, " + jobDetails); + } }else if (resultState == JobState.FAILED) { - jobMonitorMap.remove(jobStatusResult.getJobId()); - runOutflowTasks = true; - jobStatus.setJobState(JobState.FAILED); - jobStatus.setReason("Failed email received"); - jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - log.info("[EJM]: Job failed email received , removed job from job monitoring. " + jobDetails); + //It is possible that we will get an early complete message from custom Airavata emails instead from the + //scheduler + if (currentState != JobState.COMPLETE) { + jobMonitorMap.remove(jobStatusResult.getJobId()); + runOutflowTasks = true; + jobStatus.setJobState(JobState.FAILED); + jobStatus.setReason("Failed email received"); + jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + log.info("[EJM]: Job failed email received , removed job from job monitoring. " + jobDetails); + } }else if (resultState == JobState.CANCELED) { - jobMonitorMap.remove(jobStatusResult.getJobId()); - jobStatus.setJobState(JobState.CANCELED); - jobStatus.setReason("Canceled email received"); - jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - log.info("[EJM]: Job canceled mail received, removed job from job monitoring. " + jobDetails); - runOutflowTasks = true; // we run out flow and this will move process to cancel state. + //It is possible that we will get an early complete message from custom Airavata emails instead from the + //scheduler + if (currentState != JobState.COMPLETE) { + jobMonitorMap.remove(jobStatusResult.getJobId()); + jobStatus.setJobState(JobState.CANCELED); + jobStatus.setReason("Canceled email received"); + jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + log.info("[EJM]: Job canceled mail received, removed job from job monitoring. " + jobDetails); + runOutflowTasks = true; // we run out flow and this will move process to cancel state. + } } - if (jobStatus.getJobState() != null) { + + if (jobStatus.getJobState() != null) { try { jobModel.setJobStatuses(Arrays.asList(jobStatus)); log.info("[EJM]: Publishing status changes to amqp. " + jobDetails); http://git-wip-us.apache.org/repos/asf/airavata/blob/af39d44b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/AiravataCustomMailParser.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/AiravataCustomMailParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/AiravataCustomMailParser.java index 8810814..9c773b1 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/AiravataCustomMailParser.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/AiravataCustomMailParser.java @@ -55,6 +55,14 @@ public class AiravataCustomMailParser implements EmailParser { jobStatusResult.setJobId(matcher.group(JOBID)); jobStatusResult.setJobName(matcher.group(JOBNAME)); jobStatusResult.setState(getJobState(matcher.group(STATUS))); + jobStatusResult.setAuthoritative(false); + + try { + //Waiting some time for the scheduler to move the job from completing to completed. + Thread.sleep(5000); + } catch (Exception ex) { + } + } else { log.error("[EJM]: No matched found for subject -> " + subject); }
