Repository: airavata
Updated Branches:
  refs/heads/airavata-0.15-release-branch a320a9474 -> 59e36d899


Added thread pool to process emails


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/59e36d89
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/59e36d89
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/59e36d89

Branch: refs/heads/airavata-0.15-release-branch
Commit: 59e36d89913fae0477a27c4fcfb070e127f3f897
Parents: a320a94
Author: Shameera Rathanyaka <[email protected]>
Authored: Tue Jun 9 16:55:32 2015 -0400
Committer: Shameera Rathanyaka <[email protected]>
Committed: Tue Jun 9 16:55:32 2015 -0400

----------------------------------------------------------------------
 .../gfac/monitor/email/EmailBasedMonitor.java   | 84 ++++++++++++++------
 1 file changed, 58 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/59e36d89/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
 
b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index a9363cb..f9b7eb5 100644
--- 
a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ 
b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -59,12 +59,19 @@ import javax.mail.search.FlagTerm;
 import javax.mail.search.SearchTerm;
 import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 public class EmailBasedMonitor implements Runnable{
     private static final AiravataLogger log = 
AiravataLoggerFactory.getLogger(EmailBasedMonitor.class);
@@ -82,6 +89,7 @@ public class EmailBasedMonitor implements Runnable{
     private String host, emailAddress, password, storeProtocol, folderName ;
     private Date monitorStartDate;
     private Map<ResourceJobManagerType, EmailParser> emailParserMap = new 
HashMap<ResourceJobManagerType, EmailParser>();
+    private ExecutorService executor;
 
     public EmailBasedMonitor(ResourceJobManagerType type) throws 
AiravataException {
         init();
@@ -99,6 +107,8 @@ public class EmailBasedMonitor implements Runnable{
         }
         properties = new Properties();
         properties.put("mail.store.protocol", storeProtocol);
+        executor = Executors.newFixedThreadPool(30);
+
     }
 
     public void addToJobMonitorMap(JobExecutionContext jobExecutionContext) {
@@ -215,37 +225,59 @@ public class EmailBasedMonitor implements Runnable{
     }
 
     private void processMessages(Message[] searchMessages) throws 
MessagingException {
-        List<Message> processedMessages = new ArrayList<>();
-        List<Message> unreadMessages = new ArrayList<>();
+        List<Message> processedMessages = Collections.synchronizedList(new 
ArrayList<>());
+        List<Message> unreadMessages = Collections.synchronizedList(new 
ArrayList<>());
+        List<Future<JobStatusResult>> futureList = new ArrayList<>();
+
+        // use a thread pool to speed up email processing
         for (Message message : searchMessages) {
-            try {
-                JobStatusResult jobStatusResult = parse(message);
-                JobExecutionContext jEC = 
jobMonitorMap.get(jobStatusResult.getJobId());
-                if (jEC == null) {
-                    jEC = jobMonitorMap.get(jobStatusResult.getJobName());
-                }
-                if (jEC != null) {
-                    process(jobStatusResult, jEC);
-                    processedMessages.add(message);
-                } else {
-                    // we can get JobExecutionContext null in multiple Gfac 
instances environment,
-                    // where this job is not submitted by this Gfac instance 
hence we ignore this message.
-                    unreadMessages.add(message);
+            Future<JobStatusResult> jobStatusFuture = executor.submit(new 
Callable<JobStatusResult>() {
+                JobStatusResult jobStatusResult = null;
+                @Override
+                public JobStatusResult call() throws Exception {
+                    try {
+                        jobStatusResult = parse(message);
+                        JobExecutionContext jEC = 
jobMonitorMap.get(jobStatusResult.getJobId());
+                        if (jEC == null) {
+                            jEC = 
jobMonitorMap.get(jobStatusResult.getJobName());
+                        }
+                        if (jEC != null) {
+                            process(jobStatusResult, jEC);
+                            processedMessages.add(message);
+                        } else {
+                            // we can get JobExecutionContext null in multiple 
Gfac instances environment,
+                            // where this job is not submitted by this Gfac 
instance hence we ignore this message.
+                            unreadMessages.add(message);
 //                  log.info("JobExecutionContext is not found for job Id " + 
jobStatusResult.getJobId());
+                        }
+                    } catch (AiravataException e) {
+                        log.error("[EJM]: Error parsing email message 
=====================================>", e);
+                        try {
+                            writeEnvelopeOnError(message);
+                        } catch (MessagingException e1) {
+                            log.error("[EJM]: Error printing envelop of the 
email");
+                        }
+                        unreadMessages.add(message);
+                    } catch (MessagingException e) {
+                        log.error("[EJM]: Error while retrieving sender 
address from message : " + message.toString());
+                        unreadMessages.add(message);
+                    }
+                    return jobStatusResult;
                 }
-            } catch (AiravataException e) {
-                log.error("[EJM]: Error parsing email message 
=====================================>", e);
-                try {
-                    writeEnvelopeOnError(message);
-                } catch (MessagingException e1) {
-                    log.error("[EJM]: Error printing envelop of the email");
-                }
-                unreadMessages.add(message);
-            } catch (MessagingException e) {
-                log.error("[EJM]: Error while retrieving sender address from 
message : " + message.toString());
-                unreadMessages.add(message);
+            });
+            futureList.add(jobStatusFuture);
+        }
+        // wait until all submitted tasks have completed
+        for (Future<JobStatusResult> jobStatusResultFuture : futureList) {
+            try {
+                jobStatusResultFuture.get();
+            } catch (InterruptedException e) {
+                log.error("Error while calling future.get() ", e);
+            } catch (ExecutionException e) {
+                log.error("Error while calling future.get()", e);
             }
         }
+
         if (!processedMessages.isEmpty()) {
             Message[] seenMessages = new Message[processedMessages.size()];
             processedMessages.toArray(seenMessages);

Reply via email to