Repository: cxf Updated Branches: refs/heads/master c51282ea7 -> 12e8613a9
CXF-5543 Fix problem with workqueue rejecting messages. Some simplification in MessageListenerContainer Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/12e8613a Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/12e8613a Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/12e8613a Branch: refs/heads/master Commit: 12e8613a90c00b8d34cf624003e7549423639e2b Parents: c51282e Author: Christian Schneider <[email protected]> Authored: Thu Apr 3 10:20:26 2014 +0200 Committer: Christian Schneider <[email protected]> Committed: Thu Apr 3 10:20:26 2014 +0200 ---------------------------------------------------------------------- .../cxf/transport/jms/JMSDestination.java | 5 +--- .../apache/cxf/transport/jms/JMSFactory.java | 27 +++++++++++++++----- .../jms/util/MessageListenerContainer.java | 22 ++++++++-------- 3 files changed, 31 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/12e8613a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java index 7c529ef..cda7c5e 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java @@ -21,7 +21,6 @@ package org.apache.cxf.transport.jms; import java.io.UnsupportedEncodingException; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.logging.Level; import java.util.logging.Logger; @@ -106,9 +105,7 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess Destination destination = jmsConfig.getTargetDestination(session); MessageListenerContainer container = new MessageListenerContainer(connection, destination, this); container.setMessageSelector(jmsConfig.getMessageSelector()); - - Executor executor = Executors.newFixedThreadPool(20); - //JMSFactory.createExecutor(bus, "jms-destination"); + Executor executor = JMSFactory.createExecutor(bus, "jms-destination"); container.setExecutor(executor); container.start(); return container; http://git-wip-us.apache.org/repos/asf/cxf/blob/12e8613a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java index 6ad7b55..f3ae29e 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java @@ -29,6 +29,8 @@ import javax.naming.NamingException; import org.apache.cxf.Bus; import org.apache.cxf.transport.jms.util.JMSSender; import org.apache.cxf.transport.jms.util.JndiHelper; +import org.apache.cxf.workqueue.AutomaticWorkQueue; +import org.apache.cxf.workqueue.WorkQueue; import org.apache.cxf.workqueue.WorkQueueManager; /** @@ -103,17 +105,28 @@ public final class JMSFactory { return connection; } + /** + * Get workqueue from workqueue manager. Return an executor that will never reject messages and + * instead block when all threads are used. + * + * @param bus + * @param name + * @return + */ public static Executor createExecutor(Bus bus, String name) { WorkQueueManager manager = bus.getExtension(WorkQueueManager.class); - Executor workQueue; if (manager != null) { - workQueue = manager.getNamedWorkQueue(name); - if (workQueue == null) { - workQueue = manager.getAutomaticWorkQueue(); - } + AutomaticWorkQueue workQueue1 = manager.getNamedWorkQueue(name); + final WorkQueue workQueue = (workQueue1 == null) ? manager.getAutomaticWorkQueue() : workQueue1; + return new Executor() { + + @Override + public void execute(Runnable command) { + workQueue.execute(command, 0); + } + }; } else { - workQueue = Executors.newFixedThreadPool(20); + return Executors.newFixedThreadPool(20); } - return workQueue; } } http://git-wip-us.apache.org/repos/asf/cxf/blob/12e8613a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java index dfc641d..fd49d4a 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java @@ -34,9 +34,7 @@ import javax.jms.Topic; import javax.jms.XASession; import javax.naming.InitialContext; import javax.naming.NamingException; -import javax.transaction.Transaction; import javax.transaction.TransactionManager; -import javax.transaction.xa.XAResource; import org.apache.cxf.common.logging.LogUtils; @@ -182,29 +180,29 @@ public class MessageListenerContainer implements JMSListenerContainer { } + @SuppressWarnings("PMD") static class TransactionalMessageListener implements MessageListener { private TransactionManager tm; private MessageListener listenerHandler; - private Session session; + private XASession session; public TransactionalMessageListener(TransactionManager tm, Session session, MessageListener listenerHandler) { + if (tm == null) { + throw new IllegalArgumentException("Must supply a transaction manager"); + } + if (session == null || !(session instanceof XASession)) { + throw new IllegalArgumentException("Must supply an XASession"); + } this.tm = tm; - this.session = session; + this.session = (XASession)session; this.listenerHandler = listenerHandler; } @Override public void onMessage(Message message) { - if (tm == null || !(session instanceof XASession)) { - listenerHandler.onMessage(message); - return; - } try { - XASession xaSession = (XASession)session; // TODO check cast tm.begin(); - Transaction tr = tm.getTransaction(); - XAResource res = xaSession.getXAResource(); - tr.enlistResource(res); + tm.getTransaction().enlistResource(session.getXAResource()); listenerHandler.onMessage(message); tm.commit(); } catch (Throwable e) {
