Repository: deltaspike
Updated Branches:
  refs/heads/fb/DELTASPIKE-1302_ThreadPoolManager-configuration [created] 
c4006cc3b


DELTASPIKE-1302 configuration for ThreadPoolManager


Project: http://git-wip-us.apache.org/repos/asf/deltaspike/repo
Commit: http://git-wip-us.apache.org/repos/asf/deltaspike/commit/c4006cc3
Tree: http://git-wip-us.apache.org/repos/asf/deltaspike/tree/c4006cc3
Diff: http://git-wip-us.apache.org/repos/asf/deltaspike/diff/c4006cc3

Branch: refs/heads/fb/DELTASPIKE-1302_ThreadPoolManager-configuration
Commit: c4006cc3b5fda1eea5551fee5a0a7e15c19264d9
Parents: d1316e5
Author: Romain Manni-Bucau <rmannibu...@apache.org>
Authored: Sun Nov 26 15:31:46 2017 +0100
Committer: Romain Manni-Bucau <rmannibu...@apache.org>
Committed: Sun Nov 26 15:31:46 2017 +0100

----------------------------------------------------------------------
 .../core/impl/future/ThreadPoolManager.java     | 195 ++++++++++++++++---
 .../core/impl/future/ThreadPoolManagerTest.java | 116 +++++++++++
 documentation/src/main/asciidoc/core.adoc       |  26 +++
 3 files changed, 312 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/deltaspike/blob/c4006cc3/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/ThreadPoolManager.java
----------------------------------------------------------------------
diff --git 
a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/ThreadPoolManager.java
 
b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/ThreadPoolManager.java
index 9dc6667..4e1f2a1 100644
--- 
a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/ThreadPoolManager.java
+++ 
b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/ThreadPoolManager.java
@@ -18,23 +18,48 @@
  */
 package org.apache.deltaspike.core.impl.future;
 
+import org.apache.deltaspike.core.api.config.ConfigResolver;
 import org.apache.deltaspike.core.api.config.base.CoreBaseConfig;
 
 import javax.annotation.PreDestroy;
 import javax.enterprise.context.ApplicationScoped;
