Repository: activemq Updated Branches: refs/heads/master 45f60e413 -> 08695ab30
https://issues.apache.org/jira/browse/AMQ-6184 - add workQueueCapacity config property default to 0 where a value > 0 swaps out the synchronous queue for a capacity-limited blocking queue. This allows the core pool to grow on demand as before but also allows work to be queued when necessary. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/08695ab3 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/08695ab3 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/08695ab3 Branch: refs/heads/master Commit: 08695ab30306307aced0d2d51b1755bc6e29081a Parents: 45f60e4 Author: gtully <[email protected]> Authored: Wed Sep 28 09:51:05 2016 +0100 Committer: gtully <[email protected]> Committed: Wed Sep 28 10:04:22 2016 +0100 ---------------------------------------------------------------------- .../activemq/transport/nio/SelectorManager.java | 23 +- .../transport/nio/NIOAsyncSendWithPFCTest.java | 272 +++++++++++++++++++ 2 files changed, 291 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/08695ab3/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java b/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java index 54e1cc0..b6b1f50 100644 --- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java @@ -19,9 +19,11 @@ package org.apache.activemq.transport.nio; import java.io.IOException; import java.nio.channels.spi.AbstractSelectableChannel; import java.util.LinkedList; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; 
import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -40,10 +42,10 @@ public final class SelectorManager { private Executor selectorExecutor = createDefaultExecutor(); private Executor channelExecutor = selectorExecutor; private final LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>(); - private int maxChannelsPerWorker = 1024; + private int maxChannelsPerWorker = -1; protected ExecutorService createDefaultExecutor() { - ThreadPoolExecutor rc = new ThreadPoolExecutor(getDefaultCorePoolSize(), getDefaultMaximumPoolSize(), getDefaultKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), + ThreadPoolExecutor rc = new ThreadPoolExecutor(getDefaultCorePoolSize(), getDefaultMaximumPoolSize(), getDefaultKeepAliveTime(), TimeUnit.SECONDS, newWorkQueue(), new ThreadFactory() { private long i = 0; @@ -59,8 +61,17 @@ public final class SelectorManager { return rc; } + private BlockingQueue<Runnable> newWorkQueue() { + final int workQueueCapicity = getDefaultWorkQueueCapacity(); + return workQueueCapicity > 0 ? 
new LinkedBlockingQueue<Runnable>(workQueueCapicity) : new SynchronousQueue<Runnable>(); + } + + private static int getDefaultWorkQueueCapacity() { + return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.workQueueCapacity", 0); + } + private static int getDefaultCorePoolSize() { - return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.corePoolSize", 10); + return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.corePoolSize", 10); } private static int getDefaultMaximumPoolSize() { @@ -71,6 +82,10 @@ public final class SelectorManager { return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.keepAliveTime", 30); } + private static int getDefaultMaxChannelsPerWorker() { + return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.maxChannelsPerWorker", 1024); + } + public static SelectorManager getInstance() { return SINGLETON; } @@ -124,7 +139,7 @@ public final class SelectorManager { } public int getMaxChannelsPerWorker() { - return maxChannelsPerWorker; + return maxChannelsPerWorker >= 0 ? maxChannelsPerWorker : getDefaultMaxChannelsPerWorker(); } public void setMaxChannelsPerWorker(int maxChannelsPerWorker) { http://git-wip-us.apache.org/repos/asf/activemq/blob/08695ab3/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java new file mode 100644 index 0000000..0b7b7c3 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java @@ -0,0 +1,272 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.nio; + +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.TestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.DestinationView; +import org.apache.activemq.broker.jmx.QueueView; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.*; +import javax.management.ObjectName; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/* + demonstrates that with nio it does not make sense to block on the broker but thread pool + shold grow past initial corepoolsize of 10 + */ +public class NIOAsyncSendWithPFCTest extends TestCase { + + private static final Logger LOG = LoggerFactory.getLogger(NIOAsyncSendWithPFCTest.class); + + private static 
String TRANSPORT_URL = "nio://0.0.0.0:0"; + private static final String DESTINATION_ONE = "testQ1"; + private static final String DESTINATION_TWO = "testQ2"; + private static final int MESSAGES_TO_SEND = 100; + private static int NUMBER_OF_PRODUCERS = 10; + + protected BrokerService createBroker() throws Exception { + + BrokerService broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + + PolicyMap policyMap = new PolicyMap(); + List<PolicyEntry> entries = new ArrayList<PolicyEntry>(); + PolicyEntry pe = new PolicyEntry(); + + + pe.setMemoryLimit(256000); + pe.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy()); + + + pe.setQueue(">"); + entries.add(pe); + policyMap.setPolicyEntries(entries); + broker.setDestinationPolicy(policyMap); + + + broker.addConnector(TRANSPORT_URL); + broker.setDestinations(new ActiveMQDestination[]{new ActiveMQQueue(DESTINATION_ONE)}); + + broker.start(); + TRANSPORT_URL = broker.getTransportConnectorByScheme("nio").getPublishableConnectString(); + return broker; + } + + /** + * Test creates 10 producer who send to a single destination using Async mode. + * Producer flow control kicks in for that destination. When producer flow control is blocking sends + * Test tries to create another JMS connection to the nio. 
+ */ + public void testAsyncSendPFCNewConnection() throws Exception { + + + BrokerService broker = createBroker(); + broker.waitUntilStarted(); + + + ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_PRODUCERS); + QueueView queueView = getQueueView(broker, DESTINATION_ONE); + + try { + + for (int i = 0; i < NUMBER_OF_PRODUCERS; i++) { + + executorService.submit(new ProducerTask()); + + } + + //wait till producer follow control kicks in + waitForProducerFlowControl(broker, queueView); + + + try { + sendMessagesAsync(1, DESTINATION_TWO); + } catch (Exception ex) { + LOG.error("Ex on send new connection", ex); + fail("*** received the following exception when creating addition producer new connection:" + ex); + } + + + } finally { + broker.stop(); + broker.waitUntilStopped(); + } + + + } + + + public void testAsyncSendPFCExistingConnection() throws Exception { + + + BrokerService broker = createBroker(); + broker.waitUntilStarted(); + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", TRANSPORT_URL + "?wireFormat.maxInactivityDuration=5000"); + ActiveMQConnection exisitngConnection = (ActiveMQConnection) connectionFactory.createConnection(); + + + ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_PRODUCERS); + QueueView queueView = getQueueView(broker, DESTINATION_ONE); + + try { + + for (int i = 0; i < NUMBER_OF_PRODUCERS; i++) { + + executorService.submit(new ProducerTask()); + + } + + + //wait till producer follow control kicks in + waitForProducerFlowControl(broker, queueView); + + + TestSupport.dumpAllThreads("Blocked"); + + try { + Session producerSession = exisitngConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } catch (Exception ex) { + LOG.error("Ex on create session", ex); + fail("*** received the following exception when creating producer session:" + ex); + } + + + } finally { + broker.stop(); + broker.waitUntilStopped(); + } + + + + } + + private void 
waitForProducerFlowControl(BrokerService broker, QueueView queueView) throws Exception { + + + boolean blockingAllSends; + do { + blockingAllSends = queueView.getBlockedSends() > 10; + Thread.sleep(1000); + + } while (!blockingAllSends); + } + + class ProducerTask implements Runnable { + + + @Override + public void run() { + try { + //send X messages + sendMessagesAsync(MESSAGES_TO_SEND, DESTINATION_ONE); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + + private Long sendMessagesAsync(int messageCount, String destination) throws Exception { + + long numberOfMessageSent = 0; + + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", TRANSPORT_URL); + + + ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.setUseAsyncSend(true); + connection.start(); + + try { + + Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer jmsProducer = producerSession.createProducer(producerSession.createQueue(destination)); + + Message sendMessage = createTextMessage(producerSession); + + for (int i = 0; i < messageCount; i++) { + + jmsProducer.send(sendMessage); + numberOfMessageSent++; + + } + + LOG.info(" Finished after producing : " + numberOfMessageSent); + return numberOfMessageSent; + + } catch (Exception ex) { + LOG.info("Exception received producing ", ex); + LOG.info("finishing after exception :" + numberOfMessageSent); + return numberOfMessageSent; + } finally { + if (connection != null) { + connection.close(); + } + } + + } + + private TextMessage createTextMessage(Session session) throws JMSException { + StringBuffer buffer = new StringBuffer(); + + for (int i = 0; i < 1000; i++) { + + buffer.append("1234567890"); + } + + return session.createTextMessage(buffer.toString()); + + } + + + private QueueView getQueueView(BrokerService broker, String queueName) throws Exception { + Map<ObjectName, DestinationView> queueViews = 
broker.getAdminView().getBroker().getQueueViews(); + + for (ObjectName key : queueViews.keySet()) { + DestinationView destinationView = queueViews.get(key); + + if (destinationView instanceof QueueView) { + QueueView queueView = (QueueView) destinationView; + + if (queueView.getName().equals(queueName)) { + return queueView; + } + + } + } + return null; + } + +} +
