Author: fhanik Date: Tue Dec 9 12:56:59 2008 New Revision: 724886 URL: http://svn.apache.org/viewvc?rev=724886&view=rev Log: Refactored the thread pooling when using an executor; this gets rid of duplicate code in the NIO connector as well as in the org.apache.catalina.core.StandardThreadExecutor class. I provided a ThreadPoolExecutor with a small extension to the java.util.concurrent.ThreadPoolExecutor. The connector method setExecutor still takes a java.util.concurrent.Executor as an argument to provide the most flexibility.
Added: tomcat/trunk/java/org/apache/tomcat/util/threads/TaskQueue.java (with props) tomcat/trunk/java/org/apache/tomcat/util/threads/TaskThreadFactory.java (with props) tomcat/trunk/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java (with props) Modified: tomcat/trunk/java/org/apache/catalina/core/StandardThreadExecutor.java tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Modified: tomcat/trunk/java/org/apache/catalina/core/StandardThreadExecutor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/core/StandardThreadExecutor.java?rev=724886&r1=724885&r2=724886&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/core/StandardThreadExecutor.java (original) +++ tomcat/trunk/java/org/apache/catalina/core/StandardThreadExecutor.java Tue Dec 9 12:56:59 2008 @@ -17,18 +17,16 @@ package org.apache.catalina.core; -import java.util.Collection; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.catalina.Executor; import org.apache.catalina.LifecycleException; import org.apache.catalina.LifecycleListener; import org.apache.catalina.util.LifecycleSupport; -import java.util.concurrent.RejectedExecutionException; +import org.apache.tomcat.util.threads.TaskQueue; +import org.apache.tomcat.util.threads.TaskThreadFactory; +import org.apache.tomcat.util.threads.ThreadPoolExecutor; public class StandardThreadExecutor implements Executor { @@ -90,7 +88,7 @@ public void start() throws LifecycleException { lifecycle.fireLifecycleEvent(BEFORE_START_EVENT, null); TaskQueue taskqueue = new TaskQueue(maxQueueSize); - TaskThreadFactory tf = new TaskThreadFactory(namePrefix); + TaskThreadFactory tf = new 
TaskThreadFactory(namePrefix,daemon,getThreadPriority()); lifecycle.fireLifecycleEvent(START_EVENT, null); executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf); taskqueue.setParent( (ThreadPoolExecutor) executor); @@ -107,17 +105,10 @@ public void execute(Runnable command, long timeout, TimeUnit unit) { if ( executor != null ) { - try { - executor.execute(command); - } catch (RejectedExecutionException rx) { - //there could have been contention around the queue - try { - if ( !( (TaskQueue) executor.getQueue()).force(command,timeout,unit) ) throw new RejectedExecutionException("Work queue full."); - }catch (InterruptedException x) { - throw new RejectedExecutionException("Interrupted.",x); - } - } - } else throw new IllegalStateException("StandardThreadPool not started."); + executor.execute(command,timeout,unit); + } else { + throw new IllegalStateException("StandardThreadExecutor not started."); + } } @@ -258,71 +249,4 @@ public int getQueueSize() { return (executor != null) ? executor.getQueue().size() : -1; } - - // ---------------------------------------------- TaskQueue Inner Class - class TaskQueue extends LinkedBlockingQueue<Runnable> { - ThreadPoolExecutor parent = null; - - public TaskQueue() { - super(); - } - - public TaskQueue(int capacity) { - super(capacity); - } - - public TaskQueue(Collection<? 
extends Runnable> c) { - super(c); - } - - public void setParent(ThreadPoolExecutor tp) { - parent = tp; - } - - public boolean force(Runnable o) { - if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue"); - return super.offer(o); //forces the item onto the queue, to be used if the task is rejected - } - - public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException { - if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue"); - return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected - } - - public boolean offer(Runnable o) { - //we can't do any checks - if (parent==null) return super.offer(o); - //we are maxed out on threads, simply queue the object - if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); - //we have idle threads, just add it to the queue - //this is an approximation, so it could use some tuning - if (parent.getActiveCount()<(parent.getPoolSize())) return super.offer(o); - //if we have less threads than maximum force creation of a new thread - if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false; - //if we reached here, we need to add it to the queue - return super.offer(o); - } - } - - // ---------------------------------------------- ThreadFactory Inner Class - class TaskThreadFactory implements ThreadFactory { - final ThreadGroup group; - final AtomicInteger threadNumber = new AtomicInteger(1); - final String namePrefix; - - TaskThreadFactory(String namePrefix) { - SecurityManager s = System.getSecurityManager(); - group = (s != null) ? 
s.getThreadGroup() : Thread.currentThread().getThreadGroup(); - this.namePrefix = namePrefix; - } - - public Thread newThread(Runnable r) { - Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement()); - t.setDaemon(daemon); - t.setPriority(getThreadPriority()); - return t; - } - } - - } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=724886&r1=724885&r2=724886&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Tue Dec 9 12:56:59 2008 @@ -60,6 +60,8 @@ import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler; import org.apache.tomcat.util.net.jsse.NioX509KeyManager; import org.apache.tomcat.util.res.StringManager; +import org.apache.tomcat.util.threads.TaskQueue; +import org.apache.tomcat.util.threads.TaskThreadFactory; /** * NIO tailored thread pool, providing the following services: @@ -339,8 +341,15 @@ * External Executor based thread pool. 
*/ protected Executor executor = null; - public void setExecutor(Executor executor) { this.executor = executor; } + public void setExecutor(Executor executor) { + this.executor = executor; + this.internalExecutor = (executor==null); + } public Executor getExecutor() { return executor; } + /** + * Are we using an internal executor + */ + protected boolean internalExecutor = true; protected boolean useExecutor = true; /** @@ -356,12 +365,8 @@ protected int maxThreads = 200; public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; - if (running) { - if (executor!=null) { - if (executor instanceof ThreadPoolExecutor) { - ((ThreadPoolExecutor)executor).setMaximumPoolSize(maxThreads); - } - } + if (running && executor!=null && executor instanceof ThreadPoolExecutor) { + ((ThreadPoolExecutor)executor).setMaximumPoolSize(maxThreads); } } public int getMaxThreads() { return maxThreads; } @@ -872,10 +877,11 @@ // Create worker collection if ( executor == null ) { + internalExecutor = true; TaskQueue taskqueue = new TaskQueue(); - TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-"); + TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority()); executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); - taskqueue.setParent( (ThreadPoolExecutor) executor, this); + taskqueue.setParent( (ThreadPoolExecutor) executor); } // Start poller threads @@ -938,13 +944,13 @@ keyCache.clear(); nioChannels.clear(); processorCache.clear(); - if ( executor!=null ) { + if ( executor!=null && internalExecutor ) { if ( executor instanceof ThreadPoolExecutor ) { //this is our internal one, so we need to shut it down ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor; tpe.shutdown(); TaskQueue queue = (TaskQueue) tpe.getQueue(); - queue.setParent(null,null); + queue.setParent(null); } executor = null; } @@ -1955,68 +1961,8 @@ } } - - // 
---------------------------------------------- TaskQueue Inner Class - public static class TaskQueue extends LinkedBlockingQueue<Runnable> { - ThreadPoolExecutor parent = null; - NioEndpoint endpoint = null; - - public TaskQueue() { - super(); - } - public TaskQueue(int initialCapacity) { - super(initialCapacity); - } - - public TaskQueue(Collection<? extends Runnable> c) { - super(c); - } - - - public void setParent(ThreadPoolExecutor tp, NioEndpoint ep) { - parent = tp; - this.endpoint = ep; - } - - public boolean offer(Runnable o) { - //we can't do any checks - if (parent==null) return super.offer(o); - //we are maxed out on threads, simply queue the object - if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); - //we have idle threads, just add it to the queue - //this is an approximation, so it could use some tuning - if (endpoint.activeSocketProcessors.get()<(parent.getPoolSize())) return super.offer(o); - //if we have less threads than maximum force creation of a new thread - if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false; - //if we reached here, we need to add it to the queue - return super.offer(o); - } - } - - // ---------------------------------------------- ThreadFactory Inner Class - class TaskThreadFactory implements ThreadFactory { - final ThreadGroup group; - final AtomicInteger threadNumber = new AtomicInteger(1); - final String namePrefix; - - TaskThreadFactory(String namePrefix) { - SecurityManager s = System.getSecurityManager(); - group = (s != null)? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); - this.namePrefix = namePrefix; - } - - public Thread newThread(Runnable r) { - Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement()); - t.setDaemon(daemon); - t.setPriority(getThreadPriority()); - return t; - } - } - // ----------------------------------------------- SendfileData Inner Class - - /** * SendfileData class. 
*/ @@ -2029,5 +1975,4 @@ // KeepAlive flag public boolean keepAlive; } - } Added: tomcat/trunk/java/org/apache/tomcat/util/threads/TaskQueue.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/threads/TaskQueue.java?rev=724886&view=auto ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/threads/TaskQueue.java (added) +++ tomcat/trunk/java/org/apache/tomcat/util/threads/TaskQueue.java Tue Dec 9 12:56:59 2008 @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.util.threads; + +import java.util.Collection; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +/** + * As task queue specifically designed to run with a thread pool executor. + * The task queue is optimised to properly utilize threads within + * a thread pool executor. 
If you use a normal queue, the executor will spawn threads + * when there are idle threads and you wont be able to force items unto the queue itself + * @author fhanik + * + */ +public class TaskQueue extends LinkedBlockingQueue<Runnable> { + ThreadPoolExecutor parent = null; + + public TaskQueue() { + super(); + } + + public TaskQueue(int capacity) { + super(capacity); + } + + public TaskQueue(Collection<? extends Runnable> c) { + super(c); + } + + public void setParent(ThreadPoolExecutor tp) { + parent = tp; + } + + public boolean force(Runnable o) { + if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue"); + return super.offer(o); //forces the item onto the queue, to be used if the task is rejected + } + + public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException { + if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue"); + return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected + } + + public boolean offer(Runnable o) { + //we can't do any checks + if (parent==null) return super.offer(o); + //we are maxed out on threads, simply queue the object + if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); + //we have idle threads, just add it to the queue + if (parent.getActiveCount()<(parent.getPoolSize())) return super.offer(o); + //if we have less threads than maximum force creation of a new thread + if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false; + //if we reached here, we need to add it to the queue + return super.offer(o); + } +} Propchange: tomcat/trunk/java/org/apache/tomcat/util/threads/TaskQueue.java ------------------------------------------------------------------------------ svn:eol-style = native Added: tomcat/trunk/java/org/apache/tomcat/util/threads/TaskThreadFactory.java URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/threads/TaskThreadFactory.java?rev=724886&view=auto ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/threads/TaskThreadFactory.java (added) +++ tomcat/trunk/java/org/apache/tomcat/util/threads/TaskThreadFactory.java Tue Dec 9 12:56:59 2008 @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.util.threads; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; +/** + * Simple task thread factory to use to create threads for an executor implementation. + * @author fhanik + * + */ +public class TaskThreadFactory implements ThreadFactory { + final ThreadGroup group; + final AtomicInteger threadNumber = new AtomicInteger(1); + final String namePrefix; + final boolean daemon; + final int threadPriority; + public TaskThreadFactory(String namePrefix, boolean daemon, int priority) { + SecurityManager s = System.getSecurityManager(); + group = (s != null) ? 
s.getThreadGroup() : Thread.currentThread().getThreadGroup(); + this.namePrefix = namePrefix; + this.daemon = daemon; + this.threadPriority = priority; + } + + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement()); + t.setDaemon(daemon); + t.setPriority(threadPriority); + return t; + } + +} Propchange: tomcat/trunk/java/org/apache/tomcat/util/threads/TaskThreadFactory.java ------------------------------------------------------------------------------ svn:eol-style = native Added: tomcat/trunk/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java?rev=724886&view=auto ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java (added) +++ tomcat/trunk/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java Tue Dec 9 12:56:59 2008 @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tomcat.util.threads; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +/** + * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient + * getActiveCount method, to be used to properly handle the work queue + * @author fhanik + * + */ +public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor { + + final AtomicInteger activeCount = new AtomicInteger(0); + + public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); + } + + public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + } + + public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + } + + public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + activeCount.decrementAndGet(); + } + + @Override + protected void beforeExecute(Thread t, Runnable r) { + activeCount.incrementAndGet(); + } + + @Override + public int getActiveCount() { + return activeCount.get(); + } + + public void execute(Runnable command, 
long timeout, TimeUnit unit) { + + } + + + + + +} Propchange: tomcat/trunk/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java ------------------------------------------------------------------------------ svn:eol-style = native --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]