handling race conditions with multip

Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/af39d44b
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/af39d44b
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/af39d44b

Branch: refs/heads/ansible-testing-0.17
Commit: af39d44b2d0bef340399e45f8c7e27dd7b1a6bc1
Parents: 3c453ae
Author: scnakandala <[email protected]>
Authored: Sat Mar 4 20:58:41 2017 -0500
Committer: scnakandala <[email protected]>
Committed: Sat Mar 4 20:58:41 2017 -0500

----------------------------------------------------------------------
 .../server/src/main/resources/gfac-config.yaml  |   5 +
 .../gfac/core/monitor/JobStatusResult.java      |   9 ++
 .../gfac/monitor/email/EmailBasedMonitor.java   | 103 +++++++++++++------
 .../email/parser/AiravataCustomMailParser.java  |   8 ++
 4 files changed, 96 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/af39d44b/modules/configuration/server/src/main/resources/gfac-config.yaml
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/gfac-config.yaml 
b/modules/configuration/server/src/main/resources/gfac-config.yaml
index cea54a6..edb7922 100644
--- a/modules/configuration/server/src/main/resources/gfac-config.yaml
+++ b/modules/configuration/server/src/main/resources/gfac-config.yaml
@@ -113,3 +113,8 @@ resources:
 
   - jobManagerType: FORK
     commandOutputParser: org.apache.airavata.gfac.impl.job.ForkOutputParser
+
+  - jobManagerType: AIRAVATA
+    emailParser: 
org.apache.airavata.gfac.monitor.email.parser.AiravataCustomMailParser
+    resourceEmailAddresses:
+      - [email protected]