+import javax.enterprise.context.spi.CreationalContext;
+import javax.enterprise.inject.spi.Bean;
+import javax.enterprise.inject.spi.BeanManager;
+import javax.inject.Inject;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import static java.util.Arrays.asList;
+import static java.util.Locale.ENGLISH;
+
 @ApplicationScoped
 public class ThreadPoolManager
 {
     private final ConcurrentMap<String, ExecutorService> pools = new 
ConcurrentHashMap<String, ExecutorService>();
-    private volatile ExecutorService defaultPool;
+    private final Collection<CreationalContext<?>> contexts = new 
ArrayList<CreationalContext<?>>(8);
     private volatile boolean closed = false;
 
+    @Inject
+    private BeanManager beanManager;
+
     @PreDestroy
     private void shutdown()
     {
@@ -43,21 +68,12 @@ public class ThreadPoolManager
         for (final ExecutorService es : pools.values())
         {
             es.shutdown();
-            try
-            {
-                es.awaitTermination(timeout, TimeUnit.MILLISECONDS);
-            }
-            catch (final InterruptedException e)
-            {
-                Thread.interrupted();
-            }
         }
-        if (defaultPool != null)
+        for (final ExecutorService es : pools.values())
         {
-            defaultPool.shutdown();
             try
             {
-                defaultPool.awaitTermination(timeout, TimeUnit.MILLISECONDS);
+                es.awaitTermination(timeout, TimeUnit.MILLISECONDS);
             }
             catch (final InterruptedException e)
             {
@@ -65,6 +81,12 @@ public class ThreadPoolManager
             }
         }
         pools.clear();
+
+        for (final CreationalContext<?> ctx : contexts)
+        {
+            ctx.release();
+        }
+        contexts.clear();
     }
 
     public ExecutorService find(final String name)
@@ -76,25 +98,148 @@ public class ThreadPoolManager
         ExecutorService pool = pools.get(name);
         if (pool == null)
         {
-            ensureDefaultPool();
-            pool = defaultPool;
-        }
-        return pool;
-    }
-
-    private void ensureDefaultPool()
-    {
-        if (defaultPool == null)
-        {
             synchronized (this)
             {
-                if (defaultPool == null)
+                pool = pools.get(name);
+                if (pool == null)
                 {
-                    defaultPool = Executors.newFixedThreadPool(
-                            Math.max(2, 
Runtime.getRuntime().availableProcessors()));
+                    // the instantiation does the following:
+                    // 1. check if there is a named bean matching this name 
using @Default qualifier
+                    // 2. check if there is a JNDI entry 
(ManagedExecutorService case) matching this name
+                    // 3. create a new executor service based on the DS-config
+
+                    // 1.
+                    final Set<Bean<?>> beans = beanManager.getBeans(name);
+                    if (beans != null && !beans.isEmpty())
+                    {
+                        final Bean<?> bean = beanManager.resolve(beans);
+                        if (bean.getTypes().contains(ExecutorService.class))
+                        {
+                            final CreationalContext<Object> creationalContext 
= beanManager.createCreationalContext(null);
+                            if (!beanManager.isNormalScope(bean.getScope()))
+                            {
+                                contexts.add(creationalContext);
+                            }
+                            pool = 
ExecutorService.class.cast(beanManager.getReference(bean, 
ExecutorService.class, creationalContext));
+                        }
+                    }
+
+                    if (pool == null) // 2.
+                    {
+                        for (final String prefix : asList(
+                                "", "java:app/", "java:global/", 
"java:global/threads/", "java:global/deltaspike/", "java:"))
+                        {
+                            try {
+                                final Object instance = new 
InitialContext().lookup(prefix + name);
+                                if (ExecutorService.class.isInstance(instance))
+                                {
+                                    pool = 
ExecutorService.class.cast(instance);
+                                    break;
+                                }
+                            } catch (final NamingException e) {
+                                // no-op
+                            }
+                        }
+                    }
+
+                    if (pool == null) // 3.
+                    {
+                        final String configPrefix = "futurable.pool." + name + 
".";
+                        final int coreSize = 
ConfigResolver.resolve(configPrefix + "coreSize")
+                                .as(Integer.class)
+                                .withDefault(Math.max(2, 
Runtime.getRuntime().availableProcessors()))
+                                .getValue();
+                        final int maxSize = 
ConfigResolver.resolve(configPrefix + "maxSize")
+                                .as(Integer.class)
+                                .withDefault(coreSize)
+                                .getValue();
+                        final long keepAlive = 
ConfigResolver.resolve(configPrefix + "keepAlive.value")
+                                .as(Long.class)
+                                .withDefault(0L)
+                                .getValue();
+                        final String keepAliveUnit = 
ConfigResolver.resolve(configPrefix + "keepAlive.unit")
+                                .as(String.class)
+                                .withDefault("MILLISECONDS")
+                                .getValue();
+
+                        final String queueType = 
ConfigResolver.resolve(configPrefix + "queue.type")
+                                .as(String.class)
+                                .withDefault("LINKED")
+                                .getValue();
+                        final BlockingQueue<Runnable> queue;
+                        if ("ARRAY".equalsIgnoreCase(queueType))
+                        {
+                            final int size = 
ConfigResolver.resolve(configPrefix + "queue.size")
+                                    .as(Integer.class)
+                                    .withDefault(1024)
+                                    .getValue();
+                            final boolean fair = 
ConfigResolver.resolve(configPrefix + "queue.fair")
+                                    .as(Boolean.class)
+                                    .withDefault(false)
+                                    .getValue();
+                            queue = new ArrayBlockingQueue<Runnable>(size, 
fair);
+                        }
+                        else if ("SYNCHRONOUS".equalsIgnoreCase(queueType))
+                        {
+                            final boolean fair = 
ConfigResolver.resolve(configPrefix + "queue.fair")
+                                    .as(Boolean.class)
+                                    .withDefault(false)
+                                    .getValue();
+                            queue = new SynchronousQueue<Runnable>(fair);
+                        }
+                        else/* (queueType.equalsIgnoreCase("LINKED")) */
+                        {
+                            final int capacity = 
ConfigResolver.resolve(configPrefix + "queue.capacity")
+                                    .as(Integer.class)
+                                    .withDefault(Integer.MAX_VALUE)
+                                    .getValue();
+                            queue = new 
LinkedBlockingQueue<Runnable>(capacity);
+                        }
+
+                        final String threadFactoryName = 
ConfigResolver.getPropertyValue(configPrefix + "threadFactory.name");
+                        final ThreadFactory threadFactory;
+                        if (threadFactoryName != null)
+                        {
+                            threadFactory = lookupByName(threadFactoryName, 
ThreadFactory.class);
+                        }
+                        else
+                        {
+                            threadFactory = Executors.defaultThreadFactory();
+                        }
+
+                        final String rejectedHandlerName = 
ConfigResolver.getPropertyValue(configPrefix + "rejectedExecutionHandler.name");
+                        final RejectedExecutionHandler rejectedHandler;
+                        if (rejectedHandlerName != null)
+                        {
+                            rejectedHandler = 
lookupByName(rejectedHandlerName, RejectedExecutionHandler.class);
+                        }
+                        else
+                        {
+                            rejectedHandler = new 
ThreadPoolExecutor.AbortPolicy();
+                        }
+
+                        pool = new ThreadPoolExecutor(
+                                coreSize, maxSize,
+                                keepAlive, TimeUnit.valueOf(keepAliveUnit),
+                                queue, threadFactory, rejectedHandler);
+                    }
+
+                    pools.put(name, pool);
                 }
             }
         }
+        return pool;
+    }
+
+    private <T> T lookupByName(final String name, final Class<T> type) {
+        final Set<Bean<?>> tfb = beanManager.getBeans(name);
+        final Bean<?> bean = beanManager.resolve(tfb);
+        final CreationalContext<?> ctx = 
beanManager.createCreationalContext(null);
+        if (!beanManager.isNormalScope(bean.getScope()))
+        {
+            contexts.add(ctx);
+        }
+        return type.cast(beanManager.getReference(bean, type, ctx));
     }
 }
 

