Author: davsclaus
Date: Wed May 11 14:00:52 2011
New Revision: 1101878
URL: http://svn.apache.org/viewvc?rev=1101878&view=rev
Log:
Added test for XA with JMS and JDBC idempotent consumer EIP based on user issue.
Added:
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/sql/
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/sql/FromJmsToJdbcIdempotentConsumerToJmsTest.java
camel/trunk/tests/camel-itest/src/test/resources/org/apache/camel/itest/sql/
camel/trunk/tests/camel-itest/src/test/resources/org/apache/camel/itest/sql/FromJmsToJdbcIdempotentConsumerToJmsTest.xml
Modified:
camel/trunk/tests/camel-itest/pom.xml
Modified: camel/trunk/tests/camel-itest/pom.xml
URL:
http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest/pom.xml?rev=1101878&r1=1101877&r2=1101878&view=diff
==============================================================================
--- camel/trunk/tests/camel-itest/pom.xml (original)
+++ camel/trunk/tests/camel-itest/pom.xml Wed May 11 14:00:52 2011
@@ -38,6 +38,10 @@
<name>Public online Restlet repository</name>
<url>http://maven.restlet.org</url>
</repository>
+ <repository>
+ <id>atomikos</id>
+ <url>http://repo.atomikos.com</url>
+ </repository>
</repositories>
<dependencies>
@@ -134,6 +138,11 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
+ <artifactId>camel-sql</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
<artifactId>camel-mail</artifactId>
<scope>test</scope>
</dependency>
@@ -282,6 +291,35 @@
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
+
+ <!-- atomikos XA TX manager -->
+ <dependency>
+ <groupId>com.atomikos</groupId>
+ <artifactId>transactions</artifactId>
+ <version>3.6.2</version>
+ </dependency>
+ <dependency>
+ <groupId>com.atomikos</groupId>
+ <artifactId>transactions-jta</artifactId>
+ <version>3.6.2</version>
+ </dependency>
+ <dependency>
+ <groupId>com.atomikos</groupId>
+ <artifactId>transactions-jms</artifactId>
+ <version>3.6.2</version>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.jms</groupId>
+ <artifactId>jms</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.atomikos</groupId>
+ <artifactId>transactions-jdbc</artifactId>
+ <version>3.6.2</version>
+ </dependency>
+
</dependencies>
<build>
@@ -293,6 +331,7 @@
<excludes>
<!-- TODO FIXME ASAP -->
<exclude>**/XXXTest.*</exclude>
+
<exclude>**/FromJmsToJdbcIdempotentConsumerToJmsTest.*</exclude>
</excludes>
<systemProperties>
<property>
Added:
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/sql/FromJmsToJdbcIdempotentConsumerToJmsTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/sql/FromJmsToJdbcIdempotentConsumerToJmsTest.java?rev=1101878&view=auto
==============================================================================
---
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/sql/FromJmsToJdbcIdempotentConsumerToJmsTest.java
(added)
+++
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/sql/FromJmsToJdbcIdempotentConsumerToJmsTest.java
Wed May 11 14:00:52 2011
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.itest.sql;
+
+import java.io.File;
+import java.net.ConnectException;
+import javax.sql.DataSource;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.test.junit4.CamelSpringTestSupport;
+import org.apache.camel.util.FileUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.context.support.AbstractApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+/**
+ * Jms with JDBC idempotent consumer using XA test.
+ */
+public class FromJmsToJdbcIdempotentConsumerToJmsTest extends
CamelSpringTestSupport {
+
+ protected JdbcTemplate jdbcTemplate;
+ protected DataSource dataSource;
+ protected IdempotentRepository repository;
+
+ @Override
+ protected AbstractApplicationContext createApplicationContext() {
+ return new
ClassPathXmlApplicationContext("org/apache/camel/itest/sql/FromJmsToJdbcIdempotentConsumerToJmsTest.xml");
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ // delete transaction log and AMQ data
+ FileUtil.deleteFile(new File("tm.out"));
+ FileUtil.deleteFile(new File("tmlog0.log"));
+ deleteDirectory("activemq-data");
+
+ super.setUp();
+
+ dataSource = context.getRegistry().lookup("myNonXADataSource",
DataSource.class);
+ jdbcTemplate = new JdbcTemplate(dataSource);
+ jdbcTemplate.afterPropertiesSet();
+
+ setupRepository();
+ }
+
+ protected void setupRepository() {
+ try {
+ jdbcTemplate.execute("DROP TABLE CAMEL_MESSAGEPROCESSED");
+ } catch (Exception e) {
+ // ignore
+ }
+ jdbcTemplate.execute("CREATE TABLE CAMEL_MESSAGEPROCESSED
(processorName VARCHAR(20), messageId VARCHAR(10))");
+ }
+
+ @Test
+ public void testJmsToJdbcJmsCommit() throws Exception {
+ // check there are no messages in the database and JMS queue
+ assertEquals(0, jdbcTemplate.queryForInt("select count(*) from
CAMEL_MESSAGEPROCESSED"));
+ assertEquals(null, consumer.receiveBody("activemq:queue:outbox",
2000));
+
+ // use a notify to know when the message is done
+ NotifyBuilder notify = new NotifyBuilder(context).whenDone(1).create();
+
+ // use mock during testing as well
+ getMockEndpoint("mock:a").expectedMessageCount(1);
+ getMockEndpoint("mock:b").expectedMessageCount(1);
+
+ template.sendBodyAndHeader("activemq:queue:inbox", "A", "uid", 123);
+
+ // assert mock and wait for the message to be done
+ assertMockEndpointsSatisfied();
+ assertTrue("Should complete 1 message", notify.matchesMockWaitTime());
+
+ // check that there is a message in the database and JMS queue
+ assertEquals(1, jdbcTemplate.queryForInt("select count(*) from
CAMEL_MESSAGEPROCESSED"));
+ Object out = consumer.receiveBody("activemq:queue:outbox", 3000);
+ assertEquals("DONE-A", out);
+ }
+
+ @Test
+ public void testJmsToJdbcJmsRollbackAtA() throws Exception {
+ // check there are no messages in the database and JMS queue
+ assertEquals(0, jdbcTemplate.queryForInt("select count(*) from
CAMEL_MESSAGEPROCESSED"));
+ assertEquals(null, consumer.receiveBody("activemq:queue:outbox",
2000));
+
+ // use a notify to know that after 1+6 (1 original + 6 redelivery)
attempts from AcitveMQ
+ NotifyBuilder notify = new NotifyBuilder(context).whenDone(7).create();
+
+ getMockEndpoint("mock:a").expectedMessageCount(7);
+ // force exception to occur at mock a
+ getMockEndpoint("mock:a").whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ throw new ConnectException("Forced cannot connect to
database");
+ }
+ });
+ getMockEndpoint("mock:b").expectedMessageCount(0);
+
+ template.sendBodyAndHeader("activemq:queue:inbox", "A", "uid", 123);
+
+ // assert mock and wait for the message to be done
+ assertMockEndpointsSatisfied();
+ assertTrue("Should complete 7 message", notify.matchesMockWaitTime());
+
+ // check that there is a message in the database and JMS queue
+ assertEquals(0, jdbcTemplate.queryForInt("select count(*) from
CAMEL_MESSAGEPROCESSED"));
+ assertEquals(null, consumer.receiveBody("activemq:queue:outbox",
3000));
+
+ // the message should have been moved to the AMQ DLQ queue
+ assertEquals("A", consumer.receiveBody("activemq:queue:ActiveMQ.DLQ",
3000));
+ }
+
+ @Test
+ public void testJmsToJdbcJmsRollbackAtB() throws Exception {
+ // check there are no messages in the database and JMS queue
+ assertEquals(0, jdbcTemplate.queryForInt("select count(*) from
CAMEL_MESSAGEPROCESSED"));
+ assertEquals(null, consumer.receiveBody("activemq:queue:outbox",
2000));
+
+ // use a notify to know that after 1+6 (1 original + 6 redelivery)
attempts from AcitveMQ
+ NotifyBuilder notify = new NotifyBuilder(context).whenDone(7).create();
+
+ getMockEndpoint("mock:a").expectedMessageCount(7);
+ // force exception to occur at mock a
+ getMockEndpoint("mock:b").whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ throw new ConnectException("Forced cannot send to AMQ queue");
+ }
+ });
+
+ template.sendBodyAndHeader("activemq:queue:inbox", "B", "uid", 456);
+
+ // assert mock and wait for the message to be done
+ assertMockEndpointsSatisfied();
+ assertTrue("Should complete 7 messages", notify.matchesMockWaitTime());
+
+ // check that there is a message in the database and JMS queue
+ assertEquals(0, jdbcTemplate.queryForInt("select count(*) from
CAMEL_MESSAGEPROCESSED"));
+ assertEquals(null, consumer.receiveBody("activemq:queue:outbox",
3000));
+
+ // the message should have been moved to the AMQ DLQ queue
+ assertEquals("B", consumer.receiveBody("activemq:queue:ActiveMQ.DLQ",
3000));
+ }
+
+ @Test
+ public void testFilterIdempotent() throws Exception {
+ // check there are no messages in the database and JMS queue
+ assertEquals(0, jdbcTemplate.queryForInt("select count(*) from
CAMEL_MESSAGEPROCESSED"));
+ assertEquals(null, consumer.receiveBody("activemq:queue:outbox",
2000));
+
+ // use a notify to know when the message is done
+ NotifyBuilder notify = new NotifyBuilder(context).whenDone(3).create();
+
+ // use mock during testing as well
+ getMockEndpoint("mock:a").expectedMessageCount(3);
+ // there should be 1 duplicate
+ getMockEndpoint("mock:b").expectedMessageCount(2);
+
+ template.sendBodyAndHeader("activemq:queue:inbox", "D", "uid", 111);
+ template.sendBodyAndHeader("activemq:queue:inbox", "E", "uid", 222);
+ template.sendBodyAndHeader("activemq:queue:inbox", "D", "uid", 111);
+
+ // assert mock and wait for the message to be done
+ assertMockEndpointsSatisfied();
+ assertTrue("Should complete 3 messages", notify.matchesMockWaitTime());
+
+ // check that there is two messages in the database and JMS queue
+ assertEquals(2, jdbcTemplate.queryForInt("select count(*) from
CAMEL_MESSAGEPROCESSED"));
+ assertEquals("DONE-D", consumer.receiveBody("activemq:queue:outbox",
3000));
+ assertEquals("DONE-E", consumer.receiveBody("activemq:queue:outbox",
3000));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ repository =
context.getRegistry().lookup("messageIdRepository", IdempotentRepository.class);
+
+ from("activemq:queue:inbox")
+ .transacted("required")
+ .to("mock:a")
+ .idempotentConsumer(header("uid"), repository)
+ .to("mock:b")
+ .transform(simple("DONE-${body}"))
+ .to("activemq:queue:outbox");
+ }
+ };
+ }
+}
Added:
camel/trunk/tests/camel-itest/src/test/resources/org/apache/camel/itest/sql/FromJmsToJdbcIdempotentConsumerToJmsTest.xml
URL:
http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest/src/test/resources/org/apache/camel/itest/sql/FromJmsToJdbcIdempotentConsumerToJmsTest.xml?rev=1101878&view=auto
==============================================================================
---
camel/trunk/tests/camel-itest/src/test/resources/org/apache/camel/itest/sql/FromJmsToJdbcIdempotentConsumerToJmsTest.xml
(added)
+++
camel/trunk/tests/camel-itest/src/test/resources/org/apache/camel/itest/sql/FromJmsToJdbcIdempotentConsumerToJmsTest.xml
Wed May 11 14:00:52 2011
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:camel="http://camel.apache.org/schema/spring"
+ xmlns:broker="http://activemq.apache.org/schema/core"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd
+ http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd">
+
+ <!-- jdbc idempotent repository -->
+ <bean id="messageIdRepository"
class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
+ <constructor-arg index="0" ref="myDataSource"/>
+ <constructor-arg index="1" ref="requiredTemplate"/>
+ <constructor-arg index="2" value="myProcessor"/>
+ </bean>
+
+ <!-- use required TX -->
+ <bean id="requiredTemplate"
class="org.springframework.transaction.support.TransactionTemplate">
+ <property name="transactionManager" ref="jtaTransactionManager"/>
+ </bean>
+ <bean id="required"
class="org.apache.camel.spring.spi.SpringTransactionPolicy">
+ <property name="transactionTemplate" ref="requiredTemplate"/>
+ </bean>
+
+ <!-- setup Atomikos for XA transaction -->
+ <bean id="atomikosTransactionManager"
+ class="com.atomikos.icatch.jta.UserTransactionManager"
+ init-method="init" destroy-method="close" depends-on="my-broker">
+ <!-- when close is called, should we force transactions to terminate
or not? -->
+ <property name="forceShutdown" value="false"/>
+ </bean>
+
+ <!-- this is some atomikos setup you must do -->
+ <bean id="atomikosUserTransaction"
class="com.atomikos.icatch.jta.UserTransactionImp" depends-on="my-broker">
+ <property name="transactionTimeout" value="300"/>
+ </bean>
+
+ <!-- this is some atomikos setup you must do -->
+ <bean id="connectionFactory"
+ class="com.atomikos.jms.AtomikosConnectionFactoryBean"
depends-on="my-broker">
+ <property name="uniqueResourceName" value="myUniqueResource"/>
+ <property name="xaConnectionFactory" ref="jmsXaConnectionFactory"/>
+ </bean>
+
+ <!-- this is the Spring JtaTransactionManager which under the hood uses
Atomikos -->
+ <bean id="jtaTransactionManager"
+ class="org.springframework.transaction.jta.JtaTransactionManager"
depends-on="my-broker">
+ <property name="transactionManager" ref="atomikosTransactionManager"/>
+ <property name="userTransaction" ref="atomikosUserTransaction"/>
+ </bean>
+
+ <!-- Is the ConnectionFactory to connect to the JMS broker -->
+ <!-- notice how we must use the XA connection factory -->
+ <bean id="jmsXaConnectionFactory"
class="org.apache.activemq.ActiveMQXAConnectionFactory" depends-on="my-broker">
+ <property name="brokerURL" value="tcp://localhost:61616"/>
+ </bean>
+
+ <!-- define the activemq Camel component so we can integrate with the AMQ
broker below -->
+ <bean id="activemq"
class="org.apache.activemq.camel.component.ActiveMQComponent"
depends-on="my-broker">
+ <!-- must indicate that we use transacted acknowledge mode -->
+ <property name="transacted" value="true"/>
+ <!-- refer to the transaction manager -->
+ <property name="transactionManager" ref="jtaTransactionManager"/>
+ </bean>
+
+ <!-- setup a local JMS Broker for testing purpose -->
+ <broker:broker id="my-broker" useJmx="false" persistent="false"
brokerName="localhost">
+ <broker:transportConnectors>
+ <broker:transportConnector uri="tcp://localhost:61616"/>
+ </broker:transportConnectors>
+ </broker:broker>
+
+ <!-- define the datasource to the database - in this example we use an in
memory database using HSQLDB -->
+ <!-- HSQLDB is not XA compatible so we wrap that using a special Atomikos
NonXA to XA DataSource -->
+ <bean id="myDataSource"
class="com.atomikos.jdbc.nonxa.AtomikosNonXADataSourceBean">
+ <property name="uniqueResourceName" value="hsqldb"/>
+ <property name="driverClassName" value="org.hsqldb.jdbcDriver"/>
+ <property name="url" value="jdbc:hsqldb:mem:mydatabase"/>
+ <property name="user" value="sa"/>
+ <property name="password" value=""/>
+ <property name="poolSize" value="3"/>
+ </bean>
+
+ <!-- datasource used to create the database tables -->
+ <bean id="myNonXADataSource"
class="org.springframework.jdbc.datasource.SingleConnectionDataSource">
+ <property name="driverClassName" value="org.hsqldb.jdbcDriver"/>
+ <property name="url" value="jdbc:hsqldb:mem:mydatabase"/>
+ <property name="username" value="sa"/>
+ <property name="password" value=""/>
+ </bean>
+
+</beans>
+