Repository: airavata Updated Branches: refs/heads/airavata-0.15-release-branch a320a9474 -> 59e36d899
Added thread pool to process emails Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/59e36d89 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/59e36d89 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/59e36d89 Branch: refs/heads/airavata-0.15-release-branch Commit: 59e36d89913fae0477a27c4fcfb070e127f3f897 Parents: a320a94 Author: Shameera Rathanyaka <[email protected]> Authored: Tue Jun 9 16:55:32 2015 -0400 Committer: Shameera Rathanyaka <[email protected]> Committed: Tue Jun 9 16:55:32 2015 -0400 ---------------------------------------------------------------------- .../gfac/monitor/email/EmailBasedMonitor.java | 84 ++++++++++++++------ 1 file changed, 58 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/59e36d89/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java index a9363cb..f9b7eb5 100644 --- a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java +++ b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java @@ -59,12 +59,19 @@ import javax.mail.search.FlagTerm; import javax.mail.search.SearchTerm; import java.util.ArrayList; import java.util.Calendar; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; public class EmailBasedMonitor implements Runnable{ private static final AiravataLogger log = AiravataLoggerFactory.getLogger(EmailBasedMonitor.class); @@ -82,6 +89,7 @@ public class EmailBasedMonitor implements Runnable{ private String host, emailAddress, password, storeProtocol, folderName ; private Date monitorStartDate; private Map<ResourceJobManagerType, EmailParser> emailParserMap = new HashMap<ResourceJobManagerType, EmailParser>(); + private ExecutorService executor; public EmailBasedMonitor(ResourceJobManagerType type) throws AiravataException { init(); @@ -99,6 +107,8 @@ public class EmailBasedMonitor implements Runnable{ } properties = new Properties(); properties.put("mail.store.protocol", storeProtocol); + executor = Executors.newFixedThreadPool(30); + } public void addToJobMonitorMap(JobExecutionContext jobExecutionContext) { @@ -215,37 +225,59 @@ public class EmailBasedMonitor implements Runnable{ } private void processMessages(Message[] searchMessages) throws MessagingException { - List<Message> processedMessages = new ArrayList<>(); - List<Message> unreadMessages = new ArrayList<>(); + List<Message> processedMessages = Collections.synchronizedList(new ArrayList<>()); + List<Message> unreadMessages = Collections.synchronizedList(new ArrayList<>()); + List<Future<JobStatusResult>> futureList = new ArrayList<>(); + + // use thread pool to increase email processing for (Message message : searchMessages) { - try { - JobStatusResult jobStatusResult = parse(message); - JobExecutionContext jEC = jobMonitorMap.get(jobStatusResult.getJobId()); - if (jEC == null) { - jEC = jobMonitorMap.get(jobStatusResult.getJobName()); - } - if (jEC != null) { - process(jobStatusResult, jEC); - processedMessages.add(message); - } else { - // we can get JobExecutionContext null in multiple Gfac instances environment, - // where this job is not submitted by this Gfac instance hence we ignore this message. - unreadMessages.add(message); + Future<JobStatusResult> jobStatusFuture = executor.submit(new Callable<JobStatusResult>() { + JobStatusResult jobStatusResult = null; + @Override + public JobStatusResult call() throws Exception { + try { + jobStatusResult = parse(message); + JobExecutionContext jEC = jobMonitorMap.get(jobStatusResult.getJobId()); + if (jEC == null) { + jEC = jobMonitorMap.get(jobStatusResult.getJobName()); + } + if (jEC != null) { + process(jobStatusResult, jEC); + processedMessages.add(message); + } else { + // we can get JobExecutionContext null in multiple Gfac instances environment, + // where this job is not submitted by this Gfac instance hence we ignore this message. + unreadMessages.add(message); // log.info("JobExecutionContext is not found for job Id " + jobStatusResult.getJobId()); + } + } catch (AiravataException e) { + log.error("[EJM]: Error parsing email message =====================================>", e); + try { + writeEnvelopeOnError(message); + } catch (MessagingException e1) { + log.error("[EJM]: Error printing envelop of the email"); + } + unreadMessages.add(message); + } catch (MessagingException e) { + log.error("[EJM]: Error while retrieving sender address from message : " + message.toString()); + unreadMessages.add(message); + } + return jobStatusResult; } - } catch (AiravataException e) { - log.error("[EJM]: Error parsing email message =====================================>", e); - try { - writeEnvelopeOnError(message); - } catch (MessagingException e1) { - log.error("[EJM]: Error printing envelop of the email"); - } - unreadMessages.add(message); - } catch (MessagingException e) { - log.error("[EJM]: Error while retrieving sender address from message : " + message.toString()); - unreadMessages.add(message); + }); + futureList.add(jobStatusFuture); + } + // wait until all thread returns + for (Future<JobStatusResult> jobStatusResultFuture : futureList) { + try { + jobStatusResultFuture.get(); + } catch (InterruptedException e) { + log.error("Error while calling future.get() ", e); + } catch (ExecutionException e) { + log.error("Error while calling future.get()", e); } } + if (!processedMessages.isEmpty()) { Message[] seenMessages = new Message[processedMessages.size()]; processedMessages.toArray(seenMessages);
