Author: chirino
Date: Wed Apr 25 10:20:28 2007
New Revision: 532413

URL: http://svn.apache.org/viewvc?view=rev&rev=532413
Log:
- Removed the transactionPolicy attribute from the RouteBuilder since this was 
not actually configuring the inbound transaction policy as first envisioned.  
This may come back in a different shape/form.
- JpaMessageIdRepository now starts it's own transaction if called from a non 
transaction context.
- MockEndpoint now has a assertWait() method that can be used to wait for 
messages to arive before assertions are made against those messages.
- Added some more tests and imporved that TransactedJmsRouteTest
- The JmsConfiguration now uses a CacheLevel of CACHE_CONSUMER by default since 
this most efficient way to recieve messages from JMS.  


Added:
    
activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/activemq.xml
Modified:
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
    
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
    
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
    
activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/TransactedJmsRouteTest.java
    
activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/spring.xml
    
activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
    
activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/SpringRouteBuilder.java
    
activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringTransactionPolicy.java

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
 Wed Apr 25 10:20:28 2007
@@ -416,7 +416,7 @@
         else {
             processor = new CompositeProcessor<E>(answer);
         }
-        return wrapInTransactionInterceptor(processor);
+        return processor;
     }
 
     /**
@@ -435,13 +435,6 @@
      */
     protected Processor<E> wrapInErrorHandler(Processor<E> processor) throws 
Exception {
         return getErrorHandlerBuilder().createErrorHandler(processor);
-    }
-
-    /**
-     * A strategy method which allows transaction interceptors to be applied 
to a processor
-     */
-    protected Processor<E> wrapInTransactionInterceptor(Processor<E> 
processor) throws Exception {
-        return getBuilder().getTransactionPolicy().wrap(processor);
     }
 
     /**

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java
 Wed Apr 25 10:20:28 2007
@@ -19,21 +19,13 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.io.IOException;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.util.FactoryFinder;
-import org.apache.camel.util.NoFactoryAvailableException;
-import org.apache.camel.spi.Policy;
-import org.apache.camel.spi.Injector;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.ReflectionInjector;
-import org.apache.camel.impl.NoPolicy;
 
 /**
  * A <a href="http://activemq.apache.org/camel/dsl.html";>Java DSL</a>
@@ -45,7 +37,6 @@
     private List<FromBuilder<E>> fromBuilders = new 
ArrayList<FromBuilder<E>>();
     private AtomicBoolean initalized = new AtomicBoolean(false);
     private List<Route<E>> routes = new ArrayList<Route<E>>();
-    private Policy<E> transactionPolicy;
 
     protected RouteBuilder() {
         this(null);
@@ -94,17 +85,6 @@
         return this;
     }
 
-    /**
-     * Specifies the transaction interceptor to be used for routes created 
from this builder
-     *
-     * @param interceptor the transaction interceptor to use
-     * @return the current builder
-     */
-    public RouteBuilder<E> transactionPolicy(Policy<E> interceptor) {
-        setTransactionPolicy(interceptor);
-        return this;
-    }
-
     // Properties
     //-----------------------------------------------------------------------
     public CamelContext getContext() {
@@ -132,20 +112,6 @@
         return fromBuilders;
     }
 
-    public Policy<E> getTransactionPolicy() throws Exception {
-        if (transactionPolicy == null) {
-            transactionPolicy = createTransactionPolicy();
-        }
-        return transactionPolicy;
-    }
-
-    /**
-     * Sets the interceptor used wrap processors in a transaction
-     */
-    public void setTransactionPolicy(Policy<E> transactionInterceptor) {
-        this.transactionPolicy = transactionInterceptor;
-    }
-
     // Implementation methods
     //-----------------------------------------------------------------------
     protected void checkInitialized() throws Exception {
@@ -183,20 +149,6 @@
      */
     protected CamelContext createContainer() {
         return new DefaultCamelContext();
-    }
-
-    /**
-     * Factory method
-     */
-    protected Policy<E> createTransactionPolicy() throws Exception {
-        FactoryFinder finder = new FactoryFinder();
-        try {
-            return (Policy<E>) finder.newInstance("TransactionPolicy", 
getContext().getInjector());
-        }
-        catch (NoFactoryAvailableException e) {
-            // lets use the default
-            return new NoPolicy<E>();
-        }
     }
 
 }

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
 Wed Apr 25 10:20:28 2007
