Author: asankha Date: Thu Apr 2 04:58:21 2009 New Revision: 761168 URL: http://svn.apache.org/viewvc?rev=761168&view=rev Log: Fix WSCOMMONS-454 fully as per comment by Andreas, that waiting for sub-task completion could lead to a deadlock. Now the main task threads do not wait for sub-tasks that process messages to complete, but the last sub-task that completes will perform completion tasks, and also request the AbstractTransportListener to schedule the next run - if appropriate.
Modified: webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractPollingTransportListener.java webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java Modified: webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractPollingTransportListener.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractPollingTransportListener.java?rev=761168&r1=761167&r2=761168&view=diff ============================================================================== --- webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractPollingTransportListener.java (original) +++ webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractPollingTransportListener.java Thu Apr 2 04:58:21 2009 @@ -47,7 +47,8 @@ super.init(cfgCtx, transportIn); T entry = createPollTableEntry(transportIn); if (entry != null) { - schedulePoll(entry, getPollInterval(transportIn)); + entry.setPollInterval(getPollInterval(transportIn)); + schedulePoll(entry); pollTable.add(entry); } } @@ -82,7 +83,8 @@ * @param entry the poll table entry with the configuration for the service * @param pollInterval the interval between successive polls in milliseconds */ - void schedulePoll(final T entry, final long pollInterval) { + void schedulePoll(final T entry) { + final long pollInterval = entry.getPollInterval(); TimerTask timerTask = new TimerTask() { @Override public void run() { @@ -96,14 +98,6 @@ } else { poll(entry); } - - if (!entry.isConcurrentPollingAllowed()) { - synchronized (entry) { - if (!entry.canceled) { - schedulePoll(entry, pollInterval); - } - } - } } }); } @@ -123,9 +117,19 @@ } pollTable.remove(entry); } - + protected abstract void 
poll(T entry); + protected void onPollCompletion(T entry) { + if (!entry.isConcurrentPollingAllowed()) { + synchronized (entry) { + if (!entry.canceled) { + schedulePoll(entry); + } + } + } + } + /** * method to log a failure to the log file and to update the last poll status and time * @param msg text for the log message @@ -142,6 +146,7 @@ entry.setLastPollState(AbstractPollTableEntry.FAILED); entry.setLastPollTime(now); entry.setNextPollTime(now + entry.getPollInterval()); + onPollCompletion(entry); } private long getPollInterval(ParameterInclude params) { @@ -173,7 +178,8 @@ throw new AxisFault("The service has no configuration for the transport"); } entry.setService(service); - schedulePoll(entry, getPollInterval(service)); + entry.setPollInterval(getPollInterval(service)); + schedulePoll(entry); pollTable.add(entry); } Modified: webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java?rev=761168&r1=761167&r2=761168&view=diff ============================================================================== --- webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java (original) +++ webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java Thu Apr 2 04:58:21 2009 @@ -117,6 +117,8 @@ long reconnectionTimeout = entry.getReconnectTimeout(); Session session = entry.getSession(); Store store = null; + Folder folder = null; + boolean mailProcessingStarted = false; while (!connected) { try { @@ -137,6 +139,17 @@ // were we able to connect? 
connected = store.isConnected(); + if (connected) { + if (entry.getFolder() != null) { + folder = store.getFolder(entry.getFolder()); + } else { + folder = store.getFolder(MailConstants.DEFAULT_FOLDER); + } + if (folder == null) { + folder = store.getDefaultFolder(); + } + } + } catch (Exception e) { log.error("Error connecting to mail server for address : " + emailAddress, e); if (maxRetryCount <= retryCount) { @@ -156,117 +169,78 @@ } } - if (connected) { - Folder folder = null; + if (connected && folder != null) { + CountDownLatch latch = null; + Runnable onCompletion = new MailCheckCompletionTask(folder, store, emailAddress, entry); try { - - if (entry.getFolder() != null) { - folder = store.getFolder(entry.getFolder()); - } else { - folder = store.getFolder(MailConstants.DEFAULT_FOLDER); - } - if (folder == null) { - folder = store.getDefaultFolder(); + if (log.isDebugEnabled()) { + log.debug("Connecting to folder : " + folder.getName() + + " of email account : " + emailAddress); } - if (folder == null) { - processFailure("Unable to access mail folder", null, entry); - - } else { - if (log.isDebugEnabled()) { - log.debug("Connecting to folder : " + folder.getName() + - " of email account : " + emailAddress); - } - - folder.open(Folder.READ_WRITE); - int total = folder.getMessageCount(); - Message[] messages = folder.getMessages(); - - if (log.isDebugEnabled()) { - log.debug(messages.length + " messgaes in folder : " + folder); - } + folder.open(Folder.READ_WRITE); + int total = folder.getMessageCount(); + Message[] messages = folder.getMessages(); - latch = new CountDownLatch(total); - for (int i = 0; i < total; i++) { + if (log.isDebugEnabled()) { + log.debug(messages.length + " messgaes in folder : " + folder); + } - try { - String[] status = messages[i].getHeader("Status"); - if (status != null && status.length == 1 && status[0].equals("RO")) { - // some times the mail server sends a special mail message which is - // not relavent in processing. 
ignore this message. - if (log.isDebugEnabled()) { - log.debug("Skipping message # : " + messages[i].getMessageNumber() - + " : " + messages[i].getSubject() + " - Status: RO"); - } - latch.countDown(); - } else if (messages[i].isSet(Flags.Flag.SEEN)) { - if (log.isDebugEnabled()) { - log.debug("Skipping message # : " + messages[i].getMessageNumber() - + " : " + messages[i].getSubject() + " - already marked SEEN"); - } - latch.countDown(); - } else if (messages[i].isSet(Flags.Flag.DELETED)) { - if (log.isDebugEnabled()) { - log.debug("Skipping message # : " + messages[i].getMessageNumber() - + " : " + messages[i].getSubject() + " - already marked DELETED"); - } - latch.countDown(); + latch = new CountDownLatch(total); + for (int i = 0; i < total; i++) { - } else { - processMail(entry, folder, store, messages[i], latch); + try { + String[] status = messages[i].getHeader("Status"); + if (status != null && status.length == 1 && status[0].equals("RO")) { + // some times the mail server sends a special mail message which is + // not relavent in processing. ignore this message. 
+ if (log.isDebugEnabled()) { + log.debug("Skipping message # : " + messages[i].getMessageNumber() + + " : " + messages[i].getSubject() + " - Status: RO"); + } + latch.countDown(); + } else if (messages[i].isSet(Flags.Flag.SEEN)) { + if (log.isDebugEnabled()) { + log.debug("Skipping message # : " + messages[i].getMessageNumber() + + " : " + messages[i].getSubject() + " - already marked SEEN"); } - } catch (MessageRemovedException ignore) { - // while reading the meta information, this mail was deleted, thats ok + latch.countDown(); + } else if (messages[i].isSet(Flags.Flag.DELETED)) { if (log.isDebugEnabled()) { - log.debug("Skipping message # : " + messages[i].getMessageNumber() + - " as it has been DELETED by another thread after processing"); + log.debug("Skipping message # : " + messages[i].getMessageNumber() + + " : " + messages[i].getSubject() + " - already marked DELETED"); } latch.countDown(); + + } else { + processMail(entry, folder, store, messages[i], latch, onCompletion); + mailProcessingStarted = true; } + } catch (MessageRemovedException ignore) { + // while reading the meta information, this mail was deleted, thats ok + if (log.isDebugEnabled()) { + log.debug("Skipping message # : " + messages[i].getMessageNumber() + + " as it has been DELETED by another thread after processing"); + } + latch.countDown(); } } + if (!mailProcessingStarted) { + // if we didnt process any mail in this run, the onCompletion will not + // run from the mail processor by default + onCompletion.run(); + } + } catch (MessagingException me) { processFailure("Error checking mail for account : " + emailAddress + " :: " + me.getMessage(), me, entry); - - } finally { - - // wait till all parallel message processing tasks complete - try { - if (log.isDebugEnabled()) { - log.debug("Awaiting completion of " + latch.getCount() + - " concurrent message processing threads"); - } - latch.await(); - } catch (InterruptedException e) { - log.warn("Mail transport listner polling thread 
interrupted before completion"); - } - - try { - folder.close(true /** expunge messages flagged as DELETED*/); - if (log.isDebugEnabled()) { - log.debug("Mail folder closed, and deleted mail expunged"); - } - } catch (MessagingException e) { - processFailure("Error closing mail folder : " + - folder + " for account : " + emailAddress, e, entry); - } - - if (store != null) { - try { - store.close(); - if (log.isDebugEnabled()) { - log.debug("Mail store closed for : " + emailAddress); - } - } catch (MessagingException e) { - log.warn("Error closing mail store for account : " + - emailAddress + " :: " + e.getMessage(), e); - } - } } + + } else { + processFailure("Unable to access mail folder", null, entry); } } @@ -279,11 +253,12 @@ * @param pos the message position seen initially * @param mp the MailProcessor object * @param latch the completion latch to notify + * @param onCompletion the tasks to run on the completion of mail processing */ private void processMail(PollTableEntry entry, Folder folder, Store store, Message message, - CountDownLatch latch) { + CountDownLatch latch, Runnable onCompletion) { - MailProcessor mp = new MailProcessor(entry, message, store, folder, latch); + MailProcessor mp = new MailProcessor(entry, message, store, folder, latch, onCompletion); // should messages be processed in parallel? 
if (entry.isConcurrentPollingAllowed()) { @@ -352,13 +327,16 @@ private Folder folder = null; private String uid = null; private CountDownLatch doneSignal = null; + private Runnable onCompletion = null; - MailProcessor(PollTableEntry entry, Message message, Store store, Folder folder, CountDownLatch doneSignal) { + MailProcessor(PollTableEntry entry, Message message, Store store, Folder folder, + CountDownLatch doneSignal, Runnable onCompletion) { this.entry = entry; this.message = message; this.store = store; this.folder = folder; this.doneSignal = doneSignal; + this.onCompletion = onCompletion; } public void setUID(String uid) { @@ -392,6 +370,73 @@ } doneSignal.countDown(); + + if (doneSignal.getCount() == 0) { + onCompletion.run(); + } + } + } + + /** + * Handle optional logic of the mail transport, that needs to happen once all messages in + * a check mail cycle has ended. + */ + private class MailCheckCompletionTask implements Runnable { + private final Folder folder; + private final Store store; + private final InternetAddress emailAddress; + private final PollTableEntry entry; + private boolean taskStarted = false; + + public MailCheckCompletionTask(Folder folder, Store store, + InternetAddress emailAddress, PollTableEntry entry) { + this.folder = folder; + this.store = store; + this.emailAddress = emailAddress; + this.entry = entry; + } + + public void run() { + synchronized(this) { + if (taskStarted) { + return; + } else { + taskStarted = true; + } + } + + if (log.isDebugEnabled()) { + log.debug("Executing onCompletion task for the mail download of : " + emailAddress); + } + + if (folder != null) { + try { + folder.close(true /** expunge messages flagged as DELETED*/); + if (log.isDebugEnabled()) { + log.debug("Mail folder closed, and deleted mail expunged"); + } + } catch (MessagingException e) { + log.warn("Error closing mail folder : " + + folder + " for account : " + emailAddress + " :: "+ e.getMessage()); + } + } + + if (store != null) { + try { + 
store.close(); + if (log.isDebugEnabled()) { + log.debug("Mail store closed for : " + emailAddress); + } + } catch (MessagingException e) { + log.warn("Error closing mail store for account : " + + emailAddress + " :: " + e.getMessage(), e); + } + } + + if (log.isDebugEnabled()) { + log.debug("Scheduling next poll for : " + emailAddress); + } + onPollCompletion(entry); } } @@ -769,12 +814,18 @@ paramIncl, MailConstants.TRANSPORT_MAIL_PROCESS_IN_PARALLEL); if (processInParallel != null) { entry.setProcessingMailInParallel(Boolean.parseBoolean(processInParallel)); + if (log.isDebugEnabled() && entry.isProcessingMailInParallel()) { + log.debug("Parallel mail processing enabled for : " + address); + } } String pollInParallel = ParamUtils.getOptionalParam( paramIncl, BaseConstants.TRANSPORT_POLL_IN_PARALLEL); if (pollInParallel != null) { entry.setConcurrentPollingAllowed(Boolean.parseBoolean(pollInParallel)); + if (log.isDebugEnabled() && entry.isConcurrentPollingAllowed()) { + log.debug("Concurrent mail polling enabled for : " + address); + } } String strMaxRetryCount = ParamUtils.getOptionalParam(