Repository: activemq-artemis Updated Branches: refs/heads/master d5ebe071a -> 26c4680c5
ARTEMIS-507 New thread pool for client threads - Added a thread pool executor, that combines cached and fixed size thread pooling. It behaves like a cached thread pool in that it reuses exising threads and removes idle threads after a timeout, limits the maximum number of threads in the pool, but queue additional request instead of rejecting them. - changed existing code to use the new thread pool instead of a fixed-size thread pool in all places that are configured with a client thread pool size. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1591d256 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1591d256 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1591d256 Branch: refs/heads/master Commit: 1591d25692c4632b18a0563442b471b000ce0100 Parents: d5ebe07 Author: Bernd Gutjahr <[email protected]> Authored: Mon Apr 25 15:30:42 2016 +0200 Committer: Martyn Taylor <[email protected]> Committed: Wed Apr 27 11:03:32 2016 +0100 ---------------------------------------------------------------------- .../utils/ActiveMQThreadPoolExecutor.java | 109 +++++++++++++++++++ .../artemis/api/core/client/ActiveMQClient.java | 4 +- .../core/client/impl/ServerLocatorImpl.java | 3 +- .../activemq/artemis/ClientThreadPoolsTest.java | 2 - .../core/remoting/impl/invm/InVMConnector.java | 3 +- 5 files changed, 115 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1591d256/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java new file mode 100755 index 0000000..8471dac --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/* + * ActiveMQThreadPoolExecutor: a special ThreadPoolExecutor that combines + * the benefits of a cached executor and a fixed size executor. + * Similar to a cached executor, threads exceeding the core size are only created on demand, + * and will be removed after idling for a specified keep time. + * But in contrast to a standard cached executor, tasks are queued if the + * maximum pool size if reached, instead of rejected. + * + * This is achieved by using a specialized blocking queue, which checks the + * state of the associated executor in the offer method to decide whether to + * queue a task or have the executor create another thread. + * + * Since the thread pool's execute method is reentrant, more than one caller + * could try to offer a task into the queue. There is a small chance that + * (a few) more threads are created as it should be limited by max pool size. + * To allow for such a case not to reject a task, the underlying thread pool + * executor is not limited. Only the offer method checks the configured limit. + */ +public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor { + @SuppressWarnings("serial") + private static class ThreadPoolQueue extends LinkedBlockingQueue<Runnable> { + private ActiveMQThreadPoolExecutor executor = null; + + public void setExecutor(ActiveMQThreadPoolExecutor executor) { + this.executor = executor; + } + + @Override + public boolean offer(Runnable runnable) { + int poolSize = executor.getPoolSize(); + + // If the are less threads than the configured maximum, then the tasks is + // only queued if there are some idle threads that can run that tasks. + // We have to add the queue size, since some tasks might just have been queued + // but not yet taken by an idle thread. + if (poolSize < executor.getMaximumPoolSize() && (size() + executor.getActive()) >= poolSize) + return false; + + return super.offer(runnable); + } + } + + private int maxPoolSize; + + // count the active threads with before-/afterExecute, since the .getActiveCount is not very + // efficient. + private final AtomicInteger active = new AtomicInteger(0); + + public ActiveMQThreadPoolExecutor(int coreSize, int maxSize, long keep, TimeUnit keepUnits, ThreadFactory factory) { + this(coreSize, maxSize, keep, keepUnits, new ThreadPoolQueue(), factory); + } + + // private constructor is needed to inject 'this' into the ThreadPoolQueue instance + private ActiveMQThreadPoolExecutor(int coreSize, int maxSize, long keep, TimeUnit keepUnits, ThreadPoolQueue myQueue, ThreadFactory factory) { + super(coreSize, Integer.MAX_VALUE, keep, keepUnits, myQueue, factory); + maxPoolSize = maxSize; + myQueue.setExecutor(this); + } + + private int getActive() { + return active.get(); + } + + @Override + public int getMaximumPoolSize() { + return maxPoolSize; + } + + @Override + public void setMaximumPoolSize(int maxSize) { + maxPoolSize = maxSize; + } + + @Override + protected void beforeExecute(Thread thread, Runnable runnable) { + super.beforeExecute(thread, runnable); + active.incrementAndGet(); + } + + @Override + protected void afterExecute(Runnable runnable, Throwable throwable) { + active.decrementAndGet(); + super.afterExecute(runnable, throwable); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1591d256/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java index 3f5dcb9..1885d28 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java @@ -20,7 +20,6 @@ import java.net.URI; import java.security.AccessController; import java.security.PrivilegedAction; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.SynchronousQueue; @@ -38,6 +37,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl; import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; import org.apache.activemq.artemis.uri.ServerLocatorParser; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; +import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor; /** * Utility class for creating ActiveMQ Artemis {@link ClientSessionFactory} objects. @@ -222,7 +222,7 @@ public final class ActiveMQClient { globalThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory); } else { - globalThreadPool = new ThreadPoolExecutor(ActiveMQClient.globalThreadPoolSize, ActiveMQClient.globalThreadPoolSize, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory); + globalThreadPool = new ActiveMQThreadPoolExecutor(0, ActiveMQClient.globalThreadPoolSize, 60L, TimeUnit.SECONDS, factory); } } return globalThreadPool; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1591d256/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index 53ba9df..d9f641a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -64,6 +64,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactor import org.apache.activemq.artemis.spi.core.remoting.Connector; import org.apache.activemq.artemis.uri.ServerLocatorParser; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; +import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor; import org.apache.activemq.artemis.utils.ClassloadingUtil; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.uri.FluentPropertyBeanIntrospectorWithIgnores; @@ -236,7 +237,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery threadPool = Executors.newCachedThreadPool(factory); } else { - threadPool = Executors.newFixedThreadPool(threadPoolMaxSize, factory); + threadPool = new ActiveMQThreadPoolExecutor(0, threadPoolMaxSize, 60L, TimeUnit.SECONDS, factory); } factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1591d256/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java index f9cd852..1f99eef 100644 --- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/ClientThreadPoolsTest.java @@ -221,8 +221,6 @@ public class ClientThreadPoolsTest { ScheduledThreadPoolExecutor scheduledThreadPool = (ScheduledThreadPoolExecutor) scheduledThreadPoolField.get(serverLocator); - // TODO: We need to figure out what to do with getCorePoolSize - assertEquals(expectedMax, threadPool.getCorePoolSize()); assertEquals(expectedMax, threadPool.getMaximumPoolSize()); assertEquals(expectedScheduled, scheduledThreadPool.getCorePoolSize()); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1591d256/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java index d39b8c6..8bcf96c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java @@ -41,6 +41,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleLi import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; +import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor; import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.artemis.utils.OrderedExecutorFactory; @@ -107,7 +108,7 @@ public class InVMConnector extends AbstractConnector { threadPoolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory()); } else { - threadPoolExecutor = Executors.newFixedThreadPool(ActiveMQClient.getGlobalThreadPoolSize()); + threadPoolExecutor = new ActiveMQThreadPoolExecutor(0, ActiveMQClient.getGlobalThreadPoolSize(), 60L, TimeUnit.SECONDS, Executors.defaultThreadFactory()); } } return threadPoolExecutor;
