Author: asankha
Date: Thu Apr  2 04:58:21 2009
New Revision: 761168

URL: http://svn.apache.org/viewvc?rev=761168&view=rev
Log:
Fix WSCOMMONS-454 fully as per comment by Andreas, that waiting for sub-task 
completion could lead to a deadlock. Now the main task threads do not wait for 
sub tasks that process messages to complete, but the last sub task that 
complets will perform completion tasks, and also request the 
AbstractTransportListener to schedule the next run - if appropriate

Modified:
    
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractPollingTransportListener.java
    
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java

Modified: 
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractPollingTransportListener.java
URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractPollingTransportListener.java?rev=761168&r1=761167&r2=761168&view=diff
==============================================================================
--- 
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractPollingTransportListener.java
 (original)
+++ 
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractPollingTransportListener.java
 Thu Apr  2 04:58:21 2009
@@ -47,7 +47,8 @@
         super.init(cfgCtx, transportIn);
         T entry = createPollTableEntry(transportIn);
         if (entry != null) {
-            schedulePoll(entry, getPollInterval(transportIn));
+            entry.setPollInterval(getPollInterval(transportIn));
+            schedulePoll(entry);
             pollTable.add(entry);
         }
     }
@@ -82,7 +83,8 @@
      * @param entry the poll table entry with the configuration for the service
      * @param pollInterval the interval between successive polls in 
milliseconds
      */
-    void schedulePoll(final T entry, final long pollInterval) {
+    void schedulePoll(final T entry) {
+        final long pollInterval = entry.getPollInterval();
         TimerTask timerTask = new TimerTask() {
             @Override
             public void run() {
@@ -96,14 +98,6 @@
                         } else {
                             poll(entry);
                         }
-
-                        if (!entry.isConcurrentPollingAllowed()) {
-                            synchronized (entry) {
-                                if (!entry.canceled) {
-                                    schedulePoll(entry, pollInterval);
-                                }
-                            }
-                        }
                     }
                 });
             }
@@ -123,9 +117,19 @@
         }
         pollTable.remove(entry);
     }
-    
+
     protected abstract void poll(T entry);
 
