Repository: cxf Updated Branches: refs/heads/master aaabd57fb -> 0336a2399
[CXF-6702]: Support external executor for JMS Containers Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/0336a239 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/0336a239 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/0336a239 Branch: refs/heads/master Commit: 0336a2399b1980e04372c651368f8718f75a4f14 Parents: aaabd57 Author: Andrei Shakirin <[email protected]> Authored: Sun Dec 6 16:26:34 2015 +0100 Committer: Andrei Shakirin <[email protected]> Committed: Sun Dec 6 16:26:34 2015 +0100 ---------------------------------------------------------------------- .../apache/cxf/transport/jms/JMSConduit.java | 6 ++-- .../cxf/transport/jms/JMSDestination.java | 6 ++-- .../apache/cxf/transport/jms/JMSFactory.java | 5 ++- .../util/AbstractMessageListenerContainer.java | 37 ++++++++++++++++++-- .../util/PollingMessageListenerContainer.java | 24 ++----------- 5 files changed, 50 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/0336a239/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java index 1a8afc9..677dcf6 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java @@ -178,8 +178,10 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender, Me staticReplyDestination, this); container.setMessageSelector(messageSelector); - Executor executor = JMSFactory.createExecutor(bus, "jms-conduit"); - container.setExecutor(executor); + Object executor = bus.getProperty(JMSFactory.JMS_CONDUIT_EXECUTOR); + if (executor instanceof Executor) { + container.setExecutor((Executor) executor); + } container.start(); jmsListener = container; addBusListener(); http://git-wip-us.apache.org/repos/asf/cxf/blob/0336a239/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java index 9f0fcbc..113a7d2 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java @@ -140,8 +140,10 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess container.setTransacted(jmsConfig.isSessionTransacted()); container.setDurableSubscriptionName(jmsConfig.getDurableSubscriptionName()); - Executor executor = JMSFactory.createExecutor(bus, "jms-destination"); - container.setExecutor(executor); + Object executor = bus.getProperty(JMSFactory.JMS_DESTINATION_EXECUTOR); + if (executor instanceof Executor) { + container.setExecutor((Executor) executor); + } container.start(); suspendedContinuations.setListenerContainer(container); connection.start(); http://git-wip-us.apache.org/repos/asf/cxf/blob/0336a239/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java index 4eaf083..5d617c8 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java @@ -37,6 +37,9 @@ import org.apache.cxf.workqueue.WorkQueueManager; * Factory to create jms helper objects from configuration and context information */ public final class JMSFactory { + public static final String JMS_DESTINATION_EXECUTOR = "org.apache.cxf.extensions.jms.destination.executor"; + public static final String JMS_CONDUIT_EXECUTOR = "org.apache.cxf.extensions.jms.conduit.executor"; + static final String MESSAGE_ENDPOINT_FACTORY = "MessageEndpointFactory"; static final String MDB_TRANSACTED_METHOD = "MDBTransactedMethod"; @@ -116,7 +119,7 @@ public final class JMSFactory { * @param name * @return */ - public static Executor createExecutor(Bus bus, String name) { + public static Executor createWorkQueueExecutor(Bus bus, String name) { WorkQueueManager manager = bus.getExtension(WorkQueueManager.class); if (manager != null) { AutomaticWorkQueue workQueue1 = manager.getNamedWorkQueue(name); http://git-wip-us.apache.org/repos/asf/cxf/blob/0336a239/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java index f5affe4..65d6c4c 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java @@ -19,7 +19,9 @@ package org.apache.cxf.transport.jms.util; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import javax.jms.Connection; @@ -40,11 +42,14 @@ public abstract class AbstractMessageListenerContainer implements JMSListenerCon protected int acknowledgeMode = Session.AUTO_ACKNOWLEDGE; protected String messageSelector; protected boolean running; - protected Executor executor; protected String durableSubscriptionName; protected boolean pubSubNoLocal; protected TransactionManager transactionManager; + private Executor executor; + private int concurrentConsumers = 1; + private boolean internalExecutor; + public AbstractMessageListenerContainer() { super(); } @@ -70,7 +75,8 @@ public abstract class AbstractMessageListenerContainer implements JMSListenerCon protected Executor getExecutor() { if (executor == null) { - executor = Executors.newFixedThreadPool(10); + executor = Executors.newFixedThreadPool(concurrentConsumers); + internalExecutor = true; } return executor; } @@ -79,6 +85,25 @@ public abstract class AbstractMessageListenerContainer implements JMSListenerCon this.executor = executor; } + @Override + public void stop() { + // In case of using external executor, don't shutdown it + if ((executor == null) || !internalExecutor) { + return; + } + + ExecutorService executorService = (ExecutorService)executor; + executorService.shutdown(); + try { + executorService.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // Ignore + } + executorService.shutdownNow(); + executor = null; + internalExecutor = false; + } + public void setDurableSubscriptionName(String durableSubscriptionName) { this.durableSubscriptionName = durableSubscriptionName; } @@ -96,4 +121,12 @@ public abstract class AbstractMessageListenerContainer implements JMSListenerCon this.transactionManager = transactionManager; } + public void setConcurrentConsumers(int concurrentConsumers) { + this.concurrentConsumers = concurrentConsumers; + } + + public int getConcurrentConsumers() { + return concurrentConsumers; + } + } http://git-wip-us.apache.org/repos/asf/cxf/blob/0336a239/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java index f79ab09..0acd40f 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java @@ -18,9 +18,6 @@ */ package org.apache.cxf.transport.jms.util; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -40,10 +37,6 @@ import org.apache.cxf.common.logging.LogUtils; public class PollingMessageListenerContainer extends AbstractMessageListenerContainer { private static final Logger LOG = LogUtils.getL7dLogger(PollingMessageListenerContainer.class); - private ExecutorService pollers; - - private int concurrentConsumers = 1; - public PollingMessageListenerContainer(Connection connection, Destination destination, MessageListener listenerHandler) { this.connection = connection; @@ -164,10 +157,9 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont return; } running = true; - pollers = Executors.newFixedThreadPool(concurrentConsumers); - for (int c = 0; c < concurrentConsumers; c++) { + for (int c = 0; c < getConcurrentConsumers(); c++) { Runnable poller = (transactionManager != null) ? new XAPoller() : new Poller(); - pollers.execute(poller); + getExecutor().execute(poller); } } @@ -178,14 +170,7 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont return; } running = false; - pollers.shutdown(); - try { - pollers.awaitTermination(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - // Ignore - } - pollers.shutdownNow(); - pollers = null; + super.stop(); } @Override @@ -193,7 +178,4 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont stop(); } - public void setConcurrentConsumers(int concurrentConsumers) { - this.concurrentConsumers = concurrentConsumers; - } }