@@ -17,6 +17,13 @@
  */
 package org.apache.camel.component.mock;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
@@ -29,13 +36,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 /**
  * A Mock endpoint which provides a literate, fluent API for testing routes 
using
  * a <a href="http://jmock.org/";>JMock style</a> API.
@@ -53,6 +53,26 @@
     private long sleepForEmptyTest = 0L;
        private int expectedMinimumCount=-1;
 
+    public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... 
endpoints) throws InterruptedException {
+       long start = System.currentTimeMillis();
+       long left = unit.toMillis(timeout);
+       long end = start + left;
+        for (MockEndpoint endpoint : endpoints) {
+                       if( !endpoint.await(left, TimeUnit.MILLISECONDS) )
+                       throw new AssertionError("Timeout waiting for endpoints 
to receive enough messages. "+endpoint.getEndpointUri()+" timed out.");
+                       left = end - System.currentTimeMillis();
+                       if( left <= 0 )
+                               left = 0;
+        }
+    }
+
+    public static void assertIsSatisfied(long timeout, TimeUnit unit, 
MockEndpoint... endpoints) throws InterruptedException {
+       assertWait(timeout, unit, endpoints);
+        for (MockEndpoint endpoint : endpoints) {
+            endpoint.assertIsSatisfied();
+        }
+    }
+
     public static void assertIsSatisfied(MockEndpoint... endpoints) throws 
InterruptedException {
         for (MockEndpoint endpoint : endpoints) {
             endpoint.assertIsSatisfied();
@@ -94,7 +114,7 @@
     public void assertIsSatisfied() throws InterruptedException {
         assertIsSatisfied(sleepForEmptyTest);
     }
-
+    
     /**
      * Validates that all the available expectations on this endpoint are 
satisfied; or throw an exception
      */
@@ -321,5 +341,18 @@
 
        public int getExpectedMinimumCount() {
                return expectedMinimumCount;
+       }
+
+       public void await() throws InterruptedException {
+               if( latch!=null ) {
+                       latch.await();
+               }
+       }
+
+       public boolean await(long timeout, TimeUnit unit) throws 
InterruptedException {
+               if( latch!=null ) {
+                       return latch.await(timeout, unit);
+               }
+               return true;
        }
 }

Modified: 
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- 
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
 (original)
+++ 
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
 Wed Apr 25 10:20:28 2007
@@ -174,12 +174,16 @@
             if (concurrentConsumers >= 0) {
                 listenerContainer.setConcurrentConsumers(concurrentConsumers);
             }
+            
             if (cacheLevel >= 0) {
                 listenerContainer.setCacheLevel(cacheLevel);
-            }
-            if (cacheName != null) {
+            } else if (cacheName != null) {
                 listenerContainer.setCacheLevelName(cacheName);
+            } else {
+               // Default to CACHE_CONSUMER unless specified.  This works best 
with most JMS providers.
+               
listenerContainer.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
             }
+            
             if (idleTaskExecutionLimit >= 0) {
                 
listenerContainer.setIdleTaskExecutionLimit(idleTaskExecutionLimit);
             }

Modified: 
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- 
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
 (original)
+++ 
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
 Wed Apr 25 10:20:28 2007
@@ -131,4 +131,5 @@
     public void setSelector(String selector) {
         this.selector = selector;
     }
+
 }

Modified: 
activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/TransactedJmsRouteTest.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/TransactedJmsRouteTest.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- 
activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/TransactedJmsRouteTest.java
 (original)
+++ 
activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/TransactedJmsRouteTest.java
 Wed Apr 25 10:20:28 2007
