small detail: ../data could be ../var/activemq-data to have all datas in var dir.
Tks,
Eric

On 6/10/2010 14:15, [email protected] wrote:
Author: norman
Date: Wed Oct  6 12:15:05 2010
New Revision: 1005005

URL: http://svn.apache.org/viewvc?rev=1005005&view=rev
Log:
* Use amq: namespace to configure it via spring, so no need to activemq.xml.
* Some more fine tuning to make sure that JamesSpoolManager is really 
thread-safe on shutdown

Removed:
     james/server/trunk/spring-deployment/src/main/config/james/activemq.xml
Modified:
     
james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java
     
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java
     james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml

Modified: 
james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java?rev=1005005&r1=1005004&r2=1005005&view=diff
==============================================================================
--- 
james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java
 (original)
+++ 
james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java
 Wed Oct  6 12:15:05 2010
@@ -25,10 +25,13 @@ import java.util.Iterator;
  import java.util.List;
  import java.util.Map;

+import javax.annotation.PostConstruct;
  import javax.annotation.PreDestroy;
  import javax.annotation.Resource;
  import javax.mail.MessagingException;

+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
  import org.apache.camel.CamelExecutionException;
  import org.apache.camel.Exchange;
  import org.apache.camel.Processor;
@@ -60,7 +63,7 @@ import org.apache.mailet.base.MatcherInv
   * It also offer the {...@link MailProcessorList} implementation which allow 
to inject {...@link Mail} into the routes
   *
   */