http://git-wip-us.apache.org/repos/asf/deltaspike/blob/c4006cc3/deltaspike/core/impl/src/test/java/org/apache/deltaspike/core/impl/future/ThreadPoolManagerTest.java
----------------------------------------------------------------------
diff --git 
a/deltaspike/core/impl/src/test/java/org/apache/deltaspike/core/impl/future/ThreadPoolManagerTest.java
 
b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/core/impl/future/ThreadPoolManagerTest.java
new file mode 100644
index 0000000..111dda4
--- /dev/null
+++ 
b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/core/impl/future/ThreadPoolManagerTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.deltaspike.core.impl.future;
+
+import org.apache.deltaspike.core.api.config.ConfigResolver;
+import org.apache.deltaspike.core.impl.config.PropertiesConfigSource;
+import 
org.apache.deltaspike.core.impl.scope.conversation.ConversationBeanHolder;
+import 
org.apache.deltaspike.core.impl.scope.viewaccess.ViewAccessBeanAccessHistory;
+import org.apache.deltaspike.core.impl.scope.viewaccess.ViewAccessBeanHolder;
+import org.apache.deltaspike.core.impl.scope.viewaccess.ViewAccessViewHistory;
+import 
org.apache.deltaspike.core.impl.scope.window.DefaultWindowContextQuotaHandler;
+import org.apache.deltaspike.core.impl.scope.window.WindowBeanHolder;
+import org.apache.deltaspike.core.impl.scope.window.WindowContextProducer;
+import 
org.apache.deltaspike.core.impl.scope.window.WindowContextQuotaHandlerCache;
+import org.apache.deltaspike.core.impl.scope.window.WindowIdHolder;
+import org.apache.deltaspike.core.spi.config.ConfigSource;
+import org.jboss.arquillian.container.test.api.Deployment;
+import org.jboss.arquillian.junit.Arquillian;
+import org.jboss.shrinkwrap.api.ShrinkWrap;
+import org.jboss.shrinkwrap.api.asset.EmptyAsset;
+import org.jboss.shrinkwrap.api.spec.JavaArchive;
+import org.jboss.shrinkwrap.api.spec.WebArchive;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import javax.inject.Inject;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+@RunWith(Arquillian.class)
+public class ThreadPoolManagerTest
+{
+    @Deployment
+    public static WebArchive deploy()
+    {
+        return ShrinkWrap.create(WebArchive.class, "ThreadPoolManagerTest.war")
+                .addAsLibraries(ShrinkWrap.create(JavaArchive.class, 
"deltaspike-core-fake.jar")
+                    .addClasses(
+                            ThreadPoolManager.class,
+                            WindowContextProducer.class, 
WindowBeanHolder.class, WindowIdHolder.class,
+                            DefaultWindowContextQuotaHandler.class, 
WindowContextQuotaHandlerCache.class,
+                            ConversationBeanHolder.class, 
ViewAccessBeanHolder.class,
+                            ViewAccessBeanAccessHistory.class, 
ViewAccessViewHistory.class)
+                    .addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml"))
+                .addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml");
+    }
+
+    @Inject
+    private ThreadPoolManager manager;
+
+    @Test
+    public void defaultPool() throws ExecutionException, InterruptedException
+    {
+        final ExecutorService auto = manager.find("auto");
+        assertEquals(auto, auto);
+        assertSame(auto, auto);
+        assertEquals(Runtime.getRuntime().availableProcessors(), 
ThreadPoolExecutor.class.cast(auto).getCorePoolSize());
+        assertUsable(auto);
+    }
+
+    @Test // this test validates we read the config properly but also it is 
lazy
+    public void configuredPool() throws ExecutionException, 
InterruptedException
+    {
+        
ConfigResolver.addConfigSources(Collections.<ConfigSource>singletonList(new 
PropertiesConfigSource(new Properties()
+        {{
+            setProperty("futurable.pool.custom.coreSize", "5");
+        }})
+        {
+            @Override
+            public String getConfigName()
+            {
+                return "configuredPool";
+            }
+        }));
+        final ExecutorService custom = manager.find("custom");
+        assertEquals(custom, custom);
+        assertSame(custom, custom);
+        assertEquals(5, 
ThreadPoolExecutor.class.cast(custom).getCorePoolSize());
+        assertUsable(custom);
+    }
+
+    private void assertUsable(final ExecutorService pool) throws 
InterruptedException, ExecutionException
+    {
+        assertEquals("ok", pool.submit(new Callable<String>()
+        {
+            @Override
+            public String call() throws Exception
+            {
+                return "ok";
+            }
+        }).get());
+    }
+}

http://git-wip-us.apache.org/repos/asf/deltaspike/blob/c4006cc3/documentation/src/main/asciidoc/core.adoc
----------------------------------------------------------------------
diff --git a/documentation/src/main/asciidoc/core.adoc 
b/documentation/src/main/asciidoc/core.adoc
index 4da3c9f..9fd5dce 100644
--- a/documentation/src/main/asciidoc/core.adoc
+++ b/documentation/src/main/asciidoc/core.adoc
@@ -1182,6 +1182,32 @@ DeltaSpike provides support for executing code in an 
asynchronous manner.  The b
 - `@Locked` - Ability to prevent concurrent access to a method based on its 
usage of reads/writes.
 - `@Throttled` - Ability to limit how frequently a method can be invoked.
 
+== @Futureable configuration
+
+The strategy to find the `ExecutorService` associated to the `@Futureable` 
name is the following one:
+
+1. Check if there is a CDI bean of type `ExecutorService` with the name of the 
pool, if not try 2
+2. Check if there is a JNDI entry matching the pool name directly or prefixed 
with `java:app/`, `java:global/`, `java:global/threads/`, 
`java:global/deltaspike/`, `java:`, if not try 3
+3. Read the configuration and create a `ThreadPoolExecutor`
+
+IMPORTANT: the instance is looked up only once so from the first time it was 
read you can't change any configuration anymore.
+
+If you rely on the last option (configured executor) here are the keys you can 
set in DeltaSpike configuration:
+
+|===
+| Key | Description | Default
+| futurable.pool.<pool name>.coreSize | The core size of the pool. | Number of 
available processors
+| futurable.pool.<pool name>.maxSize | The max size of the pool. | coreSize 
value
+| futurable.pool.<pool name>.keepAlive.value | Pool keep alive (when a thread 
is released). | 0
+| futurable.pool.<pool name>.keepAlive.unit | Unit of keepAlive.value. It must 
match a `TIMEUNIT` name. | MILLISECONDS
+| futurable.pool.<pool name>.queue.type | The task queue type of the executor. 
Can be `ARRAY` to use an `ArrayBlockingQueue`, `LINKED` for a 
`LinkedBlockingQueue` or `SYNCHRONOUS` for a `SynchronousQueue`. | LINKED
+| futurable.pool.<pool name>.queue.fair | For synchronous and array queue 
types, if the queue is fair. | false
+| futurable.pool.<pool name>.queue.size | For array queue type, the size of 
the queue. | 1024
+| futurable.pool.<pool name>.queue.capacity | For linked queue type, the 
capacity of the queue. | `Integer.MAX_VALUE`
+| futurable.pool.<pool name>.threadFactory.name | If set a CDI bean matching 
the value will be looked up and used as `ThreadFactory`. | none, 
`Executors.defaultThreadFactory()` is used
+| futurable.pool.<pool name>.rejectedExecutionHandler.name | If set a CDI bean 
matching the value will be looked up and used as `RejectedExecutionHandler`. | 
none, `ThreadPoolExecutor.AbortPolicy` is used
+|===
+
 == Utilities
 
 DeltaSpike provides many utility classes (no constructor / static

Reply via email to