Author: jstrachan
Date: Mon Apr 30 10:46:11 2007
New Revision: 533814
URL: http://svn.apache.org/viewvc?view=rev&rev=533814
Log:
completed the test case demonstrating the use of consuming of messages from a
folder as well as sending
Modified:
activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/JavaMailConnection.java
activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailConfiguration.java
activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java
activemq/camel/trunk/camel-mail/src/test/java/org/apache/camel/component/mail/MailRouteTest.java
Modified:
activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/JavaMailConnection.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/JavaMailConnection.java?view=diff&rev=533814&r1=533813&r2=533814
==============================================================================
---
activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/JavaMailConnection.java
(original)
+++
activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/JavaMailConnection.java
Mon Apr 30 10:46:11 2007
@@ -25,6 +25,8 @@
import javax.mail.AuthenticationFailedException;
import javax.mail.MessagingException;
import javax.mail.Transport;
+import javax.mail.Folder;
+import javax.mail.Store;
/**
* An extension of Spring's [EMAIL PROTECTED] JavaMailSenderImpl} to provide
helper methods for listening for new mail
@@ -33,23 +35,14 @@
*/
public class JavaMailConnection extends JavaMailSenderImpl {
- /**
- * Create a new [EMAIL PROTECTED] Transport} which can then be used to
consume new messages
- *
- * @throws MailAuthenticationException in case of authentication failure
- * @throws MailSendException in case of failure when sending a
message
- */
- public Transport createTransport() throws MailException {
+ public Folder getFolder(String protocol, String folderName) {
try {
- Transport transport = getTransport(getSession());
- transport.connect(getHost(), getPort(), getUsername(),
getPassword());
- return transport;
+ Store store = getSession().getStore(protocol);
+ store.connect(getHost(), getPort(), getUsername(), getPassword());
+ return store.getFolder(folderName);
}
- catch (AuthenticationFailedException ex) {
- throw new MailAuthenticationException(ex);
- }
- catch (MessagingException ex) {
- throw new MailSendException("Mail server connection failed", ex);
+ catch (MessagingException e) {
+ throw new MailSendException("Mail server connection failed", e);
}
}
}
Modified:
activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailConfiguration.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailConfiguration.java?view=diff&rev=533814&r1=533813&r2=533814
==============================================================================
---
activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailConfiguration.java
(original)
+++
activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailConfiguration.java
Mon Apr 30 10:46:11 2007
@@ -39,6 +39,8 @@
private int port = -1;
private String destination;
private String from = "[EMAIL PROTECTED]";
+ private boolean deleteProcessedMessages = true;
+ private String folderName = "INBOX";
public MailConfiguration() {
}
@@ -83,6 +85,9 @@
if (fragment == null || fragment.length() == 0) {
fragment = userInfo + "@" + host;
}
+ else {
+ setFolderName(fragment);
+ }
setDestination(fragment);
}
@@ -197,5 +202,21 @@
public void setFrom(String from) {
this.from = from;
+ }
+
+ public boolean isDeleteProcessedMessages() {
+ return deleteProcessedMessages;
+ }
+
+ public void setDeleteProcessedMessages(boolean deleteProcessedMessages) {
+ this.deleteProcessedMessages = deleteProcessedMessages;
+ }
+
+ public String getFolderName() {
+ return folderName;
+ }
+
+ public void setFolderName(String folderName) {
+ this.folderName = folderName;
}
}
Modified:
activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java?view=diff&rev=533814&r1=533813&r2=533814
==============================================================================
---
activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
(original)
+++
activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
Mon Apr 30 10:46:11 2007
@@ -19,12 +19,18 @@
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
-import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.impl.PollingConsumer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import javax.mail.Flags;
+import javax.mail.Folder;
import javax.mail.Message;
+import javax.mail.MessagingException;
import javax.mail.Transport;
-import javax.mail.event.TransportEvent;
-import javax.mail.event.TransportListener;
+import javax.mail.event.MessageCountEvent;
+import javax.mail.event.MessageCountListener;
+import java.util.concurrent.ScheduledExecutorService;
/**
* A [EMAIL PROTECTED] Consumer} which consumes messages from JavaMail using a
[EMAIL PROTECTED] Transport} and dispatches them
@@ -32,37 +38,96 @@
*
* @version $Revision: 523430 $
*/
-public class MailConsumer extends DefaultConsumer<MailExchange> implements
TransportListener {
+public class MailConsumer extends PollingConsumer<MailExchange> implements
MessageCountListener {
+ private static final transient Log log =
LogFactory.getLog(MailConsumer.class);
private final MailEndpoint endpoint;
- private final Transport transport;
+ private final Folder folder;
- public MailConsumer(MailEndpoint endpoint, Processor<MailExchange>
processor, Transport transport) {
- super(endpoint, processor);
+ public MailConsumer(MailEndpoint endpoint, Processor<MailExchange>
processor, Folder folder) {
+ super(endpoint, processor, endpoint.getExecutorService());
this.endpoint = endpoint;
- this.transport = transport;
+ this.folder = folder;
}
@Override
protected void doStart() throws Exception {
super.doStart();
- transport.addTransportListener(this);
+ ensureFolderIsOpen();
+ folder.addMessageCountListener(this);
}
@Override
protected void doStop() throws Exception {
- transport.close();
+ folder.removeMessageCountListener(this);
+ folder.close(true);
super.doStop();
}
- public void messageDelivered(TransportEvent transportEvent) {
- Message message = transportEvent.getMessage();
+ public void messagesAdded(MessageCountEvent event) {
+ Message[] messages = event.getMessages();
+ for (Message message : messages) {
+ try {
+ if (!message.getFlags().contains(Flags.Flag.DELETED)) {
+ processMessage(message);
+
+ flagMessageDeleted(message);
+ }
+ }
+ catch (MessagingException e) {
+ handleException(e);
+ }
+ }
+ }
+
+ public void messagesRemoved(MessageCountEvent event) {
+ Message[] messages = event.getMessages();
+ for (Message message : messages) {
+ if (log.isDebugEnabled()) {
+ try {
+ log.debug("Removing message: " + message.getSubject());
+ }
+ catch (MessagingException e) {
+ log.debug("Ignored: " + e);
+ }
+ }
+ }
+ }
+
+ protected void poll() throws Exception {
+ ensureFolderIsOpen();
+
+ int count = folder.getMessageCount();
+ if (count > 0) {
+ Message message = folder.getMessage(1);
+
+ processMessage(message);
+
+ flagMessageDeleted(message);
+ }
+ else if (count == -1) {
+ throw new MessagingException("Folder: " + folder.getFullName() + "
is closed");
+ }
+
+ folder.close(true);
+ }
+
+ protected void processMessage(Message message) {
MailExchange exchange = endpoint.createExchange(message);
getProcessor().process(exchange);
}
- public void messageNotDelivered(TransportEvent transportEvent) {
+ protected void ensureFolderIsOpen() throws MessagingException {
+ if (!folder.isOpen()) {
+ folder.open(Folder.READ_WRITE);
+ }
}
- public void messagePartiallyDelivered(TransportEvent transportEvent) {
+ protected void flagMessageDeleted(Message message) throws
MessagingException {
+ if (endpoint.getConfiguration().isDeleteProcessedMessages()) {
+ message.setFlag(Flags.Flag.DELETED, true);
+ }
+ else {
+ message.setFlag(Flags.Flag.SEEN, true);
+ }
}
}
Modified:
activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java?view=diff&rev=533814&r1=533813&r2=533814
==============================================================================
---
activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java
(original)
+++
activemq/camel/trunk/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java
Mon Apr 30 10:46:11 2007
@@ -24,7 +24,7 @@
import org.springframework.mail.javamail.JavaMailSender;
import javax.mail.Message;
-import javax.mail.Transport;
+import javax.mail.Folder;
/**
* @version $Revision:520964 $
@@ -52,19 +52,28 @@
public Consumer<MailExchange> createConsumer(Processor<MailExchange>
processor) throws Exception {
JavaMailConnection connection =
configuration.createJavaMailConnection(this);
- return createConsumer(processor, connection.createTransport());
+ String protocol = getConfiguration().getProtocol();
+ if (protocol.equals("smtp")) {
+ protocol = "pop3";
+ }
+ String folderName = getConfiguration().getFolderName();
+ Folder folder = connection.getFolder(protocol, folderName);
+ if (folder == null) {
+ throw new IllegalArgumentException("No folder for protocol: " +
protocol + " and name: " + folderName);
+ }
+ return createConsumer(processor, folder);
}
/**
* Creates a consumer using the given processor and transport
*
* @param processor the processor to use to process the messages
- * @param transport the JavaMail transport to use for inbound messages
+ * @param folder the JavaMail Folder to use for inbound messages
* @return a newly created consumer
* @throws Exception if the consumer cannot be created
*/
- public Consumer<MailExchange> createConsumer(Processor<MailExchange>
processor, Transport transport) throws Exception {
- return startService(new MailConsumer(this, processor, transport));
+ public Consumer<MailExchange> createConsumer(Processor<MailExchange>
processor, Folder folder) throws Exception {
+ return startService(new MailConsumer(this, processor, folder));
}
public MailExchange createExchange() {
Modified:
activemq/camel/trunk/camel-mail/src/test/java/org/apache/camel/component/mail/MailRouteTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-mail/src/test/java/org/apache/camel/component/mail/MailRouteTest.java?view=diff&rev=533814&r1=533813&r2=533814
==============================================================================
---
activemq/camel/trunk/camel-mail/src/test/java/org/apache/camel/component/mail/MailRouteTest.java
(original)
+++
activemq/camel/trunk/camel-mail/src/test/java/org/apache/camel/component/mail/MailRouteTest.java
Mon Apr 30 10:46:11 2007
@@ -20,6 +20,7 @@
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.component.mock.MockEndpoint;
import static org.apache.camel.builder.Builder.constant;
import org.apache.camel.builder.RouteBuilder;
import static org.apache.camel.util.ObjectHelper.asString;
@@ -34,7 +35,12 @@
* @version $Revision: 1.1 $
*/
public class MailRouteTest extends ContextTestSupport {
+ private MockEndpoint resultEndpoint;
+
public void testSendAndReceiveMails() throws Exception {
+ resultEndpoint = (MockEndpoint)
resolveMandatoryEndpoint("mock:result");
+ resultEndpoint.expectedMessageCount(1);
+
client.send("smtp://[EMAIL PROTECTED]", new Processor<Exchange>() {
public void process(Exchange exchange) {
exchange.getIn().setBody("hello world!");
@@ -45,8 +51,9 @@
assertMailboxReceivedMessages("[EMAIL PROTECTED]");
// lets test the receive worked
- // TODO
- // assertMailboxReceivedMessages("[EMAIL PROTECTED]");
+ resultEndpoint.assertIsSatisfied(5000);
+
+ assertMailboxReceivedMessages("[EMAIL PROTECTED]");
}
protected void assertMailboxReceivedMessages(String name) throws
IOException, MessagingException {
@@ -62,8 +69,9 @@
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
- from("smtp://[EMAIL PROTECTED]").to("direct:a");
- from("direct:a").setHeader("name",
constant("James")).to("pop3:[EMAIL PROTECTED]");
+ from("smtp://[EMAIL PROTECTED]").to("queue:a");
+ from("queue:a").to("smtp://[EMAIL PROTECTED]", "smtp://[EMAIL
PROTECTED]");
+ from("smtp://[EMAIL PROTECTED]").to("mock:result");
}
};
}