@@ -18,13 +18,18 @@
 package org.apache.camel.component.jms;
 
 import static org.apache.camel.component.mock.MockEndpoint.assertIsSatisfied;
+import static org.apache.camel.component.mock.MockEndpoint.assertWait;
+
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.ConnectionFactory;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.Component;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Route;
+import org.apache.camel.builder.ProcessorFactory;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.DelegateProcessor;
@@ -57,26 +62,68 @@
                        Policy notsupported = new 
SpringTransactionPolicy(bean(TransactionTemplate.class, 
"PROPAGATION_NOT_SUPPORTED"));
                        Policy requirenew = new 
SpringTransactionPolicy(bean(TransactionTemplate.class, 
"PROPAGATION_REQUIRES_NEW"));
 
-                       DelegateProcessor rollback = new DelegateProcessor() {
-                               @Override
-                               public void process(Object exchange) {
-                                       processNext(exchange);
-                                       throw new RuntimeException("rollback");
-                               }
+                       Policy rollback = new Policy() {
+                                       public Processor wrap(Processor 
processor) {
+                                               return new 
DelegateProcessor(processor) {
+                                               @Override
+                                               public void process(Object 
exchange) {
+                                                       processNext(exchange);
+                                                       throw new 
RuntimeException("rollback");
+                                               }
+                                               
+                                               @Override
+                                               public String toString() {
+                                               return "rollback(" + next + ")";
+                                               }
+                                       };
+                                       }
+                       };
+                       
+                       Policy catchRollback = new Policy() {
+                                       public Processor wrap(Processor 
processor) {
+                                               return new 
DelegateProcessor(processor) {
+                                               @Override
+                                               public void process(Object 
exchange) {
+                                                       try {
+                                                               
processNext(exchange);
+                                                       } catch ( Throwable e ) 
{
+                                                       }
+                                               }
+                                               @Override
+                                               public String toString() {
+                                               return "catchRollback(" + next 
+ ")";
+                                               }
+                                       };
+                                       }
                        };
-                                                       
-                       // Used to verify that transacted sends will succeed.
-                               
from("activemq:queue:mock.a").trace().to("mock:a");      // Used to validate 
messages are sent to the target.
+                       
+                               // NOTE: ErrorHandler has to be disabled since 
it operates within the failed transaction.
+                       inheritErrorHandler(false);                     
+                       // Used to validate messages are sent to the target.
+                               
from("activemq:queue:mock.a").trace().to("mock:a");
                        
                                // Receive from a and send to target in 1 tx.
-                       transactionPolicy("PROPAGATION_REQUIRED");
-                               
from("activemq:queue:a").trace().to("activemq:queue:mock.a");
+                               
from("activemq:queue:a").to("activemq:queue:mock.a");
                                
                                // Cause an error after processing the send.  
The send to activemq:queue:mock.a should rollback 
                                // since it is participating in the inbound 
transaction, but mock:b does not participate so we should see the message get
                                // there.  Also, expect multiple inbound 
retries as the message is rolled back.
-                               
from("activemq:queue:b").inheritErrorHandler(false).trace().intercept(rollback).to("activemq:queue:mock.a",
 "mock:b"); 
+                       //transactionPolicy(requried);
+                               
from("activemq:queue:b").policy(rollback).to("activemq:queue:mock.a", 
"mock:b"); 
+                               
+                               // Cause an error after processing the send in 
a new transaction.  The send to activemq:queue:mock.a should rollback 
+                               // since the rollback is within it's 
transaction, but mock:b does not participate so we should see the message get
+                               // there.  Also, expect the message to be 
successfully consumed since the rollback error is not propagated.
+                       //transactionPolicy(requried);
+                               
from("activemq:queue:c").policy(catchRollback).policy(requirenew).policy(rollback).to("activemq:queue:mock.a",
 "mock:b");
+                               
+                               // Cause an error after processing the send in 
without a transaction.  The send to activemq:queue:mock.a should succeed. 
+                               // Also, expect the message to be successfully 
consumed since the rollback error is not propagated.
+                       
from("activemq:queue:d").policy(catchRollback).policy(notsupported).policy(rollback).to("activemq:queue:mock.a");
 
 
+//                     JmsEndpoint endpoint = 
(JmsEndpoint)endpoint("activemq:queue:e");
+//                     
from(endpoint).policy(catchRollback).policy(notsupported).policy(rollback).to("activemq:queue:mock.a");
 
+                               
                        }
                };
        }
