Repository: activemq Updated Branches: refs/heads/trunk 1999ddfd0 -> 10394734f
add a sanity external tm xa rollback test Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/10394734 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/10394734 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/10394734 Branch: refs/heads/trunk Commit: 10394734f67069db813669c92735ee7cfd927bbe Parents: 1999ddf Author: gtully <[email protected]> Authored: Tue Apr 1 22:44:54 2014 +0100 Committer: gtully <[email protected]> Committed: Tue Apr 1 22:45:45 2014 +0100 ---------------------------------------------------------------------- .../activemq/camel/JmsJdbcXARollbackTest.java | 185 +++++++++++++++++++ .../apache/activemq/camel/jmsXajdbcRollback.xml | 120 ++++++++++++ 2 files changed, 305 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/10394734/activemq-camel/src/test/java/org/apache/activemq/camel/JmsJdbcXARollbackTest.java ---------------------------------------------------------------------- diff --git a/activemq-camel/src/test/java/org/apache/activemq/camel/JmsJdbcXARollbackTest.java b/activemq-camel/src/test/java/org/apache/activemq/camel/JmsJdbcXARollbackTest.java new file mode 100644 index 0000000..16bc006 --- /dev/null +++ b/activemq-camel/src/test/java/org/apache/activemq/camel/JmsJdbcXARollbackTest.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.camel; + +import java.sql.ResultSet; +import java.sql.SQLException; +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.transaction.TransactionManager; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.util.Wait; +import org.apache.camel.Exchange; +import org.apache.camel.component.jms.JmsMessage; +import org.apache.camel.test.spring.CamelSpringTestSupport; +import org.apache.commons.dbcp.BasicDataSource; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.support.AbstractXmlApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; +import org.springframework.transaction.jta.JtaTransactionManager; + +/** + * shows rollback and redelivery dlq respected with external tm + */ +public class JmsJdbcXARollbackTest extends CamelSpringTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(JmsJdbcXARollbackTest.class); + BrokerService broker = null; + int messageCount; + + public java.sql.Connection initDb() throws Exception { + String createStatement = + "CREATE TABLE SCP_INPUT_MESSAGES (" + + "id int NOT NULL GENERATED ALWAYS AS IDENTITY, " + + "messageId 
varchar(96) NOT NULL, " + + "messageCorrelationId varchar(96) NOT NULL, " + + "messageContent varchar(2048) NOT NULL, " + + "PRIMARY KEY (id) )"; + + java.sql.Connection conn = getJDBCConnection(); + try { + conn.createStatement().execute(createStatement); + } catch (SQLException alreadyExists) { + log.info("ex on create tables", alreadyExists); + } + + try { + conn.createStatement().execute("DELETE FROM SCP_INPUT_MESSAGES"); + } catch (SQLException ex) { + log.info("ex on create delete all", ex); + } + + return conn; + } + + private java.sql.Connection getJDBCConnection() throws Exception { + BasicDataSource dataSource = getMandatoryBean(BasicDataSource.class, "managedDataSourceWithRecovery"); + return dataSource.getConnection(); + } + + private int dumpDb(java.sql.Connection jdbcConn) throws Exception { + int count = 0; + ResultSet resultSet = jdbcConn.createStatement().executeQuery("SELECT * FROM SCP_INPUT_MESSAGES"); + while (resultSet.next()) { + count++; + log.info("message - seq:" + resultSet.getInt(1) + + ", id: " + resultSet.getString(2) + + ", corr: " + resultSet.getString(3) + + ", content: " + resultSet.getString(4)); + } + return count; + } + + @Test + public void testConsumeRollback() throws Exception { + java.sql.Connection jdbcConn = initDb(); + + initTMRef(); + sendJMSMessageToKickOffRoute(); + + // should go to dlq eventually + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return consumedFrom(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME); + } + }); + assertEquals("message in db, commit to db worked", 0, dumpDb(jdbcConn)); + assertFalse("Nothing to to out q", consumedFrom("scp_transacted_out")); + + } + + private boolean consumedFrom(String qName) throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://testXA"); + factory.setWatchTopicAdvisories(false); + Connection connection = factory.createConnection(); + connection.start(); + Session session = 
connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(qName)); + Message message = consumer.receive(500); + LOG.info("Got from queue:{} {}", qName, message); + connection.close(); + return message != null; + } + + static TransactionManager[] transactionManager = new TransactionManager[1]; + private void initTMRef() { + transactionManager[0] = getMandatoryBean(JtaTransactionManager.class, "jtaTransactionManager").getTransactionManager(); + + } + + private void sendJMSMessageToKickOffRoute() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://testXA"); + factory.setWatchTopicAdvisories(false); + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(new ActiveMQQueue("scp_transacted")); + TextMessage message = session.createTextMessage("Some Text, messageCount:" + messageCount++); + message.setJMSCorrelationID("pleaseCorrelate"); + producer.send(message); + connection.close(); + } + + private BrokerService createBroker(boolean deleteAllMessages) throws Exception { + BrokerService brokerService = new BrokerService(); + brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages); + brokerService.setBrokerName("testXA"); + brokerService.setAdvisorySupport(false); + brokerService.setUseJmx(false); + brokerService.setDataDirectory("target/data"); + brokerService.addConnector("tcp://0.0.0.0:61616"); + return brokerService; + } + + @SuppressWarnings("unchecked") + @Override + protected AbstractXmlApplicationContext createApplicationContext() { + + deleteDirectory("target/data/howl"); + + // make broker available to recovery processing on app context start + try { + broker = createBroker(true); + broker.start(); + } catch (Exception e) { + throw new RuntimeException("Failed to start broker", e); + } + + return 
new ClassPathXmlApplicationContext("org/apache/activemq/camel/jmsXajdbcRollback.xml"); + } + + public static class MarkRollbackOnly { + public String enrich(Exchange exchange) throws Exception { + LOG.info("Got exchange: " + exchange); + LOG.info("Got message: " + ((JmsMessage)exchange.getIn()).getJmsMessage()); + + LOG.info("Current tx: " + transactionManager[0].getTransaction()); + LOG.info("Marking rollback only..."); + transactionManager[0].getTransaction().setRollbackOnly(); + return "Some Text"; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/10394734/activemq-camel/src/test/resources/org/apache/activemq/camel/jmsXajdbcRollback.xml ---------------------------------------------------------------------- diff --git a/activemq-camel/src/test/resources/org/apache/activemq/camel/jmsXajdbcRollback.xml b/activemq-camel/src/test/resources/org/apache/activemq/camel/jmsXajdbcRollback.xml new file mode 100644 index 0000000..f5b4306 --- /dev/null +++ b/activemq-camel/src/test/resources/org/apache/activemq/camel/jmsXajdbcRollback.xml @@ -0,0 +1,120 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<!-- START SNIPPET: jms_jdbc_xa --> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns:context="http://www.springframework.org/schema/context" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd + http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd + http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd + "> + + <context:annotation-config /> + <!-- broker creation in code so it can be restarted and modified to test recovery --> + + <!-- use jencks factory beans to easily configure howl and geronimo transaction manager --> + <bean id="xidFactory" class="org.apache.geronimo.transaction.manager.XidFactoryImpl"/> + <!-- Transaction log --> + <bean id="transactionLog" class="org.jencks.factory.HowlLogFactoryBean"> + <property name="logFileDir" value="target/data/howl/txlog"/> + <property name="xidFactory" ref="xidFactory"/> + </bean> + <bean id="jenckTransactionManager" class="org.jencks.factory.TransactionManagerFactoryBean"> + <property name="transactionLog" ref="transactionLog"/> + </bean> + + <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory"> + <property name="brokerURL" value="tcp://localhost:61616?jms.dispatchAsync=false&amp;jms.redeliveryPolicy.maximumRedeliveries=2&amp;jms.redeliveryPolicy.initialRedeliveryDelay=100"/> + </bean> + + <!-- register ActiveMQ with Geronimo to allow out of band transaction recovery/completion on a new connection + the resourceName gives the ActiveMQ XAResource an identity, Geronimo NamedXAResource in the transaction log + --> + <bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource"> + 
<property name="transactionManager" ref="jenckTransactionManager"/> + <property name="connectionFactory" ref="activemqConnectionFactory"/> + <property name="resourceName" value="activemq.broker"/> + </bean> + + <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryBean" + depends-on="jenckTransactionManager"> + <property name="maxConnections" value="1"/> + <property name="transactionManager" ref="jenckTransactionManager"/> + <property name="connectionFactory" ref="activemqConnectionFactory"/> + <property name="resourceName" value="activemq.broker"/> + </bean> + + <!-- Configure the Spring framework (used by camel) to use JTA transactions from Geronimo --> + <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"> + <property name="transactionManager" ref="jenckTransactionManager"/> + </bean> + + <!-- Define the activemq Camel component so we can integrate with the AMQ broker --> + <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent" + depends-on="pooledConnectionFactory"> + <property name="transacted" value="true"/> + <property name="transactionManager" ref="jtaTransactionManager"/> + <property name="connectionFactory" ref="pooledConnectionFactory"/> + <!-- cache level is important, can be cache connection or none, as session needs to be enlisted + in the current transaction they can't be cached, with default cache sessions, they are created + up front, before the transaction (required for the route) --> + <property name="cacheLevel" value="0"/> + </bean> + + <!-- openejb provides geronimo NamedXAResources wrapper around commons dbcp such that they have an identity in the howl log --> + <bean id="geronimoXAResourceWrapper" + class="org.apache.openejb.resource.GeronimoTransactionManagerFactory.GeronimoXAResourceWrapper"/> + <bean id="managedDataSourceWithRecovery" class="org.apache.openejb.resource.jdbc.ManagedDataSourceWithRecovery"> + <constructor-arg> + 
<ref bean="geronimoXAResourceWrapper"></ref> + </constructor-arg> + <property name="jdbcDriver" value="org.apache.derby.jdbc.EmbeddedDriver"/> + <property name="jdbcUrl" value="jdbc:derby:target/XatestDs;create=true"/> + <property name="transactionManager" ref="jenckTransactionManager"/> + </bean> + + <bean id="required" class="org.apache.camel.spring.spi.SpringTransactionPolicy"> + <property name="transactionManager" ref="jenckTransactionManager"/> + <property name="propagationBehaviorName" value="PROPAGATION_REQUIRED"/> + </bean> + + <bean id="markRollback" class="org.apache.activemq.camel.JmsJdbcXARollbackTest.MarkRollbackOnly"/> + + <!-- the route, from jms to jdbc in an xa transaction --> + <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> + <route id="queueToDbTransacted"> + <from uri="activemq:queue:scp_transacted"/> + <transacted ref="required"/> + <bean ref="markRollback" method="enrich" /> + <convertBodyTo type="java.lang.String"/> + <to uri="log:BeforeSettingBody?showAll=true"/> + <to uri="activemq:queue:scp_transacted_out"/> + <setBody> + <simple>INSERT INTO SCP_INPUT_MESSAGES(messageId, messageCorrelationId, messageContent) + VALUES('${in.header.JMSMessageId}','${in.header.JMSCorrelationId}','${in.body}') + </simple> + </setBody> + <to uri="jdbc:managedDataSourceWithRecovery?resetAutoCommit=false"/> + + </route> + </camelContext> + +</beans> +<!-- END SNIPPET: jms_jdbc_xa -->
