Author: davsclaus
Date: Wed May 11 14:00:52 2011
New Revision: 1101878

URL: http://svn.apache.org/viewvc?rev=1101878&view=rev
Log:
Added test for XA with JMS and JDBC idempotent consumer EIP based on user issue.

Added:
    camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/sql/
    
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/sql/FromJmsToJdbcIdempotentConsumerToJmsTest.java
    camel/trunk/tests/camel-itest/src/test/resources/org/apache/camel/itest/sql/
    
camel/trunk/tests/camel-itest/src/test/resources/org/apache/camel/itest/sql/FromJmsToJdbcIdempotentConsumerToJmsTest.xml
Modified:
    camel/trunk/tests/camel-itest/pom.xml

Modified: camel/trunk/tests/camel-itest/pom.xml
URL: 
http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest/pom.xml?rev=1101878&r1=1101877&r2=1101878&view=diff
==============================================================================
--- camel/trunk/tests/camel-itest/pom.xml (original)
+++ camel/trunk/tests/camel-itest/pom.xml Wed May 11 14:00:52 2011
@@ -38,6 +38,10 @@
             <name>Public online Restlet repository</name>
             <url>http://maven.restlet.org</url>
         </repository>
+         <repository>
+             <id>atomikos</id>
+             <url>http://repo.atomikos.com</url>
+         </repository>
     </repositories>
 
     <dependencies>
@@ -134,6 +138,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
+            <artifactId>camel-sql</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
             <artifactId>camel-mail</artifactId>
             <scope>test</scope>
         </dependency>
@@ -282,6 +291,35 @@
                <classifier>tests</classifier>
                <scope>test</scope>
        </dependency>
+
+        <!-- atomikos XA TX manager -->
+        <dependency>
+            <groupId>com.atomikos</groupId>
+            <artifactId>transactions</artifactId>
+            <version>3.6.2</version>
+        </dependency>
+        <dependency>
+            <groupId>com.atomikos</groupId>
+            <artifactId>transactions-jta</artifactId>
+            <version>3.6.2</version>
+        </dependency>
+        <dependency>
+            <groupId>com.atomikos</groupId>
+            <artifactId>transactions-jms</artifactId>
+            <version>3.6.2</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.jms</groupId>
+                    <artifactId>jms</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.atomikos</groupId>
+            <artifactId>transactions-jdbc</artifactId>
+            <version>3.6.2</version>
+        </dependency>
+
     </dependencies>
 
     <build>
@@ -293,6 +331,7 @@
                     <excludes>
                       <!-- TODO FIXME ASAP -->
                       <exclude>**/XXXTest.*</exclude>
+                      
<exclude>**/FromJmsToJdbcIdempotentConsumerToJmsTest.*</exclude>
                     </excludes>
                     <systemProperties>
                                        <property>

Added: 
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/sql/FromJmsToJdbcIdempotentConsumerToJmsTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/sql/FromJmsToJdbcIdempotentConsumerToJmsTest.java?rev=1101878&view=auto
==============================================================================
--- 
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/sql/FromJmsToJdbcIdempotentConsumerToJmsTest.java
 (added)
