Author: peter_firmstone Date: Thu Dec 3 12:12:08 2015 New Revision: 1717747
URL: http://svn.apache.org/viewvc?rev=1717747&view=rev Log: ThreadPool's implementation was not optimal and was found to be a hotspot in outrigger stress tests; it is now a wrapper around a java.util.concurrent ExecutorService. Modified: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/thread/ThreadPool.java Modified: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/thread/ThreadPool.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/thread/ThreadPool.java?rev=1717747&r1=1717746&r2=1717747&view=diff ============================================================================== --- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/thread/ThreadPool.java (original) +++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/thread/ThreadPool.java Thu Dec 3 12:12:08 2015 @@ -18,14 +18,12 @@ package org.apache.river.thread; -import org.apache.river.action.GetLongAction; import java.security.AccessController; import java.security.PrivilegedAction; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ThreadFactory; import java.util.logging.Level; import java.util.logging.Logger; @@ -63,20 +61,8 @@ import java.util.logging.Logger; **/ final class ThreadPool implements Executor, java.util.concurrent.Executor { - /** how long a thread waits in the idle state before passing away */ - private static final long IDLE_TIMEOUT = // default 5 minutes - ((Long) AccessController.doPrivileged(new GetLongAction( - "org.apache.river.thread.idleThreadTimeout", 300000))) - .longValue(); - private static final Logger logger = Logger.getLogger("org.apache.river.thread.ThreadPool"); - - /** 
thread group that this pool's threads execute in */ - private final ThreadGroup threadGroup; - - /** queues of tasks to execute */ - private final BlockingQueue<Runnable> queue; /** * This Executor is used by JERI (and other Jini implementation classes) @@ -98,18 +84,6 @@ final class ThreadPool implements Execut * ThreadPool must degrade gracefully when a system is under significant * load, but it must also execute tasks as soon as possible. * - * To address these issues, a SynchronousQueue was originally selected, it has - * no storage capacity, it hands tasks directly from the calling thread to - * the task thread, however contention can cause more threads than necessary - * to be created, a LinkedBlockingQueue eliminates or reduces contention - * between caller and worker threads, preventing unnecessary thread creation. - * Consider TransferBlockingQueue when Java 6 is no longer supported. - * - * Pool threads block waiting until a task is available or idleTimeout - * occurs after which the pool thread dies, client threads block waiting - * until a task thread is available, or after an computed timeout elapses, - * creates a new thread to execute the task. - * * ThreadGroup is a construct originally intended for applet isolation, * however it was never really successful, AccessControlContext * is a much more effective way of controlling privilege. @@ -117,25 +91,13 @@ final class ThreadPool implements Execut * We should consider changing this to ensure that each task is executed in the * AccessControlContext of the calling thread, to avoid privilege escalation. */ - private final AtomicInteger workerCount; - private final AtomicInteger availableThreads; private volatile boolean shutdown = false; + private final ExecutorService es; ThreadPool(ThreadGroup threadGroup){ - this(threadGroup, 10); - } - - /** - * Creates a new thread group that executes tasks in threads of - * the given thread group. 
- */ - ThreadPool(final ThreadGroup threadGroup, int delayFactor) { - this.threadGroup = threadGroup; - queue = new ArrayBlockingQueue<Runnable>(32); - workerCount = new AtomicInteger(); - availableThreads = new AtomicInteger(); -// Thread not started until after constructor completes -// this escaping occurs safely. + this(Executors.newCachedThreadPool(new TPThreadFactory(threadGroup))); // Final field freeze +// Thread not started until after constructor completes +// this escaping occurs safely anyway because of final field freeze. AccessController.doPrivileged(new PrivilegedAction(){ @Override @@ -145,6 +107,10 @@ final class ThreadPool implements Execut } }); } + + private ThreadPool(ExecutorService es){ + this.es = es; + } private Thread shutdownHook(){ Thread t = new Thread ( new Runnable(){ @@ -158,11 +124,7 @@ final class ThreadPool implements Execut Thread.currentThread().interrupt(); } shutdown = true; - Thread [] threads = new Thread [workerCount.get() + 1 ]; - int count = threadGroup.enumerate(threads); - for (int i = 0; i < count; i++){ - threads [i].interrupt(); - } + es.shutdown(); } },"ThreadPool destroy"); /** @@ -180,27 +142,8 @@ final class ThreadPool implements Execut public void execute(Runnable runnable, String name) throws RejectedExecutionException { if (runnable == null) return; if (shutdown) throw new RejectedExecutionException("ThreadPool shutdown"); - Runnable task = new Task(runnable, name); - /* Startup ramps up very quickly because there are no waiting - * threads available. - * - * Tasks must not be allowed to build up in the queue, in case - * of dependencies. - */ - if ( availableThreads.get() < 2 )// Keep at least one thread ready. - { // need more threads. 
- if (shutdown) throw - new RejectedExecutionException("ThreadPool shutdown"); - Thread t = AccessController.doPrivileged( - new NewThreadAction(threadGroup, new Worker(task), name, false, 228)); - t.start(); - } else { - try { - queue.put(task); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - } + Runnable task = new Task(runnable, name); + es.submit(task); } @Override @@ -223,7 +166,9 @@ final class ThreadPool implements Execut @Override public void run(){ + Thread thread = Thread.currentThread(); try { + thread.setName(NewThreadAction.NAME_PREFIX + name); runnable.run(); } catch (Exception t) { // Don't catch Error logger.log(Level.WARNING, "uncaught exception", t); @@ -239,6 +184,8 @@ final class ThreadPool implements Execut // set so the while loop stops. Thread.currentThread().interrupt(); } + } finally { + thread.setName(NewThreadAction.NAME_PREFIX + "idle"); } } @@ -247,58 +194,24 @@ final class ThreadPool implements Execut return name; } } - + /** - * Worker executes an initial task, and then it executes tasks from the - * queue, passing away if ever idle for more than the idle timeout value. + * Thread stack size hint given to jvm to minimise memory consumption + * as this executor can create many threads, tasks executed are relatively + * simple and don't need much memory. The jvm is free to ignore this hint. */ - private class Worker implements Runnable { - - private volatile Runnable first; - - Worker(Runnable first) { - this.first = first; - } + private static class TPThreadFactory implements ThreadFactory { + /** thread group that this pool's threads execute in */ + final ThreadGroup threadGroup; + + TPThreadFactory (ThreadGroup group){ + threadGroup = group; + } - @Override - public void run() { - workerCount.incrementAndGet(); - try { - Runnable task = first; - first = null; // For garbage collection. 
- task.run(); - Thread thread = Thread.currentThread(); - while (!thread.isInterrupted()) { - /* - * REMIND: What if the task changed this thread's - * priority? or context class loader? - * - * thread.setName is not thread safe, so may not reflect - * most up to date state - */ - try { - task = null; - availableThreads.incrementAndGet(); - try { - task = queue.poll(IDLE_TIMEOUT, TimeUnit.MILLISECONDS); - } finally { - availableThreads.decrementAndGet(); - } - if (task != null) { - thread.setName(NewThreadAction.NAME_PREFIX + task); - task.run(); - thread.setName(NewThreadAction.NAME_PREFIX + "Idle"); - } else { - break; //Timeout or spurious wakeup. - } - } catch (InterruptedException e){ - thread.interrupt(); - break; - } - } - } finally { - workerCount.decrementAndGet(); - } + public Thread newThread(Runnable r) { + return AccessController.doPrivileged( + new NewThreadAction(threadGroup, r, NewThreadAction.NAME_PREFIX, false, 228)); } + } }
