Makes sense... I will take care of it later. Thanks, Norman
2010/10/7, Eric Charles <[email protected]>: > small detail: ../data could be ../var/activemq-data to have all datas in > var dir. > Tks, > Eric > > On 6/10/2010 14:15, [email protected] wrote: >> Author: norman >> Date: Wed Oct 6 12:15:05 2010 >> New Revision: 1005005 >> >> URL: http://svn.apache.org/viewvc?rev=1005005&view=rev >> Log: >> * Use amq: namespace to configure it via spring, so no need to >> activemq.xml. >> * Some more fine tuning to make sure that JamesSpoolManager is really >> thread-safe on shutdown >> >> Removed: >> >> james/server/trunk/spring-deployment/src/main/config/james/activemq.xml >> Modified: >> >> james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java >> >> james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java >> >> james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml >> >> Modified: >> james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java >> URL: >> http://svn.apache.org/viewvc/james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java?rev=1005005&r1=1005004&r2=1005005&view=diff >> ============================================================================== >> --- >> james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java >> (original) >> +++ >> james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java >> Wed Oct 6 12:15:05 2010 >> @@ -25,10 +25,13 @@ import java.util.Iterator; >> import java.util.List; >> import java.util.Map; >> >> +import javax.annotation.PostConstruct; >> import javax.annotation.PreDestroy; >> import javax.annotation.Resource; >> import javax.mail.MessagingException; >> >> +import org.apache.camel.CamelContext; >> +import 
org.apache.camel.CamelContextAware; >> import org.apache.camel.CamelExecutionException; >> import org.apache.camel.Exchange; >> import org.apache.camel.Processor; >> @@ -60,7 +63,7 @@ import org.apache.mailet.base.MatcherInv >> * It also offer the {...@link MailProcessorList} implementation which >> allow to inject {...@link Mail} into the routes >> * >> */ >> -public class CamelMailProcessorList extends RouteBuilder implements >> Configurable, LogEnabled, MailProcessorList { >> +public class CamelMailProcessorList implements Configurable, LogEnabled, >> MailProcessorList, CamelContextAware { >> >> private MatcherLoader matcherLoader; >> private HierarchicalConfiguration config; >> @@ -83,150 +86,16 @@ public class CamelMailProcessorList exte >> } >> >> private ProducerTemplate producerTemplate; >> - /* >> - * (non-Javadoc) >> - * @see org.apache.camel.builder.RouteBuilder#configure() >> - */ >> - @SuppressWarnings("unchecked") >> - @Override >> - public void configure() throws Exception { >> - Processor terminatingMailetProcessor = new MailetProcessor(new >> TerminatingMailet(), logger); >> - Processor disposeProcessor = new DisposeProcessor(); >> - Processor mailProcessor = new MailCamelProcessor(); >> - Processor removePropsProcessor = new RemovePropertiesProcessor(); >> - >> - List<HierarchicalConfiguration> processorConfs = >> config.configurationsAt("processor"); >> - for (int i = 0; i< processorConfs.size(); i++) { >> - final HierarchicalConfiguration processorConf = >> processorConfs.get(i); >> - String processorName = processorConf.getString("[...@name]"); >> - >> - >> - mailets.put(processorName, new ArrayList<Mailet>()); >> - matchers.put(processorName, new ArrayList<Matcher>()); >> - >> - RouteDefinition processorDef = >> from(getEndpoint(processorName)).inOnly() >> - // store the logger in properties >> - .setProperty(MatcherSplitter.LOGGER_PROPERTY, >> constant(logger)); >> - >> - >> - final List<HierarchicalConfiguration> mailetConfs = >> 
processorConf.configurationsAt("mailet"); >> - // Loop through the mailet configuration, load >> - // all of the matcher and mailets, and add >> - // them to the processor. >> - for (int j = 0; j< mailetConfs.size(); j++) { >> - HierarchicalConfiguration c = mailetConfs.get(j); >> - >> - // We need to set this because of correctly parsing comma >> - String mailetClassName = c.getString("[...@class]"); >> - String matcherName = c.getString("[...@match]", null); >> - String invertedMatcherName = c.getString("[...@notmatch]", >> null); >> - >> - Mailet mailet = null; >> - Matcher matcher = null; >> - try { >> - >> - if (matcherName != null&& invertedMatcherName != >> null) { >> - // if no matcher is configured throw an Exception >> - throw new ConfigurationException("Please >> configure only match or nomatch per mailet"); >> - } else if (matcherName != null) { >> - matcher = matcherLoader.getMatcher(matcherName); >> - } else if (invertedMatcherName != null) { >> - matcher = new >> MatcherInverter(matcherLoader.getMatcher(invertedMatcherName)); >> - >> - } else { >> - // default matcher is All >> - matcher = matcherLoader.getMatcher("All"); >> - } >> - >> - // The matcher itself should log that it's been >> inited. 
>> - if (logger.isInfoEnabled()) { >> - StringBuffer infoBuffer = new >> StringBuffer(64).append("Matcher ").append(matcherName).append(" >> instantiated."); >> - logger.info(infoBuffer.toString()); >> - } >> - } catch (MessagingException ex) { >> - // **** Do better job printing out exception >> - if (logger.isErrorEnabled()) { >> - StringBuffer errorBuffer = new >> StringBuffer(256).append("Unable to init matcher >> ").append(matcherName).append(": ").append(ex.toString()); >> - logger.error(errorBuffer.toString(), ex); >> - if (ex.getNextException() != null) { >> - logger.error("Caused by nested exception: ", >> ex.getNextException()); >> - } >> - } >> - System.err.println("Unable to init matcher " + >> matcherName); >> - System.err.println("Check spool manager logs for more >> details."); >> - // System.exit(1); >> - throw new ConfigurationException("Unable to init >> matcher", ex); >> - } >> - try { >> - mailet = mailetLoader.getMailet(mailetClassName, c); >> - if (logger.isInfoEnabled()) { >> - StringBuffer infoBuffer = new >> StringBuffer(64).append("Mailet ").append(mailetClassName).append(" >> instantiated."); >> - logger.info(infoBuffer.toString()); >> - } >> - } catch (MessagingException ex) { >> - // **** Do better job printing out exception >> - if (logger.isErrorEnabled()) { >> - StringBuffer errorBuffer = new >> StringBuffer(256).append("Unable to init mailet >> ").append(mailetClassName).append(": ").append(ex.toString()); >> - logger.error(errorBuffer.toString(), ex); >> - if (ex.getNextException() != null) { >> - logger.error("Caused by nested exception: ", >> ex.getNextException()); >> - } >> - } >> - System.err.println("Unable to init mailet " + >> mailetClassName); >> - System.err.println("Check spool manager logs for more >> details."); >> - throw new ConfigurationException("Unable to init >> mailet", ex); >> - } >> - if (mailet != null&& matcher != null) { >> - String onMatchException = null; >> - MailetConfig mailetConfig = 
mailet.getMailetConfig(); >> - >> - if (mailetConfig instanceof MailetConfigImpl) { >> - onMatchException = ((MailetConfigImpl) >> mailetConfig).getInitAttribute("onMatchException"); >> - } >> - >> - // Store the matcher to use for splitter in >> properties >> - processorDef >> - >> .setProperty(MatcherSplitter.MATCHER_PROPERTY, >> constant(matcher)).setProperty(MatcherSplitter.ON_MATCH_EXCEPTION_PROPERTY, >> constant(onMatchException)) >> - >> - // do splitting of the mail based on the >> stored matcher >> - >> .split().method(MatcherSplitter.class).aggregationStrategy(aggr).parallelProcessing() >> - >> - .choice().when(new >> MatcherMatch()).process(new MailetProcessor(mailet, logger)).end() >> - >> - .choice().when(new >> MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop().otherwise().process(removePropsProcessor).end() >> - >> - .choice().when(new >> MailStateNotEquals(processorName)).process(mailProcessor).stop().end(); >> - >> - // store mailet and matcher >> - mailets.get(processorName).add(mailet); >> - matchers.get(processorName).add(matcher); >> - } >> - >> + private CamelContext camelContext; >> + >> >> - } >> - >> - processorDef >> - // start choice >> - .choice() >> - >> - // when the mail state did not change till yet ( the >> end of the route) we need to call the TerminatingMailet to >> - // make sure we don't fall into a endless loop >> - .when(new >> MailStateEquals(processorName)).process(terminatingMailetProcessor).stop() >> - >> - >> - // dispose when needed >> - .when(new >> MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop() >> - >> - // route it to the next processor >> - .otherwise().process(mailProcessor).stop(); >> - >> - processors.put(processorName, new >> ChildMailProcessor(processorName)); >> - } >> - >> - producerTemplate = getContext().createProducerTemplate(); >> - } >> >> + @PostConstruct >> + public void init() throws Exception { >> + getCamelContext().addRoutes(new SpoolRouteBuilder()); >> >> + 
producerTemplate = getCamelContext().createProducerTemplate(); >> + } >> /** >> * Destroy all mailets and matchers >> */ >> @@ -428,4 +297,156 @@ public class CamelMailProcessorList exte >> >> } >> >> + public CamelContext getCamelContext() { >> + return camelContext; >> + } >> + >> + public void setCamelContext(CamelContext camelContext) { >> + this.camelContext = camelContext; >> + } >> + >> + private final class SpoolRouteBuilder extends RouteBuilder { >> + /* >> + * (non-Javadoc) >> + * @see org.apache.camel.builder.RouteBuilder#configure() >> + */ >> + @SuppressWarnings("unchecked") >> + @Override >> + public void configure() throws Exception { >> + Processor terminatingMailetProcessor = new MailetProcessor(new >> TerminatingMailet(), logger); >> + Processor disposeProcessor = new DisposeProcessor(); >> + Processor mailProcessor = new MailCamelProcessor(); >> + Processor removePropsProcessor = new >> RemovePropertiesProcessor(); >> + >> + List<HierarchicalConfiguration> processorConfs = >> config.configurationsAt("processor"); >> + for (int i = 0; i< processorConfs.size(); i++) { >> + final HierarchicalConfiguration processorConf = >> processorConfs.get(i); >> + String processorName = >> processorConf.getString("[...@name]"); >> + >> + >> + mailets.put(processorName, new ArrayList<Mailet>()); >> + matchers.put(processorName, new ArrayList<Matcher>()); >> + >> + RouteDefinition processorDef = >> from(getEndpoint(processorName)).inOnly() >> + // store the logger in properties >> + .setProperty(MatcherSplitter.LOGGER_PROPERTY, >> constant(logger)); >> + >> + >> + final List<HierarchicalConfiguration> mailetConfs = >> processorConf.configurationsAt("mailet"); >> + // Loop through the mailet configuration, load >> + // all of the matcher and mailets, and add >> + // them to the processor. 
>> + for (int j = 0; j< mailetConfs.size(); j++) { >> + HierarchicalConfiguration c = mailetConfs.get(j); >> + >> + // We need to set this because of correctly parsing >> comma >> + String mailetClassName = c.getString("[...@class]"); >> + String matcherName = c.getString("[...@match]", null); >> + String invertedMatcherName = >> c.getString("[...@notmatch]", >> null); >> + >> + Mailet mailet = null; >> + Matcher matcher = null; >> + try { >> + >> + if (matcherName != null&& invertedMatcherName != >> null) { >> + // if no matcher is configured throw an >> Exception >> + throw new ConfigurationException("Please >> configure only match or nomatch per mailet"); >> + } else if (matcherName != null) { >> + matcher = matcherLoader.getMatcher(matcherName); >> + } else if (invertedMatcherName != null) { >> + matcher = new >> MatcherInverter(matcherLoader.getMatcher(invertedMatcherName)); >> + >> + } else { >> + // default matcher is All >> + matcher = matcherLoader.getMatcher("All"); >> + } >> + >> + // The matcher itself should log that it's been >> inited. 
>> + if (logger.isInfoEnabled()) { >> + StringBuffer infoBuffer = new >> StringBuffer(64).append("Matcher ").append(matcherName).append(" >> instantiated."); >> + logger.info(infoBuffer.toString()); >> + } >> + } catch (MessagingException ex) { >> + // **** Do better job printing out exception >> + if (logger.isErrorEnabled()) { >> + StringBuffer errorBuffer = new >> StringBuffer(256).append("Unable to init matcher >> ").append(matcherName).append(": ").append(ex.toString()); >> + logger.error(errorBuffer.toString(), ex); >> + if (ex.getNextException() != null) { >> + logger.error("Caused by nested exception: ", >> ex.getNextException()); >> + } >> + } >> + System.err.println("Unable to init matcher " + >> matcherName); >> + System.err.println("Check spool manager logs for >> more details."); >> + // System.exit(1); >> + throw new ConfigurationException("Unable to init >> matcher", ex); >> + } >> + try { >> + mailet = mailetLoader.getMailet(mailetClassName, c); >> + if (logger.isInfoEnabled()) { >> + StringBuffer infoBuffer = new >> StringBuffer(64).append("Mailet ").append(mailetClassName).append(" >> instantiated."); >> + logger.info(infoBuffer.toString()); >> + } >> + } catch (MessagingException ex) { >> + // **** Do better job printing out exception >> + if (logger.isErrorEnabled()) { >> + StringBuffer errorBuffer = new >> StringBuffer(256).append("Unable to init mailet >> ").append(mailetClassName).append(": ").append(ex.toString()); >> + logger.error(errorBuffer.toString(), ex); >> + if (ex.getNextException() != null) { >> + logger.error("Caused by nested exception: ", >> ex.getNextException()); >> + } >> + } >> + System.err.println("Unable to init mailet " + >> mailetClassName); >> + System.err.println("Check spool manager logs for >> more details."); >> + throw new ConfigurationException("Unable to init >> mailet", ex); >> + } >> + if (mailet != null&& matcher != null) { >> + String onMatchException = null; >> + MailetConfig mailetConfig = >> 
mailet.getMailetConfig(); >> + >> + if (mailetConfig instanceof MailetConfigImpl) { >> + onMatchException = ((MailetConfigImpl) >> mailetConfig).getInitAttribute("onMatchException"); >> + } >> + >> + // Store the matcher to use for splitter in >> properties >> + processorDef >> + >> .setProperty(MatcherSplitter.MATCHER_PROPERTY, >> constant(matcher)).setProperty(MatcherSplitter.ON_MATCH_EXCEPTION_PROPERTY, >> constant(onMatchException)) >> + >> + // do splitting of the mail based on the >> stored matcher >> + >> .split().method(MatcherSplitter.class).aggregationStrategy(aggr).parallelProcessing() >> + >> + .choice().when(new >> MatcherMatch()).process(new MailetProcessor(mailet, logger)).end() >> + >> + .choice().when(new >> MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop().otherwise().process(removePropsProcessor).end() >> + >> + .choice().when(new >> MailStateNotEquals(processorName)).process(mailProcessor).stop().end(); >> + >> + // store mailet and matcher >> + mailets.get(processorName).add(mailet); >> + matchers.get(processorName).add(matcher); >> + } >> + >> + >> + } >> + >> + processorDef >> + // start choice >> + .choice() >> + >> + // when the mail state did not change till yet ( the >> end of the route) we need to call the TerminatingMailet to >> + // make sure we don't fall into a endless loop >> + .when(new >> MailStateEquals(processorName)).process(terminatingMailetProcessor).stop() >> + >> + >> + // dispose when needed >> + .when(new >> MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop() >> + >> + // route it to the next processor >> + .otherwise().process(mailProcessor).stop(); >> + >> + processors.put(processorName, new >> ChildMailProcessor(processorName)); >> + } >> + >> + } >> + } >> + >> } >> >> Modified: >> james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java >> URL: >> 
http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java?rev=1005005&r1=1005004&r2=1005005&view=diff >> ============================================================================== >> --- >> james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java >> (original) >> +++ >> james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java >> Wed Oct 6 12:15:05 2010 >> @@ -25,6 +25,7 @@ import java.util.ArrayList; >> import java.util.Collection; >> import java.util.List; >> import java.util.concurrent.atomic.AtomicBoolean; >> +import java.util.concurrent.atomic.AtomicInteger; >> >> import javax.annotation.PostConstruct; >> import javax.annotation.PreDestroy; >> @@ -73,7 +74,7 @@ public class JamesSpoolManager implement >> /** >> * Number of active threads >> */ >> - private int numActive; >> + private AtomicInteger numActive = new AtomicInteger(0);; >> >> /** >> * Spool threads are active >> @@ -132,7 +133,6 @@ public class JamesSpoolManager implement >> } >> >> active.set(true); >> - numActive = 0; >> spoolThreads = new java.util.ArrayList<Thread>(numThreads); >> for ( int i = 0 ; i< numThreads ; i++ ) { >> Thread reader = new Thread(this, "Spool Thread #" + i); >> @@ -153,8 +153,9 @@ public class JamesSpoolManager implement >> logger.info("Spool=" + queue.getClass().getName()); >> } >> >> - numActive++; >> while(active.get()) { >> + numActive.incrementAndGet(); >> + >> try { >> queue.deQueue(new DequeueOperation() { >> >> @@ -189,12 +190,14 @@ public class JamesSpoolManager implement >> logger.error("Exception processing mail in >> JamesSpoolManager.run " >> + e.getMessage(), e); >> } >> + } finally { >> + numActive.decrementAndGet(); >> } >> + >> } >> if (logger.isInfoEnabled()){ >> logger.info("Stop JamesSpoolManager: " + >> Thread.currentThread().getName()); >> } >> - numActive--; >> } >> >> /** >> @@ -217,11 +220,13 @@ public 
class JamesSpoolManager implement >> >> long stop = System.currentTimeMillis() + 60000; >> // give the spooler threads one minute to terminate gracefully >> - while (numActive != 0&& stop> System.currentTimeMillis()) { >> + /* >> + while (numActive.get() != 0&& stop> System.currentTimeMillis()) >> { >> try { >> Thread.sleep(1000); >> } catch (Exception ignored) {} >> } >> + */ >> logger.info("JamesSpoolManager thread shutdown completed."); >> } >> >> >> Modified: >> james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml >> URL: >> http://svn.apache.org/viewvc/james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml?rev=1005005&r1=1005004&r2=1005005&view=diff >> ============================================================================== >> --- >> james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml >> (original) >> +++ >> james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml >> Wed Oct 6 12:15:05 2010 >> @@ -21,10 +21,12 @@ >> <beans xmlns="http://www.springframework.org/schema/beans" >> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" >> xmlns:camel="http://camel.apache.org/schema/spring" >> + xmlns:amq="http://activemq.apache.org/schema/core" >> xsi:schemaLocation=" >> http://www.springframework.org/schema/beans >> http://www.springframework.org/schema/beans/spring-beans-3.0.xsd >> - http://camel.apache.org/schema/spring >> http://camel.apache.org/schema/spring/camel-spring.xsd"> >> - >> + http://camel.apache.org/schema/spring >> http://camel.apache.org/schema/spring/camel-spring.xsd >> + http://activemq.apache.org/schema/core >> http://activemq.apache.org/schema/core/activemq-core.xsd"> >> + >> >> <!-- >> ** JMX part ** to enable exposure of JMX, activate the following >> beans >> @@ -124,31 +126,28 @@ >> <camel:camelContext id="jamesCamelContext" trace="false"> >> <camel:jmxAgent id="agent" disabled="true"/> >> <camel:template id="producerTemplate"/> >> 
-<camel:routeBuilder ref="processorRoute" /> >> </camel:camelContext> >> >> - >> -<!-- jms connection pooling --> >> -<bean id="jmsConnectionFactory" >> class="org.apache.activemq.pool.PooledConnectionFactory" >> destroy-method="stop"> >> -<property name="connectionFactory"> >> -<bean class="org.apache.activemq.ActiveMQConnectionFactory"> >> -<property name="brokerURL" value="vm://localhost?broker.useJmx=false"/> >> -</bean> >> -</property> >> -<!-- >> -<property name="transactionManager" ref="jmsTransactionManager"/> >> - --> >> +<!-- lets create an embedded ActiveMQ Broker --> >> +<amq:broker useJmx="false" persistent="true" dataDirectory="../data/" >> schedulerSupport="true" id="broker"> >> +<amq:transportConnectors> >> +<amq:transportConnector uri="tcp://localhost:0" /> >> +</amq:transportConnectors> >> +</amq:broker> >> + >> + >> +<amq:connectionFactory id="amqConnectionFactory" >> brokerURL="vm://localhost?create=false" /> >> + >> +<!-- CachingConnectionFactory Definition, sessionCacheSize property is >> the number of sessions to cache --> >> +<bean id="jmsConnectionFactory" >> class="org.springframework.jms.connection.CachingConnectionFactory"> >> +<constructor-arg ref="amqConnectionFactory" /> >> +<property name="sessionCacheSize" value="100" /> >> </bean> >> >> <!-- setup spring jms TX manager --> >> <bean id="jmsTransactionManager" >> class="org.springframework.jms.connection.JmsTransactionManager"> >> <property name="connectionFactory" ref="jmsConnectionFactory"/> >> </bean> >> - >> -<bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean"> >> -<property name="config" value="file://conf/activemq.xml" /> >> -<property name="start" value="true" /> >> -</bean> >> >> <bean id="mailQueueFactory" >> class="org.apache.james.queue.activemq.ActiveMQMailQueueFactory" >> depends-on="broker"> >> <!-- Allow to specify if BlobMessage or BytesMessage should be >> used for storing the Mail in the queue--> >> @@ -156,6 +155,7 @@ >> <!-- By default only 
BytesMessage is used --> >> <property name="sizeTreshold" value="-1"/> >> </bean> >> + >> >> <!-- Build the camelroute from the spoolmanager.xml --> >> <bean id="mailProcessor" name="processorRoute" >> class="org.apache.james.mailetcontainer.camel.CamelMailProcessorList"/> >> >> >> >> --------------------------------------------------------------------- >> To unsubscribe, e-mail: [email protected] >> For additional commands, e-mail: [email protected] >> > > > --------------------------------------------------------------------- > To unsubscribe, e-mail: [email protected] > For additional commands, e-mail: [email protected] > > --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
