Author: norman
Date: Wed Oct 6 12:15:05 2010
New Revision: 1005005
URL: http://svn.apache.org/viewvc?rev=1005005&view=rev
Log:
* Use the amq: namespace to configure ActiveMQ via Spring, so activemq.xml is no longer needed.
* Some more fine tuning to make sure that JamesSpoolManager is really
thread-safe on shutdown
Removed:
james/server/trunk/spring-deployment/src/main/config/james/activemq.xml
Modified:
james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java
james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml
Modified:
james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java?rev=1005005&r1=1005004&r2=1005005&view=diff
==============================================================================
---
james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java
(original)
+++
james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java
Wed Oct 6 12:15:05 2010
@@ -25,10 +25,13 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.mail.MessagingException;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -60,7 +63,7 @@ import org.apache.mailet.base.MatcherInv
* It also offer the {@link MailProcessorList} implementation which allow
to inject {@link Mail} into the routes
*
*/
-public class CamelMailProcessorList extends RouteBuilder implements
Configurable, LogEnabled, MailProcessorList {
+public class CamelMailProcessorList implements Configurable, LogEnabled,
MailProcessorList, CamelContextAware {
private MatcherLoader matcherLoader;
private HierarchicalConfiguration config;
@@ -83,150 +86,16 @@ public class CamelMailProcessorList exte
}
private ProducerTemplate producerTemplate;
- /*
- * (non-Javadoc)
- * @see org.apache.camel.builder.RouteBuilder#configure()
- */
- @SuppressWarnings("unchecked")
- @Override
- public void configure() throws Exception {
- Processor terminatingMailetProcessor = new MailetProcessor(new
TerminatingMailet(), logger);
- Processor disposeProcessor = new DisposeProcessor();
- Processor mailProcessor = new MailCamelProcessor();
- Processor removePropsProcessor = new RemovePropertiesProcessor();
-
- List<HierarchicalConfiguration> processorConfs =
config.configurationsAt("processor");
- for (int i = 0; i< processorConfs.size(); i++) {
- final HierarchicalConfiguration processorConf =
processorConfs.get(i);
- String processorName = processorConf.getString("[@name]");
-
-
- mailets.put(processorName, new ArrayList<Mailet>());
- matchers.put(processorName, new ArrayList<Matcher>());
-
- RouteDefinition processorDef =
from(getEndpoint(processorName)).inOnly()
- // store the logger in properties
- .setProperty(MatcherSplitter.LOGGER_PROPERTY, constant(logger));
-
-
- final List<HierarchicalConfiguration> mailetConfs =
processorConf.configurationsAt("mailet");
- // Loop through the mailet configuration, load
- // all of the matcher and mailets, and add
- // them to the processor.
- for (int j = 0; j< mailetConfs.size(); j++) {
- HierarchicalConfiguration c = mailetConfs.get(j);
-
- // We need to set this because of correctly parsing comma
- String mailetClassName = c.getString("[@class]");
- String matcherName = c.getString("[@match]", null);
- String invertedMatcherName = c.getString("[@notmatch]",
null);
-
- Mailet mailet = null;
- Matcher matcher = null;
- try {
-
- if (matcherName != null&& invertedMatcherName != null) {
- // if no matcher is configured throw an Exception
- throw new ConfigurationException("Please configure only
match or nomatch per mailet");
- } else if (matcherName != null) {
- matcher = matcherLoader.getMatcher(matcherName);
- } else if (invertedMatcherName != null) {
- matcher = new
MatcherInverter(matcherLoader.getMatcher(invertedMatcherName));
-
- } else {
- // default matcher is All
- matcher = matcherLoader.getMatcher("All");
- }
-
- // The matcher itself should log that it's been inited.
- if (logger.isInfoEnabled()) {
- StringBuffer infoBuffer = new StringBuffer(64).append("Matcher
").append(matcherName).append(" instantiated.");
- logger.info(infoBuffer.toString());
- }
- } catch (MessagingException ex) {
- // **** Do better job printing out exception
- if (logger.isErrorEnabled()) {
- StringBuffer errorBuffer = new StringBuffer(256).append("Unable to
init matcher ").append(matcherName).append(": ").append(ex.toString());
- logger.error(errorBuffer.toString(), ex);
- if (ex.getNextException() != null) {
- logger.error("Caused by nested exception: ",
ex.getNextException());
- }
- }
- System.err.println("Unable to init matcher " +
matcherName);
- System.err.println("Check spool manager logs for more
details.");
- // System.exit(1);
- throw new ConfigurationException("Unable to init matcher",
ex);
- }
- try {
- mailet = mailetLoader.getMailet(mailetClassName, c);
- if (logger.isInfoEnabled()) {
- StringBuffer infoBuffer = new StringBuffer(64).append("Mailet
").append(mailetClassName).append(" instantiated.");
- logger.info(infoBuffer.toString());
- }
- } catch (MessagingException ex) {
- // **** Do better job printing out exception
- if (logger.isErrorEnabled()) {
- StringBuffer errorBuffer = new StringBuffer(256).append("Unable to
init mailet ").append(mailetClassName).append(": ").append(ex.toString());
- logger.error(errorBuffer.toString(), ex);
- if (ex.getNextException() != null) {
- logger.error("Caused by nested exception: ",
ex.getNextException());
- }
- }
- System.err.println("Unable to init mailet " +
mailetClassName);
- System.err.println("Check spool manager logs for more
details.");
- throw new ConfigurationException("Unable to init mailet",
ex);
- }
- if (mailet != null&& matcher != null) {
- String onMatchException = null;
- MailetConfig mailetConfig = mailet.getMailetConfig();
-
- if (mailetConfig instanceof MailetConfigImpl) {
- onMatchException = ((MailetConfigImpl)
mailetConfig).getInitAttribute("onMatchException");
- }
-
- // Store the matcher to use for splitter in properties
- processorDef
- .setProperty(MatcherSplitter.MATCHER_PROPERTY,
constant(matcher)).setProperty(MatcherSplitter.ON_MATCH_EXCEPTION_PROPERTY,
constant(onMatchException))
-
- // do splitting of the mail based on the stored
matcher
-
.split().method(MatcherSplitter.class).aggregationStrategy(aggr).parallelProcessing()
-
- .choice().when(new MatcherMatch()).process(new
MailetProcessor(mailet, logger)).end()
-
- .choice().when(new
MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop().otherwise().process(removePropsProcessor).end()
-
- .choice().when(new
MailStateNotEquals(processorName)).process(mailProcessor).stop().end();
-
- // store mailet and matcher
- mailets.get(processorName).add(mailet);
- matchers.get(processorName).add(matcher);
- }
-
+ private CamelContext camelContext;
+
- }
-
- processorDef
- // start choice
- .choice()
-
- // when the mail state did not change till yet ( the end
of the route) we need to call the TerminatingMailet to
- // make sure we don't fall into a endless loop
- .when(new
MailStateEquals(processorName)).process(terminatingMailetProcessor).stop()
-
-
- // dispose when needed
- .when(new
MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop()
-
- // route it to the next processor
- .otherwise().process(mailProcessor).stop();
-
- processors.put(processorName, new
ChildMailProcessor(processorName));
- }
-
- producerTemplate = getContext().createProducerTemplate();
- }
+ @PostConstruct
+ public void init() throws Exception {
+ getCamelContext().addRoutes(new SpoolRouteBuilder());
+ producerTemplate = getCamelContext().createProducerTemplate();
+ }
/**
* Destroy all mailets and matchers
*/
@@ -428,4 +297,156 @@ public class CamelMailProcessorList exte
}
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ private final class SpoolRouteBuilder extends RouteBuilder {
+ /*
+ * (non-Javadoc)
+ * @see org.apache.camel.builder.RouteBuilder#configure()
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public void configure() throws Exception {
+ Processor terminatingMailetProcessor = new MailetProcessor(new
TerminatingMailet(), logger);
+ Processor disposeProcessor = new DisposeProcessor();
+ Processor mailProcessor = new MailCamelProcessor();
+ Processor removePropsProcessor = new
RemovePropertiesProcessor();
+
+ List<HierarchicalConfiguration> processorConfs =
config.configurationsAt("processor");
+ for (int i = 0; i< processorConfs.size(); i++) {
+ final HierarchicalConfiguration processorConf =
processorConfs.get(i);
+ String processorName =
processorConf.getString("[@name]");
+
+
+ mailets.put(processorName, new ArrayList<Mailet>());
+ matchers.put(processorName, new ArrayList<Matcher>());
+
+ RouteDefinition processorDef =
from(getEndpoint(processorName)).inOnly()
+ // store the logger in properties
+ .setProperty(MatcherSplitter.LOGGER_PROPERTY,
constant(logger));
+
+
+ final List<HierarchicalConfiguration> mailetConfs =
processorConf.configurationsAt("mailet");
+ // Loop through the mailet configuration, load
+ // all of the matcher and mailets, and add
+ // them to the processor.
+ for (int j = 0; j< mailetConfs.size(); j++) {
+ HierarchicalConfiguration c = mailetConfs.get(j);
+
+ // We need to set this because of correctly parsing
comma
+ String mailetClassName = c.getString("[@class]");
+ String matcherName = c.getString("[@match]", null);
+ String invertedMatcherName =
c.getString("[@notmatch]", null);
+
+ Mailet mailet = null;
+ Matcher matcher = null;
+ try {
+
+ if (matcherName != null&& invertedMatcherName !=
null) {
+ // if no matcher is configured throw an
Exception
+ throw new ConfigurationException("Please configure
only match or nomatch per mailet");
+ } else if (matcherName != null) {
+ matcher = matcherLoader.getMatcher(matcherName);
+ } else if (invertedMatcherName != null) {
+ matcher = new
MatcherInverter(matcherLoader.getMatcher(invertedMatcherName));
+
+ } else {
+ // default matcher is All
+ matcher = matcherLoader.getMatcher("All");
+ }
+
+ // The matcher itself should log that it's been
inited.
+ if (logger.isInfoEnabled()) {
+ StringBuffer infoBuffer = new StringBuffer(64).append("Matcher
").append(matcherName).append(" instantiated.");
+ logger.info(infoBuffer.toString());
+ }
+ } catch (MessagingException ex) {
+ // **** Do better job printing out exception
+ if (logger.isErrorEnabled()) {
+ StringBuffer errorBuffer = new
StringBuffer(256).append("Unable to init matcher ").append(matcherName).append(":
").append(ex.toString());
+ logger.error(errorBuffer.toString(), ex);
+ if (ex.getNextException() != null) {
+ logger.error("Caused by nested exception:
", ex.getNextException());
+ }
+ }
+ System.err.println("Unable to init matcher " +
matcherName);
+ System.err.println("Check spool manager logs for more
details.");
+ // System.exit(1);
+ throw new ConfigurationException("Unable to init
matcher", ex);
+ }
+ try {
+ mailet = mailetLoader.getMailet(mailetClassName, c);
+ if (logger.isInfoEnabled()) {
+ StringBuffer infoBuffer = new StringBuffer(64).append("Mailet
").append(mailetClassName).append(" instantiated.");
+ logger.info(infoBuffer.toString());
+ }
+ } catch (MessagingException ex) {
+ // **** Do better job printing out exception
+ if (logger.isErrorEnabled()) {
+ StringBuffer errorBuffer = new
StringBuffer(256).append("Unable to init mailet ").append(mailetClassName).append(":
").append(ex.toString());
+ logger.error(errorBuffer.toString(), ex);
+ if (ex.getNextException() != null) {
+ logger.error("Caused by nested exception:
", ex.getNextException());
+ }
+ }
+ System.err.println("Unable to init mailet " +
mailetClassName);
+ System.err.println("Check spool manager logs for more
details.");
+ throw new ConfigurationException("Unable to init
mailet", ex);
+ }
+ if (mailet != null&& matcher != null) {
+ String onMatchException = null;
+ MailetConfig mailetConfig =
mailet.getMailetConfig();
+
+ if (mailetConfig instanceof MailetConfigImpl) {
+ onMatchException = ((MailetConfigImpl)
mailetConfig).getInitAttribute("onMatchException");
+ }
+
+ // Store the matcher to use for splitter in
properties
+ processorDef
+
.setProperty(MatcherSplitter.MATCHER_PROPERTY,
constant(matcher)).setProperty(MatcherSplitter.ON_MATCH_EXCEPTION_PROPERTY,
constant(onMatchException))
+
+ // do splitting of the mail based on the
stored matcher
+
.split().method(MatcherSplitter.class).aggregationStrategy(aggr).parallelProcessing()
+
+ .choice().when(new
MatcherMatch()).process(new MailetProcessor(mailet, logger)).end()
+
+ .choice().when(new
MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop().otherwise().process(removePropsProcessor).end()
+
+ .choice().when(new
MailStateNotEquals(processorName)).process(mailProcessor).stop().end();
+
+ // store mailet and matcher
+ mailets.get(processorName).add(mailet);
+ matchers.get(processorName).add(matcher);
+ }
+
+
+ }
+
+ processorDef
+ // start choice
+ .choice()
+
+ // when the mail state did not change till yet (
the end of the route) we need to call the TerminatingMailet to
+ // make sure we don't fall into a endless loop
+ .when(new
MailStateEquals(processorName)).process(terminatingMailetProcessor).stop()
+
+
+ // dispose when needed
+ .when(new
MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop()
+
+ // route it to the next processor
+ .otherwise().process(mailProcessor).stop();
+
+ processors.put(processorName, new
ChildMailProcessor(processorName));
+ }
+
+ }
+ }
+
}
Modified:
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java?rev=1005005&r1=1005004&r2=1005005&view=diff
==============================================================================
---
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java
(original)
+++
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java
Wed Oct 6 12:15:05 2010
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@@ -73,7 +74,7 @@ public class JamesSpoolManager implement
/**
* Number of active threads
*/
- private int numActive;
+ private AtomicInteger numActive = new AtomicInteger(0);;
/**
* Spool threads are active
@@ -132,7 +133,6 @@ public class JamesSpoolManager implement
}
active.set(true);
- numActive = 0;
spoolThreads = new java.util.ArrayList<Thread>(numThreads);
for ( int i = 0 ; i< numThreads ; i++ ) {
Thread reader = new Thread(this, "Spool Thread #" + i);
@@ -153,8 +153,9 @@ public class JamesSpoolManager implement
logger.info("Spool=" + queue.getClass().getName());
}
- numActive++;
while(active.get()) {
+ numActive.incrementAndGet();
+
try {
queue.deQueue(new DequeueOperation() {
@@ -189,12 +190,14 @@ public class JamesSpoolManager implement
logger.error("Exception processing mail in
JamesSpoolManager.run "
+ e.getMessage(), e);
}
+ } finally {
+ numActive.decrementAndGet();
}
+
}
if (logger.isInfoEnabled()){
logger.info("Stop JamesSpoolManager: " +
Thread.currentThread().getName());
}
- numActive--;
}
/**
@@ -217,11 +220,13 @@ public class JamesSpoolManager implement
long stop = System.currentTimeMillis() + 60000;
// give the spooler threads one minute to terminate gracefully
- while (numActive != 0&& stop> System.currentTimeMillis()) {
+ /*
+ while (numActive.get() != 0&& stop> System.currentTimeMillis()) {
try {
Thread.sleep(1000);
} catch (Exception ignored) {}
}
+ */
logger.info("JamesSpoolManager thread shutdown completed.");
}
Modified:
james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml
URL:
http://svn.apache.org/viewvc/james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml?rev=1005005&r1=1005004&r2=1005005&view=diff
==============================================================================
--- james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml
(original)
+++ james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml
Wed Oct 6 12:15:05 2010
@@ -21,10 +21,12 @@
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:camel="http://camel.apache.org/schema/spring"
+ xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
- http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">
-
+ http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd
+ http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
+
<!--
** JMX part ** to enable exposure of JMX, activate the following beans
@@ -124,31 +126,28 @@
<camel:camelContext id="jamesCamelContext" trace="false">
<camel:jmxAgent id="agent" disabled="true"/>
<camel:template id="producerTemplate"/>
-<camel:routeBuilder ref="processorRoute" />
</camel:camelContext>
-
-<!-- jms connection pooling -->
-<bean id="jmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
-<property name="connectionFactory">
-<bean class="org.apache.activemq.ActiveMQConnectionFactory">
-<property name="brokerURL" value="vm://localhost?broker.useJmx=false"/>
-</bean>
-</property>
-<!--
-<property name="transactionManager" ref="jmsTransactionManager"/>
- -->
+<!-- lets create an embedded ActiveMQ Broker -->
+<amq:broker useJmx="false" persistent="true" dataDirectory="../data/"
schedulerSupport="true" id="broker">
+<amq:transportConnectors>
+<amq:transportConnector uri="tcp://localhost:0" />
+</amq:transportConnectors>
+</amq:broker>
+
+
+<amq:connectionFactory id="amqConnectionFactory"
brokerURL="vm://localhost?create=false" />
+
+<!-- CachingConnectionFactory Definition, sessionCacheSize property is the number
of sessions to cache -->
+<bean id="jmsConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
+<constructor-arg ref="amqConnectionFactory" />
+<property name="sessionCacheSize" value="100" />
</bean>
<!-- setup spring jms TX manager -->
<bean id="jmsTransactionManager"
class="org.springframework.jms.connection.JmsTransactionManager">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>
-
-<bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
-<property name="config" value="file://conf/activemq.xml" />
-<property name="start" value="true" />
-</bean>
<bean id="mailQueueFactory"
class="org.apache.james.queue.activemq.ActiveMQMailQueueFactory" depends-on="broker">
<!-- Allow to specify if BlobMessage or BytesMessage should be used for
storing the Mail in the queue-->
@@ -156,6 +155,7 @@
<!-- By default only BytesMessage is used -->
<property name="sizeTreshold" value="-1"/>
</bean>
+
<!-- Build the camelroute from the spoolmanager.xml -->
<bean id="mailProcessor" name="processorRoute"
class="org.apache.james.mailetcontainer.camel.CamelMailProcessorList"/>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]