+++ 
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/sql/FromJmsToJdbcIdempotentConsumerToJmsTest.java
 Wed May 11 14:00:52 2011
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.itest.sql;
+
+import java.io.File;
+import java.net.ConnectException;
+import javax.sql.DataSource;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.test.junit4.CamelSpringTestSupport;
+import org.apache.camel.util.FileUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.context.support.AbstractApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+/**
+ * Jms with JDBC idempotent consumer using XA test.
+ */
+public class FromJmsToJdbcIdempotentConsumerToJmsTest extends 
CamelSpringTestSupport {
+
+    protected JdbcTemplate jdbcTemplate;
+    protected DataSource dataSource;
+    protected IdempotentRepository repository;
+
+    @Override
+    protected AbstractApplicationContext createApplicationContext() {
+        return new 
ClassPathXmlApplicationContext("org/apache/camel/itest/sql/FromJmsToJdbcIdempotentConsumerToJmsTest.xml");
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        // delete transaction log and AMQ data
+        FileUtil.deleteFile(new File("tm.out"));
+        FileUtil.deleteFile(new File("tmlog0.log"));
+        deleteDirectory("activemq-data");
+
+        super.setUp();
+
+        dataSource = context.getRegistry().lookup("myNonXADataSource", 
DataSource.class);
+        jdbcTemplate = new JdbcTemplate(dataSource);
+        jdbcTemplate.afterPropertiesSet();
+
+        setupRepository();
+    }
+
+    protected void setupRepository() {
+        try {
+            jdbcTemplate.execute("DROP TABLE CAMEL_MESSAGEPROCESSED");
+        } catch (Exception e) {
+            // ignore
+        }
+        jdbcTemplate.execute("CREATE TABLE CAMEL_MESSAGEPROCESSED 
(processorName VARCHAR(20), messageId VARCHAR(10))");
+    }
+
+    @Test
+    public void testJmsToJdbcJmsCommit() throws Exception {
+        // check there are no messages in the database and JMS queue
+        assertEquals(0, jdbcTemplate.queryForInt("select count(*) from  
CAMEL_MESSAGEPROCESSED"));
+        assertEquals(null, consumer.receiveBody("activemq:queue:outbox", 
2000));
+
+        // use a notify to know when the message is done
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(1).create();
+
+        // use mock during testing as well
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(1);
+
+        template.sendBodyAndHeader("activemq:queue:inbox", "A", "uid", 123);
+
+        // assert mock and wait for the message to be done
+        assertMockEndpointsSatisfied();
+        assertTrue("Should complete 1 message", notify.matchesMockWaitTime());
+
+        // check that there is a message in the database and JMS queue
+        assertEquals(1, jdbcTemplate.queryForInt("select count(*) from  
CAMEL_MESSAGEPROCESSED"));
+        Object out = consumer.receiveBody("activemq:queue:outbox", 3000);
+        assertEquals("DONE-A", out);
+    }
+
+    @Test
+    public void testJmsToJdbcJmsRollbackAtA() throws Exception {
+        // check there are no messages in the database and JMS queue
+        assertEquals(0, jdbcTemplate.queryForInt("select count(*) from  
CAMEL_MESSAGEPROCESSED"));
+        assertEquals(null, consumer.receiveBody("activemq:queue:outbox", 
2000));
+
+        // use a notify to know when all 1+6 (1 original + 6 redelivery) 
attempts from ActiveMQ are done
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(7).create();
+
+        getMockEndpoint("mock:a").expectedMessageCount(7);
+        // force exception to occur at mock a
+        getMockEndpoint("mock:a").whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                throw new ConnectException("Forced cannot connect to 
database");
+            }
+        });
+        getMockEndpoint("mock:b").expectedMessageCount(0);
+
+        template.sendBodyAndHeader("activemq:queue:inbox", "A", "uid", 123);
+
+        // assert mock and wait for the message to be done
+        assertMockEndpointsSatisfied();
+        assertTrue("Should complete 7 message", notify.matchesMockWaitTime());
+
+        // check that there are no messages in the database and JMS queue
+        assertEquals(0, jdbcTemplate.queryForInt("select count(*) from  
CAMEL_MESSAGEPROCESSED"));
+        assertEquals(null, consumer.receiveBody("activemq:queue:outbox", 
3000));
+
+        // the message should have been moved to the AMQ DLQ queue
+        assertEquals("A", consumer.receiveBody("activemq:queue:ActiveMQ.DLQ", 
3000));
+    }
+
+    @Test
+    public void testJmsToJdbcJmsRollbackAtB() throws Exception {
+        // check there are no messages in the database and JMS queue
+        assertEquals(0, jdbcTemplate.queryForInt("select count(*) from  
CAMEL_MESSAGEPROCESSED"));
+        assertEquals(null, consumer.receiveBody("activemq:queue:outbox", 
2000));
+
+        // use a notify to know when all 1+6 (1 original + 6 redelivery) 
attempts from ActiveMQ are done
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(7).create();
+
+        getMockEndpoint("mock:a").expectedMessageCount(7);
+        // force exception to occur at mock b
+        getMockEndpoint("mock:b").whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                throw new ConnectException("Forced cannot send to AMQ queue");
+            }
+        });
+
+        template.sendBodyAndHeader("activemq:queue:inbox", "B", "uid", 456);
+
+        // assert mock and wait for the message to be done
+        assertMockEndpointsSatisfied();
+        assertTrue("Should complete 7 messages", notify.matchesMockWaitTime());
+
+        // check that there are no messages in the database and JMS queue
+        assertEquals(0, jdbcTemplate.queryForInt("select count(*) from  
CAMEL_MESSAGEPROCESSED"));
+        assertEquals(null, consumer.receiveBody("activemq:queue:outbox", 
3000));
+
+        // the message should have been moved to the AMQ DLQ queue
+        assertEquals("B", consumer.receiveBody("activemq:queue:ActiveMQ.DLQ", 
3000));
+    }
+
+    @Test
+    public void testFilterIdempotent() throws Exception {
+        // check there are no messages in the database and JMS queue
+        assertEquals(0, jdbcTemplate.queryForInt("select count(*) from  
CAMEL_MESSAGEPROCESSED"));
+        assertEquals(null, consumer.receiveBody("activemq:queue:outbox", 
2000));
+
+        // use a notify to know when the message is done
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(3).create();
+
+        // use mock during testing as well
+        getMockEndpoint("mock:a").expectedMessageCount(3);
+        // there should be 1 duplicate
+        getMockEndpoint("mock:b").expectedMessageCount(2);
+
+        template.sendBodyAndHeader("activemq:queue:inbox", "D", "uid", 111);
+        template.sendBodyAndHeader("activemq:queue:inbox", "E", "uid", 222);
+        template.sendBodyAndHeader("activemq:queue:inbox", "D", "uid", 111);
+
+        // assert mock and wait for the message to be done
+        assertMockEndpointsSatisfied();
+        assertTrue("Should complete 3 messages", notify.matchesMockWaitTime());
+
+        // check that there are two messages in the database and JMS queue
+        assertEquals(2, jdbcTemplate.queryForInt("select count(*) from  
CAMEL_MESSAGEPROCESSED"));
+        assertEquals("DONE-D", consumer.receiveBody("activemq:queue:outbox", 
3000));
+        assertEquals("DONE-E", consumer.receiveBody("activemq:queue:outbox", 
3000));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                repository = 
context.getRegistry().lookup("messageIdRepository", IdempotentRepository.class);
+
+                from("activemq:queue:inbox")
+                    .transacted("required")
+                    .to("mock:a")
+                    .idempotentConsumer(header("uid"), repository)
+                    .to("mock:b")
+                    .transform(simple("DONE-${body}"))
+                    .to("activemq:queue:outbox");
+            }
+        };
+    }
+}

Added: 
camel/trunk/tests/camel-itest/src/test/resources/org/apache/camel/itest/sql/FromJmsToJdbcIdempotentConsumerToJmsTest.xml
URL: 
http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest/src/test/resources/org/apache/camel/itest/sql/FromJmsToJdbcIdempotentConsumerToJmsTest.xml?rev=1101878&view=auto
==============================================================================
--- 
camel/trunk/tests/camel-itest/src/test/resources/org/apache/camel/itest/sql/FromJmsToJdbcIdempotentConsumerToJmsTest.xml
 (added)
+++ 
camel/trunk/tests/camel-itest/src/test/resources/org/apache/camel/itest/sql/FromJmsToJdbcIdempotentConsumerToJmsTest.xml
 Wed May 11 14:00:52 2011
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:camel="http://camel.apache.org/schema/spring"
+       xmlns:broker="http://activemq.apache.org/schema/core"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+           http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans.xsd
+           http://camel.apache.org/schema/spring 
http://camel.apache.org/schema/spring/camel-spring.xsd
+           http://activemq.apache.org/schema/core 
http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd">
+
+    <!-- jdbc idempotent repository -->
+    <bean id="messageIdRepository" 
class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
+        <constructor-arg index="0" ref="myDataSource"/>
+        <constructor-arg index="1" ref="requiredTemplate"/>
+        <constructor-arg index="2" value="myProcessor"/>
+    </bean>
+
+    <!-- use required TX -->
+    <bean id="requiredTemplate" 
class="org.springframework.transaction.support.TransactionTemplate">
+        <property name="transactionManager" ref="jtaTransactionManager"/>
+    </bean>
+    <bean id="required" 
class="org.apache.camel.spring.spi.SpringTransactionPolicy">
+        <property name="transactionTemplate" ref="requiredTemplate"/>
+    </bean>
+
+    <!-- setup Atomikos for XA transaction -->
+    <bean id="atomikosTransactionManager"
+          class="com.atomikos.icatch.jta.UserTransactionManager"
+          init-method="init" destroy-method="close" depends-on="my-broker">
+        <!-- when close is called, should we force transactions to terminate 
or not? -->
+        <property name="forceShutdown" value="false"/>
+    </bean>
+
+    <!-- this is some atomikos setup you must do -->
+    <bean id="atomikosUserTransaction" 
class="com.atomikos.icatch.jta.UserTransactionImp" depends-on="my-broker">
+        <property name="transactionTimeout" value="300"/>
+    </bean>
+
+    <!-- this is some Atomikos setup you must do: wrap the XA connection factory in an Atomikos connection factory -->
+    <bean id="connectionFactory"
+          class="com.atomikos.jms.AtomikosConnectionFactoryBean" 
depends-on="my-broker">
+        <property name="uniqueResourceName" value="myUniqueResource"/>
+        <property name="xaConnectionFactory" ref="jmsXaConnectionFactory"/>
+    </bean>
+
+    <!-- this is the Spring JtaTransactionManager which under the hood uses 
Atomikos -->
+    <bean id="jtaTransactionManager"
+          class="org.springframework.transaction.jta.JtaTransactionManager" 
depends-on="my-broker">
+        <property name="transactionManager" ref="atomikosTransactionManager"/>
+        <property name="userTransaction" ref="atomikosUserTransaction"/>
+    </bean>
+
+    <!-- This is the ConnectionFactory used to connect to the JMS broker -->
+    <!-- notice how we must use the XA connection factory -->
+    <bean id="jmsXaConnectionFactory" 
class="org.apache.activemq.ActiveMQXAConnectionFactory" depends-on="my-broker">
+        <property name="brokerURL" value="tcp://localhost:61616"/>
+    </bean>
+
+    <!-- define the activemq Camel component so we can integrate with the AMQ 
broker below -->
+    <bean id="activemq" 
class="org.apache.activemq.camel.component.ActiveMQComponent" 
depends-on="my-broker">
+        <!-- must indicate that we use transacted acknowledge mode -->
+        <property name="transacted" value="true"/>
+        <!-- refer to the transaction manager -->
+        <property name="transactionManager" ref="jtaTransactionManager"/>
+    </bean>
+
+    <!-- setup a local JMS Broker for testing purpose -->
+    <broker:broker id="my-broker" useJmx="false" persistent="false" 
brokerName="localhost">
+        <broker:transportConnectors>
+            <broker:transportConnector uri="tcp://localhost:61616"/>
+        </broker:transportConnectors>
+    </broker:broker>
+
+    <!-- define the datasource to the database - in this example we use an in 
memory database using HSQLDB -->
+    <!-- HSQLDB is not XA compatible so we wrap that using a special Atomikos 
NonXA to XA DataSource -->
+    <bean id="myDataSource" 
class="com.atomikos.jdbc.nonxa.AtomikosNonXADataSourceBean">
+        <property name="uniqueResourceName" value="hsqldb"/>
+        <property name="driverClassName" value="org.hsqldb.jdbcDriver"/>
+        <property name="url" value="jdbc:hsqldb:mem:mydatabase"/>
+        <property name="user" value="sa"/>
+        <property name="password" value=""/>
+        <property name="poolSize" value="3"/>
+    </bean>
+
+    <!-- datasource used to create the database tables -->
+    <bean id="myNonXADataSource" 
class="org.springframework.jdbc.datasource.SingleConnectionDataSource">
+        <property name="driverClassName" value="org.hsqldb.jdbcDriver"/>
+        <property name="url" value="jdbc:hsqldb:mem:mydatabase"/>
+        <property name="username" value="sa"/>
+        <property name="password" value=""/>
+    </bean>
+
+</beans>
+


Reply via email to