-public class CamelMailProcessorList extends RouteBuilder implements 
Configurable, LogEnabled, MailProcessorList {
+public class CamelMailProcessorList implements Configurable, LogEnabled, 
MailProcessorList, CamelContextAware {

      private MatcherLoader matcherLoader;
      private HierarchicalConfiguration config;
@@ -83,150 +86,16 @@ public class CamelMailProcessorList exte
      }

      private ProducerTemplate producerTemplate;
-    /*
-     * (non-Javadoc)
-     * @see org.apache.camel.builder.RouteBuilder#configure()
-     */
-    @SuppressWarnings("unchecked")
-    @Override
-    public void configure() throws Exception {
-        Processor terminatingMailetProcessor = new MailetProcessor(new 
TerminatingMailet(), logger);
-        Processor disposeProcessor = new DisposeProcessor();
-        Processor mailProcessor = new MailCamelProcessor();
-        Processor removePropsProcessor = new RemovePropertiesProcessor();
-
-        List<HierarchicalConfiguration>  processorConfs = 
config.configurationsAt("processor");
-        for (int i = 0; i<  processorConfs.size(); i++) {
-            final HierarchicalConfiguration processorConf = 
processorConfs.get(i);
-            String processorName = processorConf.getString("[...@name]");
-
-
-            mailets.put(processorName, new ArrayList<Mailet>());
-            matchers.put(processorName, new ArrayList<Matcher>());
-
-            RouteDefinition processorDef = 
from(getEndpoint(processorName)).inOnly()
-            // store the logger in properties
-            .setProperty(MatcherSplitter.LOGGER_PROPERTY, constant(logger));
-
-
-            final List<HierarchicalConfiguration>  mailetConfs = 
processorConf.configurationsAt("mailet");
-            // Loop through the mailet configuration, load
-            // all of the matcher and mailets, and add
-            // them to the processor.
-            for (int j = 0; j<  mailetConfs.size(); j++) {
-                HierarchicalConfiguration c = mailetConfs.get(j);
-
-                // We need to set this because of correctly parsing comma
-                String mailetClassName = c.getString("[...@class]");
-                String matcherName = c.getString("[...@match]", null);
-                String invertedMatcherName = c.getString("[...@notmatch]", 
null);
-
-                Mailet mailet = null;
-                Matcher matcher = null;
-                try {
-
-                    if (matcherName != null&&  invertedMatcherName != null) {
-                        // if no matcher is configured throw an Exception
-                        throw new ConfigurationException("Please configure only 
match or nomatch per mailet");
-                    } else if (matcherName != null) {
-                        matcher = matcherLoader.getMatcher(matcherName);
-                    } else if (invertedMatcherName != null) {
-                        matcher = new 
MatcherInverter(matcherLoader.getMatcher(invertedMatcherName));
-
-                    } else {
-                        // default matcher is All
-                        matcher = matcherLoader.getMatcher("All");
-                    }
-
-                    // The matcher itself should log that it's been inited.
-                    if (logger.isInfoEnabled()) {
-                        StringBuffer infoBuffer = new StringBuffer(64).append("Matcher 
").append(matcherName).append(" instantiated.");
-                        logger.info(infoBuffer.toString());
-                    }
-                } catch (MessagingException ex) {
-                    // **** Do better job printing out exception
-                    if (logger.isErrorEnabled()) {
-                        StringBuffer errorBuffer = new StringBuffer(256).append("Unable to 
init matcher ").append(matcherName).append(": ").append(ex.toString());
-                        logger.error(errorBuffer.toString(), ex);
-                        if (ex.getNextException() != null) {
-                            logger.error("Caused by nested exception: ", 
ex.getNextException());
-                        }
-                    }
-                    System.err.println("Unable to init matcher " + 
matcherName);
-                    System.err.println("Check spool manager logs for more 
details.");
-                    // System.exit(1);
-                    throw new ConfigurationException("Unable to init matcher", 
ex);
-                }
-                try {
-                    mailet = mailetLoader.getMailet(mailetClassName, c);
-                    if (logger.isInfoEnabled()) {
-                        StringBuffer infoBuffer = new StringBuffer(64).append("Mailet 
").append(mailetClassName).append(" instantiated.");
-                        logger.info(infoBuffer.toString());
-                    }
-                } catch (MessagingException ex) {
-                    // **** Do better job printing out exception
-                    if (logger.isErrorEnabled()) {
-                        StringBuffer errorBuffer = new StringBuffer(256).append("Unable to 
init mailet ").append(mailetClassName).append(": ").append(ex.toString());
-                        logger.error(errorBuffer.toString(), ex);
-                        if (ex.getNextException() != null) {
-                            logger.error("Caused by nested exception: ", 
ex.getNextException());
-                        }
-                    }
-                    System.err.println("Unable to init mailet " + 
mailetClassName);
-                    System.err.println("Check spool manager logs for more 
details.");
-                    throw new ConfigurationException("Unable to init mailet", 
ex);
-                }
-                if (mailet != null&&  matcher != null) {
-                    String onMatchException = null;
-                    MailetConfig mailetConfig = mailet.getMailetConfig();
-
-                    if (mailetConfig instanceof MailetConfigImpl) {
-                        onMatchException = ((MailetConfigImpl) 
mailetConfig).getInitAttribute("onMatchException");
-                    }
-
-                    // Store the matcher to use for splitter in properties
-                    processorDef
-                               .setProperty(MatcherSplitter.MATCHER_PROPERTY, 
constant(matcher)).setProperty(MatcherSplitter.ON_MATCH_EXCEPTION_PROPERTY, 
constant(onMatchException))
-
-                            // do splitting of the mail based on the stored 
matcher
-                            
.split().method(MatcherSplitter.class).aggregationStrategy(aggr).parallelProcessing()
-
-                            .choice().when(new MatcherMatch()).process(new 
MailetProcessor(mailet, logger)).end()
-
-                            .choice().when(new 
MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop().otherwise().process(removePropsProcessor).end()
-
-                            .choice().when(new 
MailStateNotEquals(processorName)).process(mailProcessor).stop().end();
-
-                    // store mailet and matcher
-                    mailets.get(processorName).add(mailet);
-                    matchers.get(processorName).add(matcher);
-                }
-
+       private CamelContext camelContext;
+

-            }
-
-            processorDef
-                    // start choice
-                    .choice()
-
-                    // when the mail state did not change till yet ( the end 
of the route) we need to call the TerminatingMailet to
-                    // make sure we don't fall into a endless loop
-                    .when(new 
MailStateEquals(processorName)).process(terminatingMailetProcessor).stop()
-
-
-                    // dispose when needed
-                    .when(new 
MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop()
-
-                     // route it to the next processor
-                    .otherwise().process(mailProcessor).stop();
-
-            processors.put(processorName, new 
ChildMailProcessor(processorName));
-        }
-
-        producerTemplate = getContext().createProducerTemplate();
-    }

+       @PostConstruct
+       public void init() throws Exception  {
+               getCamelContext().addRoutes(new SpoolRouteBuilder());

+               producerTemplate = getCamelContext().createProducerTemplate();
+       }
      /**
       * Destroy all mailets and matchers
       */
@@ -428,4 +297,156 @@ public class CamelMailProcessorList exte

      }

+       public CamelContext getCamelContext() {
+               return camelContext;
+       }
+
+       public void setCamelContext(CamelContext camelContext) {
+               this.camelContext = camelContext;
+       }
+       
+       private final class SpoolRouteBuilder extends RouteBuilder {
+               /*
+            * (non-Javadoc)
+            * @see org.apache.camel.builder.RouteBuilder#configure()
+            */
+           @SuppressWarnings("unchecked")
+           @Override
+           public void configure() throws Exception {
+               Processor terminatingMailetProcessor = new MailetProcessor(new 
TerminatingMailet(), logger);
+               Processor disposeProcessor = new DisposeProcessor();
+               Processor mailProcessor = new MailCamelProcessor();
+               Processor removePropsProcessor = new 
RemovePropertiesProcessor();
+       
+               List<HierarchicalConfiguration>  processorConfs = 
config.configurationsAt("processor");
+               for (int i = 0; i<  processorConfs.size(); i++) {
+                   final HierarchicalConfiguration processorConf = 
processorConfs.get(i);
+                   String processorName = 
processorConf.getString("[...@name]");
+
+       
+                   mailets.put(processorName, new ArrayList<Mailet>());
+                   matchers.put(processorName, new ArrayList<Matcher>());
+
+                   RouteDefinition processorDef = 
from(getEndpoint(processorName)).inOnly()
+                   // store the logger in properties
+                   .setProperty(MatcherSplitter.LOGGER_PROPERTY, 
constant(logger));
+       
+       
+                   final List<HierarchicalConfiguration>  mailetConfs = 
processorConf.configurationsAt("mailet");
+                   // Loop through the mailet configuration, load
+                   // all of the matcher and mailets, and add
+                   // them to the processor.
+                   for (int j = 0; j<  mailetConfs.size(); j++) {
+                       HierarchicalConfiguration c = mailetConfs.get(j);
+
+                       // We need to set this because of correctly parsing 
comma
+                       String mailetClassName = c.getString("[...@class]");
+                       String matcherName = c.getString("[...@match]", null);
+                       String invertedMatcherName = 
c.getString("[...@notmatch]", null);
+
+                       Mailet mailet = null;
+                       Matcher matcher = null;
+                       try {
+
+                           if (matcherName != null&&  invertedMatcherName != 
null) {
+                               // if no matcher is configured throw an 
Exception
+                               throw new ConfigurationException("Please configure 
only match or nomatch per mailet");
+                           } else if (matcherName != null) {
+                               matcher = matcherLoader.getMatcher(matcherName);
+                           } else if (invertedMatcherName != null) {
+                               matcher = new 
MatcherInverter(matcherLoader.getMatcher(invertedMatcherName));
+
+                           } else {
+                               // default matcher is All
+                               matcher = matcherLoader.getMatcher("All");
+                           }
+
+                           // The matcher itself should log that it's been 
inited.
+                           if (logger.isInfoEnabled()) {
+                               StringBuffer infoBuffer = new StringBuffer(64).append("Matcher 
").append(matcherName).append(" instantiated.");
+                               logger.info(infoBuffer.toString());
+                           }
+                       } catch (MessagingException ex) {
+                           // **** Do better job printing out exception
+                           if (logger.isErrorEnabled()) {
+                               StringBuffer errorBuffer = new 
StringBuffer(256).append("Unable to init matcher ").append(matcherName).append(": 
").append(ex.toString());
+                               logger.error(errorBuffer.toString(), ex);
+                               if (ex.getNextException() != null) {
+                                   logger.error("Caused by nested exception: 
", ex.getNextException());
+                               }
+                           }
+                           System.err.println("Unable to init matcher " + 
matcherName);
+                           System.err.println("Check spool manager logs for more 
details.");
+                           // System.exit(1);
+                           throw new ConfigurationException("Unable to init 
matcher", ex);
+                       }
+                       try {
+                           mailet = mailetLoader.getMailet(mailetClassName, c);
+                           if (logger.isInfoEnabled()) {
+                               StringBuffer infoBuffer = new StringBuffer(64).append("Mailet 
").append(mailetClassName).append(" instantiated.");
+                               logger.info(infoBuffer.toString());
+                           }
+                       } catch (MessagingException ex) {
+                           // **** Do better job printing out exception
+                           if (logger.isErrorEnabled()) {
+                               StringBuffer errorBuffer = new 
StringBuffer(256).append("Unable to init mailet ").append(mailetClassName).append(": 
").append(ex.toString());
+                               logger.error(errorBuffer.toString(), ex);
+                               if (ex.getNextException() != null) {
+                                   logger.error("Caused by nested exception: 
", ex.getNextException());
+                               }
+                           }
+                           System.err.println("Unable to init mailet " + 
mailetClassName);
+                           System.err.println("Check spool manager logs for more 
details.");
+                           throw new ConfigurationException("Unable to init 
mailet", ex);
+                       }
+                       if (mailet != null&&  matcher != null) {
+                           String onMatchException = null;
+                           MailetConfig mailetConfig = 
mailet.getMailetConfig();
+       
+                           if (mailetConfig instanceof MailetConfigImpl) {
+                               onMatchException = ((MailetConfigImpl) 
mailetConfig).getInitAttribute("onMatchException");
+                           }
+       
+                           // Store the matcher to use for splitter in 
properties
+                           processorDef
+                                       
.setProperty(MatcherSplitter.MATCHER_PROPERTY, 
constant(matcher)).setProperty(MatcherSplitter.ON_MATCH_EXCEPTION_PROPERTY, 
constant(onMatchException))
+       
+                                   // do splitting of the mail based on the 
stored matcher
+                                   
.split().method(MatcherSplitter.class).aggregationStrategy(aggr).parallelProcessing()
+
+                                   .choice().when(new 
MatcherMatch()).process(new MailetProcessor(mailet, logger)).end()
+       
+                                   .choice().when(new 
MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop().otherwise().process(removePropsProcessor).end()
+
+                                   .choice().when(new 
MailStateNotEquals(processorName)).process(mailProcessor).stop().end();
+
+                           // store mailet and matcher
+                           mailets.get(processorName).add(mailet);
+                           matchers.get(processorName).add(matcher);
+                       }
+       
+
+                   }
+       
+                   processorDef
+                           // start choice
+                           .choice()
+       
+                           // when the mail state did not change till yet ( 
the end of the route) we need to call the TerminatingMailet to
+                           // make sure we don't fall into a endless loop
+                           .when(new 
MailStateEquals(processorName)).process(terminatingMailetProcessor).stop()
+       
+       
+                           // dispose when needed
+                           .when(new 
MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop()
+       
+                            // route it to the next processor
+                           .otherwise().process(mailProcessor).stop();
+       
+                   processors.put(processorName, new 
ChildMailProcessor(processorName));
+               }
+       
+           }
+       }
+
  }

Modified: 
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java?rev=1005005&r1=1005004&r2=1005005&view=diff
==============================================================================
--- 
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java
 (original)
+++ 
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java
 Wed Oct  6 12:15:05 2010
@@ -25,6 +25,7 @@ import java.util.ArrayList;
  import java.util.Collection;
  import java.util.List;
  import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;

  import javax.annotation.PostConstruct;
  import javax.annotation.PreDestroy;
@@ -73,7 +74,7 @@ public class JamesSpoolManager implement
      /**
       * Number of active threads
       */
-    private int numActive;
+    private AtomicInteger numActive = new AtomicInteger(0);;

      /**
       * Spool threads are active
@@ -132,7 +133,6 @@ public class JamesSpoolManager implement
          }

          active.set(true);
-        numActive = 0;
          spoolThreads = new java.util.ArrayList<Thread>(numThreads);
          for ( int i = 0 ; i<  numThreads ; i++ ) {
              Thread reader = new Thread(this, "Spool Thread #" + i);
@@ -153,8 +153,9 @@ public class JamesSpoolManager implement
              logger.info("Spool=" + queue.getClass().getName());
          }

-        numActive++;
          while(active.get()) {
+            numActive.incrementAndGet();
+
              try {
                  queue.deQueue(new DequeueOperation() {

@@ -189,12 +190,14 @@ public class JamesSpoolManager implement
                      logger.error("Exception processing mail in 
JamesSpoolManager.run "
                                        + e.getMessage(), e);
                  }
+            } finally {
+                numActive.decrementAndGet();
              }
+
          }
          if (logger.isInfoEnabled()){
              logger.info("Stop JamesSpoolManager: " + 
Thread.currentThread().getName());
          }
-        numActive--;
      }

      /**
@@ -217,11 +220,13 @@ public class JamesSpoolManager implement

          long stop = System.currentTimeMillis() + 60000;
          // give the spooler threads one minute to terminate gracefully
-        while (numActive != 0&&  stop>  System.currentTimeMillis()) {
+        /*
+        while (numActive.get() != 0&&  stop>  System.currentTimeMillis()) {
              try {
                  Thread.sleep(1000);
              } catch (Exception ignored) {}
          }
+        */
          logger.info("JamesSpoolManager thread shutdown completed.");
      }


Modified: 
james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml
URL: 
http://svn.apache.org/viewvc/james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml?rev=1005005&r1=1005004&r2=1005005&view=diff
==============================================================================
--- james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml 
(original)
+++ james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml 
Wed Oct  6 12:15:05 2010
@@ -21,10 +21,12 @@
  <beans xmlns="http://www.springframework.org/schema/beans";
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
         xmlns:camel="http://camel.apache.org/schema/spring";
+       xmlns:amq="http://activemq.apache.org/schema/core";
         xsi:schemaLocation="
            http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
-          http://camel.apache.org/schema/spring 
http://camel.apache.org/schema/spring/camel-spring.xsd";>
-
+          http://camel.apache.org/schema/spring 
http://camel.apache.org/schema/spring/camel-spring.xsd
+          http://activemq.apache.org/schema/core 
http://activemq.apache.org/schema/core/activemq-core.xsd";>
+

      <!--
          ** JMX part ** to enable exposure of JMX, activate the following beans
@@ -124,31 +126,28 @@
      <camel:camelContext id="jamesCamelContext" trace="false">
          <camel:jmxAgent id="agent" disabled="true"/>
          <camel:template id="producerTemplate"/>
-<camel:routeBuilder ref="processorRoute" />
      </camel:camelContext>

-
-<!-- jms connection pooling  -->
-<bean id="jmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" 
destroy-method="stop">
-<property name="connectionFactory">
-<bean class="org.apache.activemq.ActiveMQConnectionFactory">
-<property name="brokerURL" value="vm://localhost?broker.useJmx=false"/>
-</bean>
-</property>
-<!--
-<property name="transactionManager" ref="jmsTransactionManager"/>
-        -->
+<!--  lets create an embedded ActiveMQ Broker -->
+<amq:broker useJmx="false" persistent="true" dataDirectory="../data/" 
schedulerSupport="true" id="broker">
+<amq:transportConnectors>
+<amq:transportConnector uri="tcp://localhost:0" />
+</amq:transportConnectors>
+</amq:broker>
+
+
+<amq:connectionFactory id="amqConnectionFactory" 
brokerURL="vm://localhost?create=false" />
+
+<!-- CachingConnectionFactory Definition, sessionCacheSize property is the number 
of sessions to cache -->
+<bean id="jmsConnectionFactory" 
class="org.springframework.jms.connection.CachingConnectionFactory">
+<constructor-arg ref="amqConnectionFactory" />
+<property name="sessionCacheSize" value="100" />
      </bean>

      <!-- setup spring jms TX manager -->
      <bean id="jmsTransactionManager" 
class="org.springframework.jms.connection.JmsTransactionManager">
          <property name="connectionFactory" ref="jmsConnectionFactory"/>
      </bean>
-
-<bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
-<property name="config" value="file://conf/activemq.xml" />
-<property name="start" value="true" />
-</bean>

      <bean id="mailQueueFactory" 
class="org.apache.james.queue.activemq.ActiveMQMailQueueFactory" depends-on="broker">
          <!-- Allow to specify if BlobMessage or BytesMessage should be used for 
storing the Mail in the queue-->
@@ -156,6 +155,7 @@
          <!-- By default only BytesMessage is used -->
          <property name="sizeTreshold" value="-1"/>
      </bean>
+

      <!-- Build the camelroute from the spoolmanager.xml -->
      <bean id="mailProcessor" name="processorRoute" 
class="org.apache.james.mailetcontainer.camel.CamelMailProcessorList"/>



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to