Repository: airavata Updated Branches: refs/heads/master fdb3fc918 -> fc2088202
Fixed AIRAVATA-1665 Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/50672a79 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/50672a79 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/50672a79 Branch: refs/heads/master Commit: 50672a799c7d9c47f23e075f055f090a15c5a013 Parents: bf0fddb Author: shamrath <[email protected]> Authored: Thu Apr 9 16:14:17 2015 -0400 Committer: shamrath <[email protected]> Committed: Thu Apr 9 16:14:17 2015 -0400 ---------------------------------------------------------------------- .../gfac/monitor/email/EmailBasedMonitor.java | 27 +++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/50672a79/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java index 6e12d83..b6bfa6c 100644 --- a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java +++ b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java @@ -46,6 +46,8 @@ import javax.mail.MessagingException; import javax.mail.Session; import javax.mail.Store; import javax.mail.search.FlagTerm; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; @@ -142,10 +144,19 @@ public class EmailBasedMonitor implements Runnable{ emailFolder = store.getFolder(folderName); emailFolder.open(Folder.READ_WRITE); Message[] searchMessages = emailFolder.search(new FlagTerm(new Flags(Flags.Flag.SEEN), false)); + List<Message> processedMessages = new ArrayList<>(); for (Message message : searchMessages) { try { JobStatusResult jobStatusResult = parse(message); - process(jobStatusResult); + JobExecutionContext jEC = jobMonitorMap.get(jobStatusResult.getJobId()); + if (jEC != null) { + process(jobStatusResult, jEC); + processedMessages.add(message); + } else { + // we can get JobExecutionContext null in multiple Gfac instances environment, + // where this job is not submitted by this Gfac instance hence we ignore this message. + log.info("JobExecutionContext is not found for job Id " + jobStatusResult.getJobId()); + } } catch (AiravataException e) { log.error("Error parsing email message =====================================>", e); try { @@ -153,13 +164,15 @@ public class EmailBasedMonitor implements Runnable{ } catch (MessagingException e1) { log.error("Error printing envelop of the email"); } - } } - emailFolder.setFlags(searchMessages, new Flags(Flags.Flag.SEEN), true); + if (!processedMessages.isEmpty()) { + Message[] prosMessages = new Message[processedMessages.size()]; + processedMessages.toArray(prosMessages); + emailFolder.setFlags(prosMessages, new Flags(Flags.Flag.SEEN), true); + } emailFolder.close(false); } - } catch (MessagingException e) { log.error("Couldn't connect to the store ", e); } catch (InterruptedException e) { @@ -173,11 +186,7 @@ public class EmailBasedMonitor implements Runnable{ } } - private void process(JobStatusResult jobStatusResult) throws AiravataException { - JobExecutionContext jEC = jobMonitorMap.get(jobStatusResult.getJobId()); - if (jEC == null) { - throw new AiravataException("JobExecutionContext is not found for job Id " + jobStatusResult.getJobId()); - } + private void process(JobStatusResult jobStatusResult, JobExecutionContext jEC){ JobState resultState = jobStatusResult.getState(); jEC.getJobDetails().setJobStatus(new JobStatus(resultState)); if (resultState == JobState.COMPLETE) {