@@ -95,40 +142,62 @@
     @Override
     protected void setUp() throws Exception {
         super.setUp();
+        
+        for (Route route : this.context.getRoutes()) {
+               System.out.println(route);
+               }
+        
         mockEndpointA = (MockEndpoint) resolveMandatoryEndpoint("mock:a");
         mockEndpointB = (MockEndpoint) resolveMandatoryEndpoint("mock:b");
     }
+    
+    @Override
+    protected void tearDown() throws Exception {
+       super.tearDown();
+       spring.destroy();
+    }
 
-       public void testReuqiredSend() throws Exception {
+       public void testSenarioA() throws Exception {
                String expected = getName()+": "+System.currentTimeMillis();
         mockEndpointA.expectedBodiesReceived(expected);
         send("activemq:queue:a", expected);
         assertIsSatisfied(mockEndpointA);
        }
 
-       public void testRequiredSendAndRollback() throws Exception {
+       public void testSenarioB() throws Exception {
                String expected = getName()+": "+System.currentTimeMillis();
-        mockEndpointA.expectedMessageCount(0);
-        mockEndpointB.expectedMinimumMessageCount(5); // May be more since 
spring seems to go into tight loop redelivering.
+               mockEndpointA.expectedMessageCount(0);
+        mockEndpointB.expectedMinimumMessageCount(2); // May be more since 
spring seems to go into tight loop re-delivering.
         send("activemq:queue:b", expected);
+        assertIsSatisfied(5, TimeUnit.SECONDS, mockEndpointA,mockEndpointB);
+       }
+
+       public void testSenarioC() throws Exception {
+               String expected = getName()+": "+System.currentTimeMillis();
+               mockEndpointA.expectedMessageCount(0);
+        mockEndpointB.expectedMessageCount(1); // Should only get 1 message 
the incoming transaction does not rollback.
+        send("activemq:queue:c", expected);
+
+        // Wait till the endpoints get their messages.
+        assertWait(5, TimeUnit.SECONDS, mockEndpointA,mockEndpointB);
+
+        // Wait a little more to make sure extra messages are not received.
+        Thread.sleep(1000);
+        
         assertIsSatisfied(mockEndpointA,mockEndpointB);
-        int t = mockEndpointB.getReceivedCounter();
-        System.out.println("Actual Deliveries: "+t);
        }
 
-       /** 
-        * Validates that the send was done in a new transaction.  Message 
should be consumed from A,
-        * But
-        * 
-        * @throws Exception
-        */
-       public void xtestSendRequireNewAndRollack() throws Exception {
+       public void testSenarioD() throws Exception {
                String expected = getName()+": "+System.currentTimeMillis();
-        mockEndpointA.expectedMessageCount(0);
+               mockEndpointA.expectedMessageCount(1);
+        send("activemq:queue:d", expected);
 
-        send("activemq:queue:a", expected);
+        // Wait till the endpoints get their messages.
+        assertWait(5, TimeUnit.SECONDS, mockEndpointA,mockEndpointB);
 
+        // Wait a little more to make sure extra messages are not received.
+        Thread.sleep(1000);
+        
         assertIsSatisfied(mockEndpointA);
        }
-
 }

Added: 
activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/activemq.xml
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/activemq.xml?view=auto&rev=532413
==============================================================================
--- 
activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/activemq.xml
 (added)
+++ 
activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/activemq.xml
 Wed Apr 25 10:20:28 2007
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<!-- START SNIPPET: xbean -->
+<beans>
+  <bean 
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <broker useJmx="false"  xmlns="http://activemq.org/config/1.0"; 
persistent="false" brokerName="localhost">
+      
+  </broker>
+  
+</beans>
+<!-- END SNIPPET: xbean -->

Modified: 
activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/spring.xml
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/spring.xml?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- 
activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/spring.xml
 (original)
+++ 
activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/spring.xml
 Wed Apr 25 10:20:28 2007
@@ -25,8 +25,12 @@
     <property name="connectionFactory" ref="jmsConnectionFactory"/>
   </bean>
 
-  <bean id="jmsConnectionFactory" 
class="org.apache.activemq.ActiveMQConnectionFactory">
-    <property name="brokerURL" 
value="vm://localhost?broker.persistent=false&amp;broker.useJmx=false"/>
+  <bean id="jmsConnectionFactory" 
class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="broker">
+    <property name="brokerURL" value="vm://localhost"/>
+  </bean>
+
+  <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
+    <property name="config" 
value="org/apache/camel/component/jms/activemq.xml"/>
   </bean>
 
 </beans>

Modified: 
activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- 
activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
 (original)
+++ 
activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
 Wed Apr 25 10:20:28 2007
@@ -18,10 +18,19 @@
 package org.apache.camel.processor.idempotent.jpa;
 
 import org.apache.camel.processor.idempotent.MessageIdRepository;
+import org.springframework.orm.jpa.JpaCallback;
 import org.springframework.orm.jpa.JpaTemplate;
+import org.springframework.orm.jpa.JpaTransactionManager;
+import org.springframework.transaction.TransactionDefinition;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionCallback;
+import org.springframework.transaction.support.TransactionTemplate;
 
+import javax.persistence.EntityManager;
 import javax.persistence.EntityManagerFactory;
 import javax.persistence.Persistence;
+import javax.persistence.PersistenceException;
+
 import java.util.List;
 
 /**
@@ -29,8 +38,9 @@
  */
 public class JpaMessageIdRepository implements MessageIdRepository {
     protected static final String QUERY_STRING = "select x from " + 
MessageProcessed.class.getName() + " x where x.processorName = ?1 and 
x.messageId = ?2";
-    private JpaTemplate template;
+    private JpaTemplate jpaTemplate;
     private String processorName;
+       private TransactionTemplate transactionTemplate;
 
     public static JpaMessageIdRepository jpaMessageIdRepository(String 
persistenceUnit, String processorName) {
         EntityManagerFactory entityManagerFactory = 
Persistence.createEntityManagerFactory(persistenceUnit);
@@ -42,22 +52,41 @@
     }
 
     public JpaMessageIdRepository(JpaTemplate template, String processorName) {
-        this.template = template;
+        this(template, createTransactionTemplate(template), processorName);
+    }
+
+    public JpaMessageIdRepository(JpaTemplate template, TransactionTemplate 
transactionTemplate, String processorName) {
+        this.jpaTemplate = template;
         this.processorName = processorName;
+        this.transactionTemplate=transactionTemplate;
+    }
+    
+    static private TransactionTemplate createTransactionTemplate(JpaTemplate 
jpaTemplate) {
+       TransactionTemplate transactionTemplate = new TransactionTemplate();
+        transactionTemplate.setTransactionManager(new 
JpaTransactionManager(jpaTemplate.getEntityManagerFactory()));
+        
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
+        return transactionTemplate;
     }
 
-    public boolean contains(String messageId) {
-        List list = template.find(QUERY_STRING, processorName, messageId);
-        if (list.isEmpty()) {
-            MessageProcessed processed = new MessageProcessed();
-            processed.setProcessorName(processorName);
-            processed.setMessageId(messageId);
-            template.persist(processed);
-            template.flush();
-            return false;
-        }
-        else {
-            return true;
-        }
+    public boolean contains(final String messageId) {
+       // Run this in single transaction.
+       Boolean rc = (Boolean) transactionTemplate.execute(new 
TransactionCallback(){
+                       public Object doInTransaction(TransactionStatus arg0) {
+                               
+                       List list = jpaTemplate.find(QUERY_STRING, 
processorName, messageId);
+                       if (list.isEmpty()) {
+                           MessageProcessed processed = new MessageProcessed();
+                           processed.setProcessorName(processorName);
+                           processed.setMessageId(messageId);
+                           jpaTemplate.persist(processed);
+                           jpaTemplate.flush();
+                           return Boolean.FALSE;
+                       }
+                       else {
+                           return Boolean.TRUE;
+                       }
+                       }
+               });
+       return rc.booleanValue();
     }
 }

Modified: 
activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/SpringRouteBuilder.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/SpringRouteBuilder.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- 
activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/SpringRouteBuilder.java
 (original)
+++ 
activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/SpringRouteBuilder.java
 Wed Apr 25 10:20:28 2007
@@ -20,9 +20,7 @@
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.spring.spi.SpringTransactionPolicy;
 import org.springframework.context.ApplicationContext;
-import org.springframework.transaction.support.TransactionTemplate;
 
 /**
  * An extension of the [EMAIL PROTECTED] RouteBuilder} to provide some 
additional helper methods
@@ -31,20 +29,6 @@
  */
 public abstract class SpringRouteBuilder<E extends Exchange> extends 
RouteBuilder<E> {
     private ApplicationContext applicationContext;
-
-    /**
-     * Configures a transaction interceptor on routes created by this builder 
using the named spring bean
-     * for the [EMAIL PROTECTED] TransactionTemplate} to use for the 
transaction
-     *
-     * @param transactionTemplateName the name of the spring bean in the 
application context which is the
-     *                                [EMAIL PROTECTED] TransactionTemplate} 
to use
-     * @return this builder
-     */
-    public SpringRouteBuilder<E> transactionPolicy(String 
transactionTemplateName) {
-        TransactionTemplate template = bean(TransactionTemplate.class, 
transactionTemplateName);
-        setTransactionPolicy(new SpringTransactionPolicy(template));
-        return this;
-    }
 
     /**
      * Looks up the bean with the given name in the application context and 
returns it, or throws an exception if the

Modified: 
activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringTransactionPolicy.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringTransactionPolicy.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- 
activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringTransactionPolicy.java
 (original)
+++ 
activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringTransactionPolicy.java
 Wed Apr 25 10:20:28 2007
@@ -22,6 +22,7 @@
 import org.apache.camel.spi.Policy;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.springframework.transaction.TransactionDefinition;
 import org.springframework.transaction.TransactionStatus;
 import 
org.springframework.transaction.support.TransactionCallbackWithoutResult;
 import org.springframework.transaction.support.TransactionTemplate;
@@ -62,8 +63,28 @@
 
             @Override
             public String toString() {
-                return "SpringTransactionPolicy[" + getNext() + "]";
+                return 
"SpringTransactionPolicy:"+propagationBehaviorToString(transactionTemplate.getPropagationBehavior())+"["
 + getNext() + "]";
             }
+
+                       private String propagationBehaviorToString(int 
propagationBehavior) {
+                               switch( propagationBehavior ) {
+                               case 
TransactionDefinition.PROPAGATION_MANDATORY:
+                                       return "PROPAGATION_MANDATORY";
+                               case TransactionDefinition.PROPAGATION_NESTED:
+                                       return "PROPAGATION_NESTED";
+                               case TransactionDefinition.PROPAGATION_NEVER:
+                                       return "PROPAGATION_NEVER";
+                               case 
TransactionDefinition.PROPAGATION_NOT_SUPPORTED:
+                                       return "PROPAGATION_NOT_SUPPORTED";
+                               case TransactionDefinition.PROPAGATION_REQUIRED:
+                                       return "PROPAGATION_REQUIRED";
+                               case 
TransactionDefinition.PROPAGATION_REQUIRES_NEW:
+                                       return "PROPAGATION_REQUIRES_NEW";
+                               case TransactionDefinition.PROPAGATION_SUPPORTS:
+                                       return "PROPAGATION_SUPPORTS";
+                               }
+                               return "UNKOWN";
+                       }
         };
     }
 


Reply via email to