Repository: deltaspike Updated Branches: refs/heads/fb/DELTASPIKE-1302_ThreadPoolManager-configuration [created] c4006cc3b
DELTASPIKE-1302 configuration for ThreadPoolManager Project: http://git-wip-us.apache.org/repos/asf/deltaspike/repo Commit: http://git-wip-us.apache.org/repos/asf/deltaspike/commit/c4006cc3 Tree: http://git-wip-us.apache.org/repos/asf/deltaspike/tree/c4006cc3 Diff: http://git-wip-us.apache.org/repos/asf/deltaspike/diff/c4006cc3 Branch: refs/heads/fb/DELTASPIKE-1302_ThreadPoolManager-configuration Commit: c4006cc3b5fda1eea5551fee5a0a7e15c19264d9 Parents: d1316e5 Author: Romain Manni-Bucau <rmannibu...@apache.org> Authored: Sun Nov 26 15:31:46 2017 +0100 Committer: Romain Manni-Bucau <rmannibu...@apache.org> Committed: Sun Nov 26 15:31:46 2017 +0100 ---------------------------------------------------------------------- .../core/impl/future/ThreadPoolManager.java | 195 ++++++++++++++++--- .../core/impl/future/ThreadPoolManagerTest.java | 116 +++++++++++ documentation/src/main/asciidoc/core.adoc | 26 +++ 3 files changed, 312 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/deltaspike/blob/c4006cc3/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/ThreadPoolManager.java ---------------------------------------------------------------------- diff --git a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/ThreadPoolManager.java b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/ThreadPoolManager.java index 9dc6667..4e1f2a1 100644 --- a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/ThreadPoolManager.java +++ b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/ThreadPoolManager.java @@ -18,23 +18,48 @@ */ package org.apache.deltaspike.core.impl.future; +import org.apache.deltaspike.core.api.config.ConfigResolver; import org.apache.deltaspike.core.api.config.base.CoreBaseConfig; import javax.annotation.PreDestroy; import javax.enterprise.context.ApplicationScoped; 
+import javax.enterprise.context.spi.CreationalContext; +import javax.enterprise.inject.spi.Bean; +import javax.enterprise.inject.spi.BeanManager; +import javax.inject.Inject; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import static java.util.Arrays.asList; +import static java.util.Locale.ENGLISH; + @ApplicationScoped public class ThreadPoolManager { private final ConcurrentMap<String, ExecutorService> pools = new ConcurrentHashMap<String, ExecutorService>(); - private volatile ExecutorService defaultPool; + private final Collection<CreationalContext<?>> contexts = new ArrayList<CreationalContext<?>>(8); private volatile boolean closed = false; + @Inject + private BeanManager beanManager; + @PreDestroy private void shutdown() { @@ -43,21 +68,12 @@ public class ThreadPoolManager for (final ExecutorService es : pools.values()) { es.shutdown(); - try - { - es.awaitTermination(timeout, TimeUnit.MILLISECONDS); - } - catch (final InterruptedException e) - { - Thread.interrupted(); - } } - if (defaultPool != null) + for (final ExecutorService es : pools.values()) { - defaultPool.shutdown(); try { - defaultPool.awaitTermination(timeout, TimeUnit.MILLISECONDS); + es.awaitTermination(timeout, TimeUnit.MILLISECONDS); } catch (final 
InterruptedException e) { @@ -65,6 +81,12 @@ public class ThreadPoolManager } } pools.clear(); + + for (final CreationalContext<?> ctx : contexts) + { + ctx.release(); + } + contexts.clear(); } public ExecutorService find(final String name) @@ -76,25 +98,148 @@ public class ThreadPoolManager ExecutorService pool = pools.get(name); if (pool == null) { - ensureDefaultPool(); - pool = defaultPool; - } - return pool; - } - - private void ensureDefaultPool() - { - if (defaultPool == null) - { synchronized (this) { - if (defaultPool == null) + pool = pools.get(name); + if (pool == null) { - defaultPool = Executors.newFixedThreadPool( - Math.max(2, Runtime.getRuntime().availableProcessors())); + // the instantiation does the following: + // 1. check if there is a named bean matching this name using @Default qualifier + // 2. check if there is a JNDI entry (ManagedExecutorService case) matching this name + // 3. create a new executor service based on the DS-config + + // 1. + final Set<Bean<?>> beans = beanManager.getBeans(name); + if (beans != null && !beans.isEmpty()) + { + final Bean<?> bean = beanManager.resolve(beans); + if (bean.getTypes().contains(ExecutorService.class)) + { + final CreationalContext<Object> creationalContext = beanManager.createCreationalContext(null); + if (!beanManager.isNormalScope(bean.getScope())) + { + contexts.add(creationalContext); + } + pool = ExecutorService.class.cast(beanManager.getReference(bean, ExecutorService.class, creationalContext)); + } + } + + if (pool == null) // 2. + { + for (final String prefix : asList( + "", "java:app/", "java:global/", "java:global/threads/", "java:global/deltaspike/", "java:")) + { + try { + final Object instance = new InitialContext().lookup(prefix + name); + if (ExecutorService.class.isInstance(instance)) + { + pool = ExecutorService.class.cast(instance); + break; + } + } catch (final NamingException e) { + // no-op + } + } + } + + if (pool == null) // 3. 
+ { + final String configPrefix = "futurable.pool." + name + "."; + final int coreSize = ConfigResolver.resolve(configPrefix + "coreSize") + .as(Integer.class) + .withDefault(Math.max(2, Runtime.getRuntime().availableProcessors())) + .getValue(); + final int maxSize = ConfigResolver.resolve(configPrefix + "maxSize") + .as(Integer.class) + .withDefault(coreSize) + .getValue(); + final long keepAlive = ConfigResolver.resolve(configPrefix + "keepAlive.value") + .as(Long.class) + .withDefault(0L) + .getValue(); + final String keepAliveUnit = ConfigResolver.resolve(configPrefix + "keepAlive.unit") + .as(String.class) + .withDefault("MILLISECONDS") + .getValue(); + + final String queueType = ConfigResolver.resolve(configPrefix + "queue.type") + .as(String.class) + .withDefault("LINKED") + .getValue(); + final BlockingQueue<Runnable> queue; + if ("ARRAY".equalsIgnoreCase(queueType)) + { + final int size = ConfigResolver.resolve(configPrefix + "queue.size") + .as(Integer.class) + .withDefault(1024) + .getValue(); + final boolean fair = ConfigResolver.resolve(configPrefix + "queue.fair") + .as(Boolean.class) + .withDefault(false) + .getValue(); + queue = new ArrayBlockingQueue<Runnable>(size, fair); + } + else if ("SYNCHRONOUS".equalsIgnoreCase(queueType)) + { + final boolean fair = ConfigResolver.resolve(configPrefix + "queue.fair") + .as(Boolean.class) + .withDefault(false) + .getValue(); + queue = new SynchronousQueue<Runnable>(fair); + } + else/* (queueType.equalsIgnoreCase("LINKED")) */ + { + final int capacity = ConfigResolver.resolve(configPrefix + "queue.capacity") + .as(Integer.class) + .withDefault(Integer.MAX_VALUE) + .getValue(); + queue = new LinkedBlockingQueue<Runnable>(capacity); + } + + final String threadFactoryName = ConfigResolver.getPropertyValue(configPrefix + "threadFactory.name"); + final ThreadFactory threadFactory; + if (threadFactoryName != null) + { + threadFactory = lookupByName(threadFactoryName, ThreadFactory.class); + } + else + { + 
threadFactory = Executors.defaultThreadFactory(); + } + + final String rejectedHandlerName = ConfigResolver.getPropertyValue(configPrefix + "rejectedExecutionHandler.name"); + final RejectedExecutionHandler rejectedHandler; + if (rejectedHandlerName != null) + { + rejectedHandler = lookupByName(rejectedHandlerName, RejectedExecutionHandler.class); + } + else + { + rejectedHandler = new ThreadPoolExecutor.AbortPolicy(); + } + + pool = new ThreadPoolExecutor( + coreSize, maxSize, + keepAlive, TimeUnit.valueOf(keepAliveUnit), + queue, threadFactory, rejectedHandler); + } + + pools.put(name, pool); } } } + return pool; + } + + private <T> T lookupByName(final String name, final Class<T> type) { + final Set<Bean<?>> tfb = beanManager.getBeans(name); + final Bean<?> bean = beanManager.resolve(tfb); + final CreationalContext<?> ctx = beanManager.createCreationalContext(null); + if (!beanManager.isNormalScope(bean.getScope())) + { + contexts.add(ctx); + } + return type.cast(beanManager.getReference(bean, type, ctx)); } } http://git-wip-us.apache.org/repos/asf/deltaspike/blob/c4006cc3/deltaspike/core/impl/src/test/java/org/apache/deltaspike/core/impl/future/ThreadPoolManagerTest.java ---------------------------------------------------------------------- diff --git a/deltaspike/core/impl/src/test/java/org/apache/deltaspike/core/impl/future/ThreadPoolManagerTest.java b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/core/impl/future/ThreadPoolManagerTest.java new file mode 100644 index 0000000..111dda4 --- /dev/null +++ b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/core/impl/future/ThreadPoolManagerTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.deltaspike.core.impl.future; + +import org.apache.deltaspike.core.api.config.ConfigResolver; +import org.apache.deltaspike.core.impl.config.PropertiesConfigSource; +import org.apache.deltaspike.core.impl.scope.conversation.ConversationBeanHolder; +import org.apache.deltaspike.core.impl.scope.viewaccess.ViewAccessBeanAccessHistory; +import org.apache.deltaspike.core.impl.scope.viewaccess.ViewAccessBeanHolder; +import org.apache.deltaspike.core.impl.scope.viewaccess.ViewAccessViewHistory; +import org.apache.deltaspike.core.impl.scope.window.DefaultWindowContextQuotaHandler; +import org.apache.deltaspike.core.impl.scope.window.WindowBeanHolder; +import org.apache.deltaspike.core.impl.scope.window.WindowContextProducer; +import org.apache.deltaspike.core.impl.scope.window.WindowContextQuotaHandlerCache; +import org.apache.deltaspike.core.impl.scope.window.WindowIdHolder; +import org.apache.deltaspike.core.spi.config.ConfigSource; +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.asset.EmptyAsset; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Test; +import org.junit.runner.RunWith; + +import javax.inject.Inject; +import java.util.Collections; +import 
java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +@RunWith(Arquillian.class) +public class ThreadPoolManagerTest +{ + @Deployment + public static WebArchive deploy() + { + return ShrinkWrap.create(WebArchive.class, "ThreadPoolManagerTest.war") + .addAsLibraries(ShrinkWrap.create(JavaArchive.class, "deltaspike-core-fake.jar") + .addClasses( + ThreadPoolManager.class, + WindowContextProducer.class, WindowBeanHolder.class, WindowIdHolder.class, + DefaultWindowContextQuotaHandler.class, WindowContextQuotaHandlerCache.class, + ConversationBeanHolder.class, ViewAccessBeanHolder.class, + ViewAccessBeanAccessHistory.class, ViewAccessViewHistory.class) + .addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml")) + .addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml"); + } + + @Inject + private ThreadPoolManager manager; + + @Test + public void defaultPool() throws ExecutionException, InterruptedException + { + final ExecutorService auto = manager.find("auto"); + assertEquals(auto, auto); + assertSame(auto, auto); + assertEquals(Runtime.getRuntime().availableProcessors(), ThreadPoolExecutor.class.cast(auto).getCorePoolSize()); + assertUsable(auto); + } + + @Test // this test validates we read the config properly but also it is lazy + public void configuredPool() throws ExecutionException, InterruptedException + { + ConfigResolver.addConfigSources(Collections.<ConfigSource>singletonList(new PropertiesConfigSource(new Properties() + {{ + setProperty("futurable.pool.custom.coreSize", "5"); + }}) + { + @Override + public String getConfigName() + { + return "configuredPool"; + } + })); + final ExecutorService custom = manager.find("custom"); + assertEquals(custom, custom); + assertSame(custom, custom); + assertEquals(5, 
ThreadPoolExecutor.class.cast(custom).getCorePoolSize()); + assertUsable(custom); + } + + private void assertUsable(final ExecutorService pool) throws InterruptedException, ExecutionException + { + assertEquals("ok", pool.submit(new Callable<String>() + { + @Override + public String call() throws Exception + { + return "ok"; + } + }).get()); + } +} http://git-wip-us.apache.org/repos/asf/deltaspike/blob/c4006cc3/documentation/src/main/asciidoc/core.adoc ---------------------------------------------------------------------- diff --git a/documentation/src/main/asciidoc/core.adoc b/documentation/src/main/asciidoc/core.adoc index 4da3c9f..9fd5dce 100644 --- a/documentation/src/main/asciidoc/core.adoc +++ b/documentation/src/main/asciidoc/core.adoc @@ -1182,6 +1182,32 @@ DeltaSpike provides support for executing code in an asynchronous manner. The b - `@Locked` - Ability to prevent concurrent access to a method based on its usage of reads/writes. - `@Throttled` - Ability to limit how frequently a method can be invoked. +== @Futureable configuration + +The strategy to find the `ExecutorService` associated with a `@Futureable` pool name is as follows: + +1. Check if there is a CDI bean of type `ExecutorService` with the name of the pool, if not try 2 +2. Check if there is a JNDI entry matching the pool name directly or prefixed with `java:app/`, `java:global/`, `java:global/threads/`, `java:global/deltaspike/`, `java:`, if not try 3 +3. Read the configuration and create a `ThreadPoolExecutor` + +IMPORTANT: the instance is looked up only once per pool name and then cached, so configuration changes made after the first lookup have no effect. + +If you rely on the last option (configured executor) here are the keys you can set in DeltaSpike configuration: + +|=== +| Key | Description | Default +| futurable.pool.<pool name>.coreSize | The core size of the pool. | Number of available processors (at least 2) +| futurable.pool.<pool name>.maxSize | The max size of the pool. 
| coreSize value +| futurable.pool.<pool name>.keepAlive.value | Keep-alive time of the pool (how long an idle thread above the core size is kept before being released). | 0 +| futurable.pool.<pool name>.keepAlive.unit | Unit of keepAlive.value. It must match a `java.util.concurrent.TimeUnit` constant name. | MILLISECONDS +| futurable.pool.<pool name>.queue.type | The task queue type of the executor. Can be `ARRAY` to use an `ArrayBlockingQueue`, `LINKED` for a `LinkedBlockingQueue` or `SYNCHRONOUS` for a `SynchronousQueue`. | LINKED +| futurable.pool.<pool name>.queue.fair | For synchronous and array queue types, whether the queue is fair. | false +| futurable.pool.<pool name>.queue.size | For array queue type, the size of the queue. | 1024 +| futurable.pool.<pool name>.queue.capacity | For linked queue type, the capacity of the queue. | `Integer.MAX_VALUE` +| futurable.pool.<pool name>.threadFactory.name | If set, a CDI bean matching the value will be looked up and used as `ThreadFactory`. | none, `Executors.defaultThreadFactory()` is used +| futurable.pool.<pool name>.rejectedExecutionHandler.name | If set, a CDI bean matching the value will be looked up and used as `RejectedExecutionHandler`. | none, `ThreadPoolExecutor.AbortPolicy` is used +|=== + == Utilities DeltaSpike provides many utility classes (no constructor / static