+    protected void onPollCompletion(T entry) {
+        if (!entry.isConcurrentPollingAllowed()) {
+            synchronized (entry) {
+                if (!entry.canceled) {
+                    schedulePoll(entry);
+                }
+            }
+        }
+    }
+
     /**
      * method to log a failure to the log file and to update the last poll 
status and time
      * @param msg text for the log message
@@ -142,6 +146,7 @@
         entry.setLastPollState(AbstractPollTableEntry.FAILED);
         entry.setLastPollTime(now);
         entry.setNextPollTime(now + entry.getPollInterval());
+        onPollCompletion(entry);
     }
 
     private long getPollInterval(ParameterInclude params) {
@@ -173,7 +178,8 @@
             throw new AxisFault("The service has no configuration for the 
transport");
         }
         entry.setService(service);
-        schedulePoll(entry, getPollInterval(service));
+        entry.setPollInterval(getPollInterval(service));
+        schedulePoll(entry);
         pollTable.add(entry);
     }
     

Modified: 
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java
URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java?rev=761168&r1=761167&r2=761168&view=diff
==============================================================================
--- 
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java
 (original)
+++ 
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java
 Thu Apr  2 04:58:21 2009
@@ -117,6 +117,8 @@
         long reconnectionTimeout = entry.getReconnectTimeout();
         Session session = entry.getSession();
         Store store = null;
+        Folder folder = null;
+        boolean mailProcessingStarted = false;
 
         while (!connected) {
             try {
@@ -137,6 +139,17 @@
                 // were we able to connect?
                 connected = store.isConnected();
 
+                if (connected) {
+                    if (entry.getFolder() != null) {
+                        folder = store.getFolder(entry.getFolder());
+                    } else {
+                        folder = store.getFolder(MailConstants.DEFAULT_FOLDER);
+                    }
+                    if (folder == null) {
+                        folder = store.getDefaultFolder();
+                    }
+                }
+
             } catch (Exception e) {
                 log.error("Error connecting to mail server for address : " + 
emailAddress, e);
                 if (maxRetryCount <= retryCount) {
@@ -156,117 +169,78 @@
             }
         }
 
-        if (connected) {
-            Folder folder = null;
+        if (connected && folder != null) {
+
             CountDownLatch latch = null;
+            Runnable onCompletion = new MailCheckCompletionTask(folder, store, 
emailAddress, entry);
 
             try {
-
-                if (entry.getFolder() != null) {
-                    folder = store.getFolder(entry.getFolder());
-                } else {
-                    folder = store.getFolder(MailConstants.DEFAULT_FOLDER);
-                }
-                if (folder == null) {
-                    folder = store.getDefaultFolder();
+                if (log.isDebugEnabled()) {
+                    log.debug("Connecting to folder : " + folder.getName() +
+                        " of email account : " + emailAddress);
                 }
 
-                if (folder == null) {
-                    processFailure("Unable to access mail folder", null, 
entry);
-
-                } else {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Connecting to folder : " + folder.getName() 
+
-                            " of email account : " + emailAddress);
-                    }
-
-                    folder.open(Folder.READ_WRITE);
-                    int total = folder.getMessageCount();
-                    Message[] messages = folder.getMessages();
-
-                    if (log.isDebugEnabled()) {
-                        log.debug(messages.length + " messgaes in folder : " + 
folder);
-                    }
+                folder.open(Folder.READ_WRITE);
+                int total = folder.getMessageCount();
+                Message[] messages = folder.getMessages();
 
-                    latch = new CountDownLatch(total);
-                    for (int i = 0; i < total; i++) {
+                if (log.isDebugEnabled()) {
+                    log.debug(messages.length + " messgaes in folder : " + 
folder);
+                }
 
-                        try {
-                            String[] status = messages[i].getHeader("Status");
-                            if (status != null && status.length == 1 && 
status[0].equals("RO")) {
-                                // some times the mail server sends a special 
mail message which is
-                                // not relavent in processing. ignore this 
message.
-                                if (log.isDebugEnabled()) {
-                                    log.debug("Skipping message # : " + 
messages[i].getMessageNumber()
-                                        + " : " + messages[i].getSubject() + " 
- Status: RO");
-                                }
-                                latch.countDown();
-                            } else if (messages[i].isSet(Flags.Flag.SEEN)) {
-                                if (log.isDebugEnabled()) {
-                                    log.debug("Skipping message # : " + 
messages[i].getMessageNumber()
-                                        + " : " + messages[i].getSubject() + " 
- already marked SEEN");
-                                }
-                                latch.countDown();
-                            } else if (messages[i].isSet(Flags.Flag.DELETED)) {
-                                if (log.isDebugEnabled()) {
-                                    log.debug("Skipping message # : " + 
messages[i].getMessageNumber()
-                                        + " : " +  messages[i].getSubject() + 
" - already marked DELETED");
-                                }
-                                latch.countDown();
+                latch = new CountDownLatch(total);
+                for (int i = 0; i < total; i++) {
 
-                            } else {
-                                processMail(entry, folder, store, messages[i], 
latch);
+                    try {
+                        String[] status = messages[i].getHeader("Status");
+                        if (status != null && status.length == 1 && 
status[0].equals("RO")) {
+                            // some times the mail server sends a special mail 
message which is
+                            // not relavent in processing. ignore this message.
+                            if (log.isDebugEnabled()) {
+                                log.debug("Skipping message # : " + 
messages[i].getMessageNumber()
+                                    + " : " + messages[i].getSubject() + " - 
Status: RO");
+                            }
+                            latch.countDown();
+                        } else if (messages[i].isSet(Flags.Flag.SEEN)) {
+                            if (log.isDebugEnabled()) {
+                                log.debug("Skipping message # : " + 
messages[i].getMessageNumber()
+                                    + " : " + messages[i].getSubject() + " - 
already marked SEEN");
                             }
-                        } catch (MessageRemovedException ignore) {
-                            // while reading the meta information, this mail 
was deleted, thats ok
+                            latch.countDown();
+                        } else if (messages[i].isSet(Flags.Flag.DELETED)) {
                             if (log.isDebugEnabled()) {
-                                log.debug("Skipping message # : " + 
messages[i].getMessageNumber() +
-                                    " as it has been DELETED by another thread 
after processing");
+                                log.debug("Skipping message # : " + 
messages[i].getMessageNumber()
+                                    + " : " +  messages[i].getSubject() + " - 
already marked DELETED");
                             }
                             latch.countDown();
+
+                        } else {
+                            processMail(entry, folder, store, messages[i], 
latch, onCompletion);
+                            mailProcessingStarted = true;
                         }
+                    } catch (MessageRemovedException ignore) {
+                        // while reading the meta information, this mail was 
deleted, thats ok
+                        if (log.isDebugEnabled()) {
+                            log.debug("Skipping message # : " + 
messages[i].getMessageNumber() +
+                                " as it has been DELETED by another thread 
after processing");
+                        }
+                        latch.countDown();
                     }
                 }
 
+                if (!mailProcessingStarted) {
+                    // if we didnt process any mail in this run, the 
onCompletion will not
+                    // run from the mail processor by default
+                    onCompletion.run();
+                }
+
             } catch (MessagingException me) {
                 processFailure("Error checking mail for account : " +
                     emailAddress + " :: " + me.getMessage(), me, entry);
-
-            } finally {
-
-                // wait till all parallel message processing tasks complete
-                try {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Awaiting completion of " + latch.getCount() 
+
-                            " concurrent message processing threads");
-                    }
-                    latch.await();
-                } catch (InterruptedException e) {
-                    log.warn("Mail transport listner polling thread 
interrupted before completion");
-                }
-
-                try {
-                    folder.close(true /** expunge messages flagged as 
DELETED*/);
-                    if (log.isDebugEnabled()) {
-                        log.debug("Mail folder closed, and deleted mail 
expunged");
-                    }
-                } catch (MessagingException e) {
-                    processFailure("Error closing mail folder : " +
-                        folder + " for account : " + emailAddress, e, entry);
-                }
-
-                if (store != null) {
-                    try {
-                        store.close();
-                        if (log.isDebugEnabled()) {
-                            log.debug("Mail store closed for : " + 
emailAddress);
-                        }
-                    } catch (MessagingException e) {
-                        log.warn("Error closing mail store for account : " +
-                            emailAddress + " :: " + e.getMessage(), e);
-                    }
-                }
             }