http://git-wip-us.apache.org/repos/asf/airavata/blob/af39d44b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobStatusResult.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobStatusResult.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobStatusResult.java
index 3e43b49..9c80c9d 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobStatusResult.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobStatusResult.java
@@ -25,6 +25,7 @@ import org.apache.airavata.model.status.JobState;
 public class JobStatusResult {
     private JobState state;
     private String jobId;
+    private boolean authoritative = true;
 
     public String getJobName() {
         return jobName;
@@ -51,5 +52,13 @@ public class JobStatusResult {
     public void setJobId(String jobId) {
         this.jobId = jobId;
     }
+
+    public boolean isAuthoritative() {
+        return authoritative;
+    }
+
+    public void setAuthoritative(boolean authoritative) {
+        this.authoritative = authoritative;
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/af39d44b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index bbcd635..02dfa00 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -337,42 +337,87 @@ public class EmailBasedMonitor implements JobMonitor, 
Runnable{
         ProcessContext parentProcessContext = 
taskContext.getParentProcessContext();
         JobModel jobModel = parentProcessContext.getJobModel();
         String jobDetails = "JobName : " + jobStatusResult.getJobName() + ", 
JobId : " + jobStatusResult.getJobId();
+
+        JobState currentState = null;
+        List<JobStatus> jobStatusList = jobModel.getJobStatuses();
+        if (jobStatusList != null && jobStatusList.size() > 0) {
+            JobStatus lastStatus = jobStatusList.get(0);
+            for (JobStatus temp : jobStatusList) {
+                if (temp.getTimeOfStateChange() >= 
lastStatus.getTimeOfStateChange()) {
+                    lastStatus = temp;
+                }
+            }
+            currentState = lastStatus.getJobState();
+        }
+
         // TODO - Handle all other valid JobStates
+        // FIXME - What if non-authoritative email comes later (getting 
accumulated in the email account)
         if (resultState == JobState.COMPLETE) {
-            jobMonitorMap.remove(jobStatusResult.getJobId());
-               jobStatus.setJobState(JobState.COMPLETE);
-               jobStatus.setReason("Complete email received");
-            
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-               runOutflowTasks = true;
-            log.info("[EJM]: Job Complete email received , removed job from 
job monitoring. " + jobDetails);
+            if (jobStatusResult.isAuthoritative()) {
+                if (currentState != null && currentState == JobState.COMPLETE) 
{
+                    jobMonitorMap.remove(jobStatusResult.getJobId());
+                    runOutflowTasks = false;
+                    log.info("[EJM]: Job Complete email received , removed job 
from job monitoring. " + jobDetails);
+                } else {
+                    jobMonitorMap.remove(jobStatusResult.getJobId());
+                    runOutflowTasks = true;
+                    jobStatus.setJobState(JobState.COMPLETE);
+                    jobStatus.setReason("Complete email received");
+                    
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                    log.info("[EJM]: Job Complete email received , removed job 
from job monitoring. " + jobDetails);
+                }
+            } else {
+                runOutflowTasks = true;
+                jobStatus.setJobState(JobState.COMPLETE);
+                jobStatus.setReason("Complete email received");
+                
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                log.info("[EJM]: Non Authoritative Job Complete email 
received. " + jobDetails);
+            }
         }else if (resultState == JobState.QUEUED) {
-               // nothing special thing to do, update the status change to 
rabbit mq at the end of this method.
-               jobStatus.setJobState(JobState.QUEUED);
-               jobStatus.setReason("Queue email received");
-            
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-               log.info("[EJM]: Job Queued email received, " + jobDetails);
+            //It is possible that we will get an early complete message from 
custom Airavata emails instead from the
+            //scheduler
+            if (currentState != JobState.COMPLETE) {
+                // nothing special thing to do, update the status change to 
rabbit mq at the end of this method.
+                jobStatus.setJobState(JobState.QUEUED);
+                jobStatus.setReason("Queue email received");
+                
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                log.info("[EJM]: Job Queued email received, " + jobDetails);
+            }
         }else if (resultState == JobState.ACTIVE) {
-            // nothing special thing to do, update the status change to rabbit 
mq at the end of this method.
-               jobStatus.setJobState(JobState.ACTIVE);
-               jobStatus.setReason("Active email received");
-            
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-            log.info("[EJM]: Job Active email received, " + jobDetails);
+            //It is possible that we will get an early complete message from 
custom Airavata emails instead from the
+            //scheduler
+            if (currentState != JobState.COMPLETE) {
+                // nothing special thing to do, update the status change to 
rabbit mq at the end of this method.
+                jobStatus.setJobState(JobState.ACTIVE);
+                jobStatus.setReason("Active email received");
+                
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                log.info("[EJM]: Job Active email received, " + jobDetails);
+            }
         }else if (resultState == JobState.FAILED) {
-            jobMonitorMap.remove(jobStatusResult.getJobId());
-            runOutflowTasks = true;
-               jobStatus.setJobState(JobState.FAILED);
-               jobStatus.setReason("Failed email received");
-            
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-            log.info("[EJM]: Job failed email received , removed job from job 
monitoring. " + jobDetails);
+            //It is possible that we will get an early complete message from 
custom Airavata emails instead from the
+            //scheduler
+            if (currentState != JobState.COMPLETE) {
+                jobMonitorMap.remove(jobStatusResult.getJobId());
+                runOutflowTasks = true;
+                jobStatus.setJobState(JobState.FAILED);
+                jobStatus.setReason("Failed email received");
+                
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                log.info("[EJM]: Job failed email received , removed job from 
job monitoring. " + jobDetails);
+            }
         }else if (resultState == JobState.CANCELED) {
-            jobMonitorMap.remove(jobStatusResult.getJobId());
-            jobStatus.setJobState(JobState.CANCELED);
-               jobStatus.setReason("Canceled email received");
-            
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-               log.info("[EJM]: Job canceled mail received, removed job from 
job monitoring. " + jobDetails);
-               runOutflowTasks = true; // we run out flow and this will move 
process to cancel state.
+            //It is possible that we will get an early complete message from 
custom Airavata emails instead from the
+            //scheduler
+            if (currentState != JobState.COMPLETE) {
+                jobMonitorMap.remove(jobStatusResult.getJobId());
+                jobStatus.setJobState(JobState.CANCELED);
+                jobStatus.setReason("Canceled email received");
+                
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                log.info("[EJM]: Job canceled mail received, removed job from 
job monitoring. " + jobDetails);
+                runOutflowTasks = true; // we run out flow and this will move 
process to cancel state.
+            }
         }
-           if (jobStatus.getJobState() != null) {
+
+        if (jobStatus.getJobState() != null) {
                    try {
                            jobModel.setJobStatuses(Arrays.asList(jobStatus));
                            log.info("[EJM]: Publishing status changes to amqp. 
" + jobDetails);

http://git-wip-us.apache.org/repos/asf/airavata/blob/af39d44b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/AiravataCustomMailParser.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/AiravataCustomMailParser.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/AiravataCustomMailParser.java
index 8810814..9c773b1 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/AiravataCustomMailParser.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/AiravataCustomMailParser.java
@@ -55,6 +55,14 @@ public class AiravataCustomMailParser implements EmailParser 
{
             jobStatusResult.setJobId(matcher.group(JOBID));
             jobStatusResult.setJobName(matcher.group(JOBNAME));
             jobStatusResult.setState(getJobState(matcher.group(STATUS)));
+            jobStatusResult.setAuthoritative(false);
+
+            try {
+                //Waiting some time for the scheduler to move the job from 
completing to completed.
+                Thread.sleep(5000);
+            } catch (Exception ex) {
+            }
+
         } else {
             log.error("[EJM]: No matched found for subject -> " + subject);
         }

Reply via email to