Repository: cxf Updated Branches: refs/heads/master 12e8613a9 -> c62ac164b
CXF-5543 Made tests more solid Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/c62ac164 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/c62ac164 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/c62ac164 Branch: refs/heads/master Commit: c62ac164b892b2a7d075eb936ceee9dc935f4b7f Parents: 12e8613 Author: Christian Schneider <[email protected]> Authored: Thu Apr 3 11:23:09 2014 +0200 Committer: Christian Schneider <[email protected]> Committed: Thu Apr 3 11:23:09 2014 +0200 ---------------------------------------------------------------------- .../jms/util/MessageListenerContainer.java | 44 ++++- .../transport/jms/util/MessageListenerTest.java | 160 ++++++++++++------- 2 files changed, 138 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/c62ac164/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java index fd49d4a..01d9ae7 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java @@ -78,7 +78,7 @@ public class MessageListenerContainer implements JMSListenerContainer { this.messageSelector = messageSelector; } - private Executor getExecutor() { + protected Executor getExecutor() { if (executor == null) { executor = Executors.newFixedThreadPool(10); } @@ -118,8 +118,9 @@ public class MessageListenerContainer implements JMSListenerContainer { } MessageListener intListener = (transactionManager != null) - ? new TransactionalMessageListener(transactionManager, session, listenerHandler) - : new DispachingListener(getExecutor(), listenerHandler); + ? new XATransactionalMessageListener(transactionManager, session, listenerHandler) + : new LocalTransactionalMessageListener(session, listenerHandler); + // new DispachingListener(getExecutor(), listenerHandler); consumer.setMessageListener(intListener); running = true; @@ -163,7 +164,6 @@ public class MessageListenerContainer implements JMSListenerContainer { public DispachingListener(Executor executor, MessageListener listenerHandler) { this.executor = executor; this.listenerHandler = listenerHandler; - } @Override @@ -180,13 +180,43 @@ public class MessageListenerContainer implements JMSListenerContainer { } + static class LocalTransactionalMessageListener implements MessageListener { + private MessageListener listenerHandler; + private Session session; + + public LocalTransactionalMessageListener(Session session, MessageListener listenerHandler) { + this.session = session; + this.listenerHandler = listenerHandler; + } + + @Override + public void onMessage(Message message) { + try { + listenerHandler.onMessage(message); + session.commit(); + } catch (Throwable e) { + safeRollback(e); + } + } + + private void safeRollback(Throwable t) { + LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back" , t); + try { + session.rollback(); + } catch (Exception e) { + LOG.log(Level.WARNING, "Rollback of Local transaction failed", e); + } + } + + } + @SuppressWarnings("PMD") - static class TransactionalMessageListener implements MessageListener { + static class XATransactionalMessageListener implements MessageListener { private TransactionManager tm; private MessageListener listenerHandler; private XASession session; - public TransactionalMessageListener(TransactionManager tm, Session session, MessageListener listenerHandler) { + public XATransactionalMessageListener(TransactionManager tm, Session session, MessageListener listenerHandler) { if (tm == null) { throw new IllegalArgumentException("Must supply a transaction manager"); } @@ -211,7 +241,7 @@ public class MessageListenerContainer implements JMSListenerContainer { } private void safeRollback(Throwable t) { - LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back"); + LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back" , t); try { tm.rollback(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/cxf/blob/c62ac164/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java index a800e1c..fec9536 100644 --- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java @@ -21,7 +21,6 @@ package org.apache.cxf.transport.jms.util; import java.util.Enumeration; import javax.jms.Connection; -import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; @@ -32,90 +31,122 @@ import javax.jms.Queue; import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; -import javax.jms.XAConnectionFactory; import javax.transaction.TransactionManager; import javax.transaction.xa.XAException; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQXAConnectionFactory; +import org.apache.activemq.RedeliveryPolicy; import org.apache.aries.transaction.internal.AriesTransactionManagerImpl; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; public class MessageListenerTest { + private static final String FAIL = "fail"; + private static final String FAILFIRST = "failfirst"; private static final String OK = "ok"; @Test - @Ignore public void testWithJTA() throws JMSException, XAException, InterruptedException { - Connection connection = createConnection(); + Connection connection = createXAConnection("brokerJTA"); Queue dest = createQueue(connection, "test"); - + MessageListener listenerHandler = new TestMessageListener(); MessageListenerContainer container = new MessageListenerContainer(connection, dest, listenerHandler); container.setTransacted(false); + container.setAcknowledgeMode(Session.SESSION_TRANSACTED); TransactionManager transactionManager = new AriesTransactionManagerImpl(); container.setTransactionManager(transactionManager); container.start(); - assertNumMessagesInQueue("At the start the queue should be empty", connection, dest, 0); - synchronized (listenerHandler) { - sendMessage(connection, dest, OK); - listenerHandler.wait(); - } - Thread.sleep(500); - assertNumMessagesInQueue("This message should be committed", connection, dest, 0); - synchronized (listenerHandler) { - sendMessage(connection, dest, "Fail"); - listenerHandler.wait(); - } - Thread.sleep(500); - assertNumMessagesInQueue("First try should do rollback", connection, dest, 1); - Thread.sleep(500); - assertNumMessagesInQueue("Second try should work", connection, dest, 0); - + + testTransactionalBehaviour(connection, dest); + container.stop(); connection.close(); } - + @Test public void testNoTransaction() throws JMSException, XAException, InterruptedException { - ConnectionFactory cf = new ActiveMQConnectionFactory("vm://broker1?broker.persistent=false"); - Connection connection = cf.createConnection(); - connection.start(); + Connection connection = createConnection("brokerNoTransaction"); Queue dest = createQueue(connection, "test"); - + MessageListener listenerHandler = new TestMessageListener(); MessageListenerContainer container = new MessageListenerContainer(connection, dest, listenerHandler); container.setTransacted(false); container.setAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); container.start(); - assertNumMessagesInQueue("At the start the queue should be empty", connection, dest, 0); - synchronized (listenerHandler) { - sendMessage(connection, dest, OK); - listenerHandler.wait(); - } - Thread.sleep(500); - assertNumMessagesInQueue("This message should be committed", connection, dest, 0); - synchronized (listenerHandler) { - sendMessage(connection, dest, "Fail"); - listenerHandler.wait(); - } - Thread.sleep(500); - assertNumMessagesInQueue("Even when an exception occurs the message should be committed", connection, dest, 0); + + assertNumMessagesInQueue("At the start the queue should be empty", connection, dest, 0, 0); + + sendMessage(connection, dest, OK); + assertNumMessagesInQueue("This message should be committed", connection, dest, 0, 1000); + + sendMessage(connection, dest, FAIL); + assertNumMessagesInQueue("Even when an exception occurs the message should be committed", connection, + dest, 0, 1000); + + container.stop(); + connection.close(); + } + + @Test + public void testLocalTransaction() throws JMSException, XAException, InterruptedException { + Connection connection = createConnection("brokerLocalTransaction"); + Queue dest = createQueue(connection, "test"); + MessageListener listenerHandler = new TestMessageListener(); + MessageListenerContainer container = new MessageListenerContainer(connection, dest, listenerHandler); + container.setTransacted(true); + container.setAcknowledgeMode(Session.SESSION_TRANSACTED); + container.start(); + + testTransactionalBehaviour(connection, dest); container.stop(); connection.close(); } - private Connection createConnection() throws JMSException { - XAConnectionFactory cf = new ActiveMQXAConnectionFactory("vm://broker2?broker.persistent=false"); + private void testTransactionalBehaviour(Connection connection, Queue dest) throws JMSException, + InterruptedException { + assertNumMessagesInQueue("At the start the queue should be empty", connection, dest, 0, 0); + + sendMessage(connection, dest, OK); + assertNumMessagesInQueue("This message should be committed", connection, dest, 0, 1000); + + sendMessage(connection, dest, FAILFIRST); + assertNumMessagesInQueue("Should be rolled back on first try", connection, dest, 1, 800); + assertNumMessagesInQueue("Should succeed on second try", connection, dest, 0, 2000); + + sendMessage(connection, dest, "Fail"); + assertNumMessagesInQueue("Should be rolled back", connection, dest, 1, 1000); + } + + private Connection createConnection(String name) throws JMSException { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://" + name + + "?broker.persistent=false"); + cf.setRedeliveryPolicy(redeliveryPolicy()); + Connection connection = cf.createConnection(); + connection.start(); + return connection; + } + + private Connection createXAConnection(String name) throws JMSException { + ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("vm://" + name + + "?broker.persistent=false"); + cf.setRedeliveryPolicy(redeliveryPolicy()); Connection connection = cf.createXAConnection(); connection.start(); return connection; } - protected void drainQueue(Connection connection, Queue dest) throws JMSException { + private RedeliveryPolicy redeliveryPolicy() { + RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); + redeliveryPolicy.setRedeliveryDelay(1000); + redeliveryPolicy.setMaximumRedeliveries(3); + redeliveryPolicy.setUseExponentialBackOff(false); + return redeliveryPolicy; + } + + protected void drainQueue(Connection connection, Queue dest) throws JMSException, InterruptedException { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(dest); while (consumer.receiveNoWait() != null) { @@ -123,13 +154,23 @@ public class MessageListenerTest { } consumer.close(); session.close(); - assertNumMessagesInQueue("", connection, dest, 0); + assertNumMessagesInQueue("", connection, dest, 0, 0); + } + + private void assertNumMessagesInQueue(String message, Connection connection, Queue queue, + int expectedNum, int timeout) throws JMSException, + InterruptedException { + long startTime = System.currentTimeMillis(); + int actualNum; + do { + actualNum = getNumMessages(connection, queue); + System.out.println("Messages in queue: " + actualNum + ", expecting: " + expectedNum); + Thread.sleep(100); + } while ((System.currentTimeMillis() - startTime < timeout) && expectedNum != actualNum); + Assert.assertEquals(message + " -> number of messages", expectedNum, actualNum); } - private void assertNumMessagesInQueue(String message, - Connection connection, - Queue queue, - int expectedNum) throws JMSException { + private int getNumMessages(Connection connection, Queue queue) throws JMSException { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); QueueBrowser browser = session.createBrowser(queue); @SuppressWarnings("unchecked") @@ -141,16 +182,18 @@ public class MessageListenerTest { } browser.close(); session.close(); - Assert.assertEquals(message + " -> number of messages", expectedNum, actualNum); + return actualNum; } - private void sendMessage(Connection connection, Destination dest, String content) throws JMSException { + private void sendMessage(Connection connection, Destination dest, String content) throws JMSException, + InterruptedException { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer prod = session.createProducer(dest); Message message = session.createTextMessage(content); prod.send(message); prod.close(); session.close(); + Thread.sleep(500); // Give receiver some time to process } private Queue createQueue(Connection connection, String name) throws JMSException { @@ -162,29 +205,28 @@ public class MessageListenerTest { session.close(); } } - + private static final class TestMessageListener implements MessageListener { @Override public void onMessage(Message message) { - TextMessage textMessage = (TextMessage) message; + TextMessage textMessage = (TextMessage)message; try { String text = textMessage.getText(); - if (MessageListenerTest.OK.equals(text)) { + if (OK.equals(text)) { System.out.println("Simulating Processing successful"); - } else { + } else if (FAIL.equals(text)) { + throw new RuntimeException("Simulating something went wrong. Expecting rollback"); + } else if (FAILFIRST.equals(text)) { if (message.getJMSRedelivered()) { System.out.println("Simulating processing worked on second try"); } else { throw new RuntimeException("Simulating something went wrong. Expecting rollback"); } + } else { + throw new IllegalArgumentException("Invalid message type"); } } catch (JMSException e) { // Ignore - } finally { - synchronized (this) { - this.notifyAll(); - } - } } }