+
+        } else {
+            processFailure("Unable to access mail folder", null, entry);
         }
     }
 
@@ -279,11 +253,12 @@
      * @param pos the message position seen initially
      * @param mp the MailProcessor object
      * @param latch the completion latch to notify
+     * @param onCompletion the tasks to run on the completion of mail 
processing
      */
     private void processMail(PollTableEntry entry, Folder folder, Store store, 
Message message,
-                             CountDownLatch latch) {
+                             CountDownLatch latch, Runnable onCompletion) {
 
-        MailProcessor mp = new MailProcessor(entry, message, store, folder, 
latch);
+        MailProcessor mp = new MailProcessor(entry, message, store, folder, 
latch, onCompletion);
 
         // should messages be processed in parallel?
         if (entry.isConcurrentPollingAllowed()) {
@@ -352,13 +327,16 @@
         private Folder folder = null;
         private String uid = null;
         private CountDownLatch doneSignal = null;
+        private Runnable onCompletion = null;
 
-        MailProcessor(PollTableEntry entry, Message message, Store store, 
Folder folder, CountDownLatch doneSignal) {
+        MailProcessor(PollTableEntry entry, Message message, Store store, 
Folder folder,
+                      CountDownLatch doneSignal, Runnable onCompletion) {
             this.entry = entry;
             this.message = message;
             this.store = store;
             this.folder = folder;
             this.doneSignal = doneSignal;
+            this.onCompletion = onCompletion;
         }
 
         public void setUID(String uid) {
@@ -392,6 +370,73 @@
             }
 
             doneSignal.countDown();
+
+            if (doneSignal.getCount() == 0) {
+                onCompletion.run();
+            }
+        }
+    }
+
+    /**
+     * Handle optional logic of the mail transport, that needs to happen once 
all messages in
+     * a check mail cycle has ended.
+     */
+    private class MailCheckCompletionTask implements Runnable {
+        private final Folder folder;
+        private final Store store;
+        private final InternetAddress emailAddress;
+        private final PollTableEntry entry;
+        private boolean taskStarted = false;
+
+        public MailCheckCompletionTask(Folder folder, Store store,
+                                       InternetAddress emailAddress, 
PollTableEntry entry) {
+            this.folder = folder;
+            this.store = store;
+            this.emailAddress = emailAddress;
+            this.entry = entry;
+        }
+
+        public void run() {
+            synchronized(this) {
+                if (taskStarted) {
+                    return;
+                } else {
+                    taskStarted = true;
+                }
+            }
+            
+            if (log.isDebugEnabled()) {
+                log.debug("Executing onCompletion task for the mail download 
of : " + emailAddress);
+            }
+
+            if (folder != null) {
+                try {
+                    folder.close(true /** expunge messages flagged as 
DELETED*/);
+                    if (log.isDebugEnabled()) {
+                        log.debug("Mail folder closed, and deleted mail 
expunged");
+                    }
+                } catch (MessagingException e) {
+                    log.warn("Error closing mail folder : " +
+                        folder + " for account : " + emailAddress + " :: "+ 
e.getMessage());
+                }
+            }
+
+            if (store != null) {
+                try {
+                    store.close();
+                    if (log.isDebugEnabled()) {
+                        log.debug("Mail store closed for : " + emailAddress);
+                    }
+                } catch (MessagingException e) {
+                    log.warn("Error closing mail store for account : " +
+                        emailAddress + " :: " + e.getMessage(), e);
+                }
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug("Scheduling next poll for : " + emailAddress);
+            }
+            onPollCompletion(entry);
         }
     }
 
@@ -769,12 +814,18 @@
                 paramIncl, MailConstants.TRANSPORT_MAIL_PROCESS_IN_PARALLEL);
             if (processInParallel != null) {
                 
entry.setProcessingMailInParallel(Boolean.parseBoolean(processInParallel));
+                if (log.isDebugEnabled() && 
entry.isProcessingMailInParallel()) {
+                    log.debug("Parallel mail processing enabled for : " + 
address);
+                }
             }
 
             String pollInParallel = ParamUtils.getOptionalParam(
                 paramIncl, BaseConstants.TRANSPORT_POLL_IN_PARALLEL);
             if (pollInParallel != null) {
                 
entry.setConcurrentPollingAllowed(Boolean.parseBoolean(pollInParallel));
+                if (log.isDebugEnabled() && 
entry.isConcurrentPollingAllowed()) {
+                    log.debug("Concurrent mail polling enabled for : " + 
address);
+                }
             }
 
             String strMaxRetryCount = ParamUtils.getOptionalParam(


Reply via email to