CXF-5543 Fixing tests
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/7c7fff78 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/7c7fff78 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/7c7fff78 Branch: refs/heads/master Commit: 7c7fff780a00adffbc1c67be4457932d8028c62f Parents: 5c2c2c7 Author: Christian Schneider <[email protected]> Authored: Fri Apr 4 09:15:16 2014 +0200 Committer: Christian Schneider <[email protected]> Committed: Fri Apr 4 09:15:16 2014 +0200 ---------------------------------------------------------------------- .../jms/util/MessageListenerContainer.java | 255 +++++++++++++++++++ .../transport/jms/util/MessageListenerTest.java | 32 ++- 2 files changed, 275 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/7c7fff78/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java new file mode 100644 index 0000000..6521289 --- /dev/null +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.transport.jms.util; + +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.XASession; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import javax.transaction.TransactionManager; + +import org.apache.cxf.common.logging.LogUtils; + +public class MessageListenerContainer implements JMSListenerContainer { + private static final Logger LOG = LogUtils.getL7dLogger(MessageListenerContainer.class); + + private Connection connection; + private Destination destination; + private MessageListener listenerHandler; + private boolean transacted; + private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE; + private String messageSelector; + private boolean running; + private MessageConsumer consumer; + private Session session; + private Executor executor; + private String durableSubscriptionName; + private boolean pubSubNoLocal; + private TransactionManager transactionManager; + + public MessageListenerContainer(Connection connection, Destination destination, + MessageListener listenerHandler) { + this.connection = connection; + this.destination = destination; + this.listenerHandler = listenerHandler; + } + + public Connection getConnection() { + return connection; + } + + public void setTransacted(boolean transacted) { + this.transacted = transacted; + } + + public void setAcknowledgeMode(int acknowledgeMode) { + this.acknowledgeMode = acknowledgeMode; + } + + public void setMessageSelector(String messageSelector) { + this.messageSelector = messageSelector; + } + + protected Executor getExecutor() { + if (executor == null) { + executor = Executors.newFixedThreadPool(10); + } + return executor; + } + + public void setExecutor(Executor executor) { + this.executor = executor; + } + + public void setDurableSubscriptionName(String durableSubscriptionName) { + this.durableSubscriptionName = durableSubscriptionName; + } + + public void setPubSubNoLocal(boolean pubSubNoLocal) { + this.pubSubNoLocal = pubSubNoLocal; + } + + @Override + public boolean isRunning() { + return running; + } + + public void setTransactionManager(TransactionManager transactionManager) { + this.transactionManager = transactionManager; + } + + @Override + public void start() { + try { + session = connection.createSession(transacted, acknowledgeMode); + if (durableSubscriptionName != null && destination instanceof Topic) { + consumer = session.createDurableSubscriber((Topic)destination, durableSubscriptionName, + messageSelector, pubSubNoLocal); + } else { + consumer = session.createConsumer(destination, messageSelector); + } + + MessageListener intListener = (transactionManager != null) + ? new XATransactionalMessageListener(transactionManager, session, listenerHandler) + : new LocalTransactionalMessageListener(session, listenerHandler); + // new DispachingListener(getExecutor(), listenerHandler); + consumer.setMessageListener(intListener); + + running = true; + } catch (JMSException e) { + throw JMSUtil.convertJmsException(e); + } + } + + @Override + public void stop() { + running = false; + ResourceCloser.close(consumer); + ResourceCloser.close(session); + consumer = null; + session = null; + } + + @Override + public void shutdown() { + stop(); + ResourceCloser.close(connection); + } + + protected TransactionManager getTransactionManager() { + if (this.transactionManager == null) { + try { + InitialContext ctx = new InitialContext(); + this.transactionManager = (TransactionManager)ctx + .lookup("javax.transaction.TransactionManager"); + } catch (NamingException e) { + // Ignore + } + } + return this.transactionManager; + } + + static class DispachingListener implements MessageListener { + private Executor executor; + private MessageListener listenerHandler; + + public DispachingListener(Executor executor, MessageListener listenerHandler) { + this.executor = executor; + this.listenerHandler = listenerHandler; + } + + @Override + public void onMessage(final Message message) { + executor.execute(new Runnable() { + + @Override + public void run() { + listenerHandler.onMessage(message); + } + + }); + } + + } + + static class LocalTransactionalMessageListener implements MessageListener { + private MessageListener listenerHandler; + private Session session; + + public LocalTransactionalMessageListener(Session session, MessageListener listenerHandler) { + this.session = session; + this.listenerHandler = listenerHandler; + } + + @Override + public void onMessage(Message message) { + try { + listenerHandler.onMessage(message); + if (session.getTransacted()) { + session.commit(); + } + } catch (Throwable e) { + safeRollback(e); + } + } + + private void safeRollback(Throwable t) { + LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back" , t); + try { + session.rollback(); + } catch (Exception e) { + LOG.log(Level.WARNING, "Rollback of Local transaction failed", e); + } + } + + } + + @SuppressWarnings("PMD") + static class XATransactionalMessageListener implements MessageListener { + private TransactionManager tm; + private MessageListener listenerHandler; + private XASession session; + + public XATransactionalMessageListener(TransactionManager tm, Session session, MessageListener listenerHandler) { + if (tm == null) { + throw new IllegalArgumentException("Must supply a transaction manager"); + } + if (session == null || !(session instanceof XASession)) { + throw new IllegalArgumentException("Must supply an XASession"); + } + this.tm = tm; + this.session = (XASession)session; + this.listenerHandler = listenerHandler; + } + + @Override + public void onMessage(Message message) { + try { + tm.begin(); + tm.getTransaction().enlistResource(session.getXAResource()); + listenerHandler.onMessage(message); + tm.commit(); + } catch (Throwable e) { + safeRollback(e); + } + } + + private void safeRollback(Throwable t) { + LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back" , t); + try { + tm.rollback(); + } catch (Exception e) { + LOG.log(Level.WARNING, "Rollback of JTA transaction failed", e); + } + } + + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/7c7fff78/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java index fec9536..c1bf86a 100644 --- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java @@ -37,6 +37,7 @@ import javax.transaction.xa.XAException; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQXAConnectionFactory; import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.pool.XaPooledConnectionFactory; import org.apache.aries.transaction.internal.AriesTransactionManagerImpl; import org.junit.Assert; import org.junit.Test; @@ -49,14 +50,16 @@ public class MessageListenerTest { @Test public void testWithJTA() throws JMSException, XAException, InterruptedException { - Connection connection = createXAConnection("brokerJTA"); + TransactionManager transactionManager = new AriesTransactionManagerImpl(); + Connection connection = createXAConnection("brokerJTA", transactionManager); Queue dest = createQueue(connection, "test"); MessageListener listenerHandler = new TestMessageListener(); - MessageListenerContainer container = new MessageListenerContainer(connection, dest, listenerHandler); + PollingMessageListenerContainer container = new PollingMessageListenerContainer(connection, dest, + listenerHandler); container.setTransacted(false); container.setAcknowledgeMode(Session.SESSION_TRANSACTED); - TransactionManager transactionManager = new AriesTransactionManagerImpl(); + container.setTransactionManager(transactionManager); container.start(); @@ -72,7 +75,8 @@ public class MessageListenerTest { Queue dest = createQueue(connection, "test"); MessageListener listenerHandler = new TestMessageListener(); - MessageListenerContainer container = new MessageListenerContainer(connection, dest, listenerHandler); + PollingMessageListenerContainer container = new PollingMessageListenerContainer(connection, dest, + listenerHandler); container.setTransacted(false); container.setAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); container.start(); @@ -107,17 +111,18 @@ public class MessageListenerTest { private void testTransactionalBehaviour(Connection connection, Queue dest) throws JMSException, InterruptedException { + Queue dlq = createQueue(connection, "ActiveMQ.DLQ"); assertNumMessagesInQueue("At the start the queue should be empty", connection, dest, 0, 0); + assertNumMessagesInQueue("At the start the DLQ should be empty", connection, dlq, 0, 0); sendMessage(connection, dest, OK); assertNumMessagesInQueue("This message should be committed", connection, dest, 0, 1000); sendMessage(connection, dest, FAILFIRST); - assertNumMessagesInQueue("Should be rolled back on first try", connection, dest, 1, 800); assertNumMessagesInQueue("Should succeed on second try", connection, dest, 0, 2000); - sendMessage(connection, dest, "Fail"); - assertNumMessagesInQueue("Should be rolled back", connection, dest, 1, 1000); + sendMessage(connection, dest, FAIL); + assertNumMessagesInQueue("Should be rolled back", connection, dlq, 1, 1000); } private Connection createConnection(String name) throws JMSException { @@ -129,11 +134,14 @@ public class MessageListenerTest { return connection; } - private Connection createXAConnection(String name) throws JMSException { + private Connection createXAConnection(String name, TransactionManager tm) throws JMSException { ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("vm://" + name + "?broker.persistent=false"); cf.setRedeliveryPolicy(redeliveryPolicy()); - Connection connection = cf.createXAConnection(); + XaPooledConnectionFactory cfp = new XaPooledConnectionFactory(cf); + cfp.setTransactionManager(tm); + cfp.setConnectionFactory(cf); + Connection connection = cfp.createConnection(); connection.start(); return connection; } @@ -141,8 +149,7 @@ public class MessageListenerTest { private RedeliveryPolicy redeliveryPolicy() { RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); redeliveryPolicy.setRedeliveryDelay(1000); - redeliveryPolicy.setMaximumRedeliveries(3); - redeliveryPolicy.setUseExponentialBackOff(false); + redeliveryPolicy.setMaximumRedeliveries(1); return redeliveryPolicy; } @@ -164,7 +171,8 @@ public class MessageListenerTest { int actualNum; do { actualNum = getNumMessages(connection, queue); - System.out.println("Messages in queue: " + actualNum + ", expecting: " + expectedNum); + System.out.println("Messages in queue " + queue.getQueueName() + ": " + actualNum + + ", expecting: " + expectedNum); Thread.sleep(100); } while ((System.currentTimeMillis() - startTime < timeout) && expectedNum != actualNum); Assert.assertEquals(message + " -> number of messages", expectedNum, actualNum);
