Repository: cxf Updated Branches: refs/heads/master 0918a1bc9 -> 0739807fb
CXF-5543 Adding experimental XA transaction support Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/0739807f Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/0739807f Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/0739807f Branch: refs/heads/master Commit: 0739807fbc6f3b3e24aca2b517e34f1b9354d34e Parents: 0918a1b Author: Christian Schneider <[email protected]> Authored: Wed Apr 2 21:34:10 2014 +0200 Committer: Christian Schneider <[email protected]> Committed: Wed Apr 2 21:34:10 2014 +0200 ---------------------------------------------------------------------- rt/transports/jms/pom.xml | 6 + .../cxf/transport/jms/JMSDestination.java | 10 +- .../jms/util/JMSListenerContainer.java | 3 - .../jms/util/MessageListenerContainer.java | 119 ++++++++++-- .../transport/jms/util/MessageListenerTest.java | 191 +++++++++++++++++++ 5 files changed, 303 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/0739807f/rt/transports/jms/pom.xml ---------------------------------------------------------------------- diff --git a/rt/transports/jms/pom.xml b/rt/transports/jms/pom.xml index e2ff1e0..e2224f6 100644 --- a/rt/transports/jms/pom.xml +++ b/rt/transports/jms/pom.xml @@ -119,6 +119,12 @@ <artifactId>slf4j-jdk14</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.aries.transaction</groupId> + <artifactId>org.apache.aries.transaction.manager</artifactId> + <version>1.1.0</version> + <scope>test</scope> + </dependency> </dependencies> <build> <plugins> http://git-wip-us.apache.org/repos/asf/cxf/blob/0739807f/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java index 76f857d..7c529ef 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java @@ -21,6 +21,7 @@ package org.apache.cxf.transport.jms; import java.io.UnsupportedEncodingException; import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.logging.Level; import java.util.logging.Logger; @@ -62,6 +63,7 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess private JMSListenerContainer jmsListener; private ThrottlingCounter suspendedContinuations; private ClassLoader loader; + private Connection connection; public JMSDestination(Bus b, EndpointInfo info, JMSConfiguration jmsConfig) { super(b, getTargetReference(info, b), info); @@ -77,7 +79,7 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess * @return the inbuilt backchannel */ protected Conduit getInbuiltBackChannel(Message inMessage) { - return new BackChannelConduit(inMessage, jmsConfig, jmsListener.getConnection()); + return new BackChannelConduit(inMessage, jmsConfig, connection); } /** @@ -98,13 +100,15 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess private JMSListenerContainer createTargetDestinationListener() { Session session = null; try { - Connection connection = JMSFactory.createConnection(jmsConfig); + connection = JMSFactory.createConnection(jmsConfig); connection.start(); session = connection.createSession(jmsConfig.isSessionTransacted(), Session.AUTO_ACKNOWLEDGE); Destination destination = jmsConfig.getTargetDestination(session); MessageListenerContainer container = new MessageListenerContainer(connection, destination, this); container.setMessageSelector(jmsConfig.getMessageSelector()); - Executor executor = JMSFactory.createExecutor(bus, "jms-destination"); + + Executor executor = Executors.newFixedThreadPool(20); + //JMSFactory.createExecutor(bus, "jms-destination"); container.setExecutor(executor); container.start(); return container; http://git-wip-us.apache.org/repos/asf/cxf/blob/0739807f/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSListenerContainer.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSListenerContainer.java index 7adf956..4398dd8 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSListenerContainer.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSListenerContainer.java @@ -19,13 +19,10 @@ package org.apache.cxf.transport.jms.util; -import javax.jms.Connection; public interface JMSListenerContainer { boolean isRunning(); void stop(); void start(); void shutdown(); - - Connection getConnection(); } http://git-wip-us.apache.org/repos/asf/cxf/blob/0739807f/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java index bd9a2b0..dfc641d 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java @@ -20,6 +20,8 @@ package org.apache.cxf.transport.jms.util; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.jms.Connection; import javax.jms.Destination; @@ -29,11 +31,20 @@ import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.Topic; +import javax.jms.XASession; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import javax.transaction.Transaction; +import javax.transaction.TransactionManager; +import javax.transaction.xa.XAResource; + +import org.apache.cxf.common.logging.LogUtils; public class MessageListenerContainer implements JMSListenerContainer { - + private static final Logger LOG = LogUtils.getL7dLogger(MessageListenerContainer.class); + private Connection connection; - private Destination replyTo; + private Destination destination; private MessageListener listenerHandler; private boolean transacted; private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE; @@ -44,19 +55,19 @@ public class MessageListenerContainer implements JMSListenerContainer { private Executor executor; private String durableSubscriptionName; private boolean pubSubNoLocal; + private TransactionManager transactionManager; - public MessageListenerContainer(Connection connection, - Destination replyTo, + public MessageListenerContainer(Connection connection, Destination destination, MessageListener listenerHandler) { this.connection = connection; - this.replyTo = replyTo; + this.destination = destination; this.listenerHandler = listenerHandler; } - + public Connection getConnection() { return connection; } - + public void setTransacted(boolean transacted) { this.transacted = transacted; } @@ -68,7 +79,7 @@ public class MessageListenerContainer implements JMSListenerContainer { public void setMessageSelector(String messageSelector) { this.messageSelector = messageSelector; } - + private Executor getExecutor() { if (executor == null) { executor = Executors.newFixedThreadPool(10); @@ -92,18 +103,27 @@ public class MessageListenerContainer implements JMSListenerContainer { public boolean isRunning() { return running; } - + + public void setTransactionManager(TransactionManager transactionManager) { + this.transactionManager = transactionManager; + } + @Override public void start() { try { session = connection.createSession(transacted, acknowledgeMode); if (durableSubscriptionName != null) { - consumer = session.createDurableSubscriber((Topic)replyTo, durableSubscriptionName, + consumer = session.createDurableSubscriber((Topic)destination, durableSubscriptionName, messageSelector, pubSubNoLocal); } else { - consumer = session.createConsumer(replyTo, messageSelector); + consumer = session.createConsumer(destination, messageSelector); } - consumer.setMessageListener(listenerHandler); + + MessageListener intListener = (transactionManager != null) + ? new TransactionalMessageListener(transactionManager, session, listenerHandler) + : new DispachingListener(getExecutor(), listenerHandler); + consumer.setMessageListener(intListener); + running = true; } catch (JMSException e) { throw JMSUtil.convertJmsException(e); @@ -125,22 +145,81 @@ public class MessageListenerContainer implements JMSListenerContainer { ResourceCloser.close(connection); } - class DispachingListener implements MessageListener { + protected TransactionManager getTransactionManager() { + if (this.transactionManager == null) { + try { + InitialContext ctx = new InitialContext(); + this.transactionManager = (TransactionManager)ctx + .lookup("javax.transaction.TransactionManager"); + } catch (NamingException e) { + // Ignore + } + } + return this.transactionManager; + } + + static class DispachingListener implements MessageListener { + private Executor executor; + private MessageListener listenerHandler; + + public DispachingListener(Executor executor, MessageListener listenerHandler) { + this.executor = executor; + this.listenerHandler = listenerHandler; + + } @Override public void onMessage(final Message message) { - getExecutor().execute(new Runnable() { - + executor.execute(new Runnable() { + @Override public void run() { - try { - listenerHandler.onMessage(message); - } catch (Exception e) { - // Ignore - } + listenerHandler.onMessage(message); } + }); } + + } + + static class TransactionalMessageListener implements MessageListener { + private TransactionManager tm; + private MessageListener listenerHandler; + private Session session; + + public TransactionalMessageListener(TransactionManager tm, Session session, MessageListener listenerHandler) { + this.tm = tm; + this.session = session; + this.listenerHandler = listenerHandler; + } + + @Override + public void onMessage(Message message) { + if (tm == null || !(session instanceof XASession)) { + listenerHandler.onMessage(message); + return; + } + try { + XASession xaSession = (XASession)session; // TODO check cast + tm.begin(); + Transaction tr = tm.getTransaction(); + XAResource res = xaSession.getXAResource(); + tr.enlistResource(res); + listenerHandler.onMessage(message); + tm.commit(); + } catch (Throwable e) { + safeRollback(e); + } + } + + private void safeRollback(Throwable t) { + LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back"); + try { + tm.rollback(); + } catch (Exception e) { + LOG.log(Level.WARNING, "Rollback of JTA transaction failed", e); + } + } } } http://git-wip-us.apache.org/repos/asf/cxf/blob/0739807f/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java new file mode 100644 index 0000000..a800e1c --- /dev/null +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.transport.jms.util; + +import java.util.Enumeration; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.XAConnectionFactory; +import javax.transaction.TransactionManager; +import javax.transaction.xa.XAException; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQXAConnectionFactory; +import org.apache.aries.transaction.internal.AriesTransactionManagerImpl; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +public class MessageListenerTest { + + private static final String OK = "ok"; + + @Test + @Ignore + public void testWithJTA() throws JMSException, XAException, InterruptedException { + Connection connection = createConnection(); + Queue dest = createQueue(connection, "test"); + + MessageListener listenerHandler = new TestMessageListener(); + MessageListenerContainer container = new MessageListenerContainer(connection, dest, listenerHandler); + container.setTransacted(false); + TransactionManager transactionManager = new AriesTransactionManagerImpl(); + container.setTransactionManager(transactionManager); + container.start(); + assertNumMessagesInQueue("At the start the queue should be empty", connection, dest, 0); + synchronized (listenerHandler) { + sendMessage(connection, dest, OK); + listenerHandler.wait(); + } + Thread.sleep(500); + assertNumMessagesInQueue("This message should be committed", connection, dest, 0); + synchronized (listenerHandler) { + sendMessage(connection, dest, "Fail"); + listenerHandler.wait(); + } + Thread.sleep(500); + assertNumMessagesInQueue("First try should do rollback", connection, dest, 1); + Thread.sleep(500); + assertNumMessagesInQueue("Second try should work", connection, dest, 0); + + container.stop(); + connection.close(); + } + + @Test + public void testNoTransaction() throws JMSException, XAException, InterruptedException { + ConnectionFactory cf = new ActiveMQConnectionFactory("vm://broker1?broker.persistent=false"); + Connection connection = cf.createConnection(); + connection.start(); + Queue dest = createQueue(connection, "test"); + + MessageListener listenerHandler = new TestMessageListener(); + MessageListenerContainer container = new MessageListenerContainer(connection, dest, listenerHandler); + container.setTransacted(false); + container.setAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); + container.start(); + assertNumMessagesInQueue("At the start the queue should be empty", connection, dest, 0); + synchronized (listenerHandler) { + sendMessage(connection, dest, OK); + listenerHandler.wait(); + } + Thread.sleep(500); + assertNumMessagesInQueue("This message should be committed", connection, dest, 0); + synchronized (listenerHandler) { + sendMessage(connection, dest, "Fail"); + listenerHandler.wait(); + } + Thread.sleep(500); + assertNumMessagesInQueue("Even when an exception occurs the message should be committed", connection, dest, 0); + container.stop(); + connection.close(); + } + + private Connection createConnection() throws JMSException { + XAConnectionFactory cf = new ActiveMQXAConnectionFactory("vm://broker2?broker.persistent=false"); + Connection connection = cf.createXAConnection(); + connection.start(); + return connection; + } + + protected void drainQueue(Connection connection, Queue dest) throws JMSException { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(dest); + while (consumer.receiveNoWait() != null) { + System.out.println("Consuming old message"); + } + consumer.close(); + session.close(); + assertNumMessagesInQueue("", connection, dest, 0); + } + + private void assertNumMessagesInQueue(String message, + Connection connection, + Queue queue, + int expectedNum) throws JMSException { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + QueueBrowser browser = session.createBrowser(queue); + @SuppressWarnings("unchecked") + Enumeration<Message> messages = browser.getEnumeration(); + int actualNum = 0; + while (messages.hasMoreElements()) { + actualNum++; + messages.nextElement(); + } + browser.close(); + session.close(); + Assert.assertEquals(message + " -> number of messages", expectedNum, actualNum); + } + + private void sendMessage(Connection connection, Destination dest, String content) throws JMSException { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer prod = session.createProducer(dest); + Message message = session.createTextMessage(content); + prod.send(message); + prod.close(); + session.close(); + } + + private Queue createQueue(Connection connection, String name) throws JMSException { + Session session = null; + try { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + return session.createQueue(name); + } finally { + session.close(); + } + } + + private static final class TestMessageListener implements MessageListener { + @Override + public void onMessage(Message message) { + TextMessage textMessage = (TextMessage) message; + try { + String text = textMessage.getText(); + if (MessageListenerTest.OK.equals(text)) { + System.out.println("Simulating Processing successful"); + } else { + if (message.getJMSRedelivered()) { + System.out.println("Simulating processing worked on second try"); + } else { + throw new RuntimeException("Simulating something went wrong. Expecting rollback"); + } + } + } catch (JMSException e) { + // Ignore + } finally { + synchronized (this) { + this.notifyAll(); + } + + } + } + } +}
