Author: fhanik
Date: Mon Aug 24 15:33:48 2009
New Revision: 807284
URL: http://svn.apache.org/viewvc?rev=807284&view=rev
Log:
First round of refactoring connectors.
Remove the worker-based thread pools.
Enable local or injected executors.
Add a resizable executor interface to be used in future revisions.
Start abstracting out and using a base class. There was one (BaseEndpoint), but it
was deleted since it is not used anywhere.
Added:
tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java (with
props)
tomcat/trunk/java/org/apache/tomcat/util/threads/ResizableExecutor.java
(with props)
Removed:
tomcat/trunk/java/org/apache/tomcat/util/net/BaseEndpoint.java
Modified:
tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
Added: tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java?rev=807284&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java Mon Aug
24 15:33:48 2009
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.net;
+
+import org.apache.tomcat.util.res.StringManager;
+/**
+ *
+ * @author fhanik
+ * @author Mladen Turk
+ * @author Remy Maucherat
+ */
+public abstract class AbstractEndpoint {
+
+ // -------------------------------------------------------------- Constants
+ protected StringManager sm =
StringManager.getManager("org.apache.tomcat.util.net.res");
+
+ /**
+ * The Request attribute key for the cipher suite.
+ */
+ public static final String CIPHER_SUITE_KEY =
"javax.servlet.request.cipher_suite";
+
+ /**
+ * The Request attribute key for the key size.
+ */
+ public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";
+
+ /**
+ * The Request attribute key for the client certificate chain.
+ */
+ public static final String CERTIFICATE_KEY =
"javax.servlet.request.X509Certificate";
+
+ /**
+ * The Request attribute key for the session id.
+ * This one is a Tomcat extension to the Servlet spec.
+ */
+ public static final String SESSION_ID_KEY =
"javax.servlet.request.ssl_session";
+
+ /**
+ * The request attribute key for the session manager.
+ * This one is a Tomcat extension to the Servlet spec.
+ */
+ public static final String SESSION_MGR =
"javax.servlet.request.ssl_session_mgr";
+
+}
Propchange: tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=807284&r1=807283&r2=807284&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Mon Aug 24
15:33:48 2009
@@ -21,6 +21,8 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
@@ -37,6 +39,10 @@
import org.apache.tomcat.jni.Socket;
import org.apache.tomcat.jni.Status;
import org.apache.tomcat.util.res.StringManager;
+import org.apache.tomcat.util.threads.ResizableExecutor;
+import org.apache.tomcat.util.threads.TaskQueue;
+import org.apache.tomcat.util.threads.TaskThreadFactory;
+import org.apache.tomcat.util.threads.ThreadPoolExecutor;
/**
* APR tailored thread pool, providing the following services:
@@ -53,7 +59,7 @@
* @author Mladen Turk
* @author Remy Maucherat
*/
-public class AprEndpoint {
+public class AprEndpoint extends AbstractEndpoint {
// -------------------------------------------------------------- Constants
@@ -61,8 +67,6 @@
protected static Log log = LogFactory.getLog(AprEndpoint.class);
- protected static StringManager sm =
- StringManager.getManager("org.apache.tomcat.util.net.res");
/**
@@ -86,24 +90,11 @@
*/
public static final String SESSION_ID_KEY =
"javax.servlet.request.ssl_session";
- /**
- * The request attribute key for the session manager.
- * This one is a Tomcat extension to the Servlet spec.
- */
- public static final String SESSION_MGR =
- "javax.servlet.request.ssl_session_mgr";
-
// ----------------------------------------------------------------- Fields
/**
- * Available workers.
- */
- protected WorkerStack workers = null;
-
-
- /**
* Running state of the endpoint.
*/
protected volatile boolean running = false;
@@ -163,6 +154,10 @@
protected long sslContext = 0;
+ /**
+ * Are we using an internal executor
+ */
+ protected volatile boolean internalExecutor = false;
// ------------------------------------------------------------- Properties
@@ -188,10 +183,8 @@
protected int maxThreads = 200;
public void setMaxThreads(int maxThreads) {
this.maxThreads = maxThreads;
- if (running) {
- synchronized(workers) {
- workers.resize(maxThreads);
- }
+ if (running && executor instanceof ResizableExecutor) {
+ ((ResizableExecutor)executor).resizePool(getMinSpareThreads(),
getMaxThreads());
}
}
public int getMaxThreads() { return maxThreads; }
@@ -545,9 +538,15 @@
*/
public int getCurrentThreadCount() {
if (executor!=null) {
- return -1;
+ if (executor instanceof ThreadPoolExecutor) {
+ return ((ThreadPoolExecutor)executor).getPoolSize();
+ } else if (executor instanceof ResizableExecutor) {
+ return ((ResizableExecutor)executor).getPoolSize();
+ } else {
+ return -1;
+ }
} else {
- return curThreads;
+ return -2;
}
}
@@ -558,9 +557,15 @@
*/
public int getCurrentThreadsBusy() {
if (executor!=null) {
- return -1;
+ if (executor instanceof ThreadPoolExecutor) {
+ return ((ThreadPoolExecutor)executor).getActiveCount();
+ } else if (executor instanceof ResizableExecutor) {
+ return ((ResizableExecutor)executor).getActiveCount();
+ } else {
+ return -1;
+ }
} else {
- return workers!=null?curThreads - workers.size():0;
+ return -2;
}
}
@@ -744,7 +749,11 @@
// Create worker collection
if (executor == null) {
- workers = new WorkerStack(maxThreads);
+ internalExecutor = true;
+ TaskQueue taskqueue = new TaskQueue();
+ TaskThreadFactory tf = new TaskThreadFactory(getName() +
"-exec-", daemon, getThreadPriority());
+ executor = new ThreadPoolExecutor(getMinSpareThreads(),
getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
+ taskqueue.setParent( (ThreadPoolExecutor) executor);
}
// Start poller threads
@@ -838,6 +847,16 @@
sendfiles = null;
}
}
+ if ( executor!=null && internalExecutor ) {
+ if ( executor instanceof ThreadPoolExecutor ) {
+ //this is our internal one, so we need to shut it down
+ ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
+ tpe.shutdownNow();
+ TaskQueue queue = (TaskQueue) tpe.getQueue();
+ queue.setParent(null);
+ }
+ executor = null;
+ }
}
@@ -946,86 +965,6 @@
}
- /**
- * Create (or allocate) and return an available processor for use in
- * processing a specific HTTP request, if possible. If the maximum
- * allowed processors have already been created and are in use, return
- * <code>null</code> instead.
- */
- protected Worker createWorkerThread() {
-
- synchronized (workers) {
- if (workers.size() > 0) {
- curThreadsBusy++;
- return (workers.pop());
- }
- if ((maxThreads > 0) && (curThreads < maxThreads)) {
- curThreadsBusy++;
- if (curThreadsBusy == maxThreads) {
- log.info(sm.getString("endpoint.info.maxThreads",
- Integer.toString(maxThreads), address,
- Integer.toString(port)));
- }
- return (newWorkerThread());
- } else {
- if (maxThreads < 0) {
- curThreadsBusy++;
- return (newWorkerThread());
- } else {
- return (null);
- }
- }
- }
-
- }
-
-
- /**
- * Create and return a new processor suitable for processing HTTP
- * requests and returning the corresponding responses.
- */
- protected Worker newWorkerThread() {
-
- Worker workerThread = new Worker();
- workerThread.start();
- return (workerThread);
-
- }
-
-
- /**
- * Return a new worker thread, and block while to worker is available.
- */
- protected Worker getWorkerThread() {
- // Allocate a new worker thread
- Worker workerThread = createWorkerThread();
- while (workerThread == null) {
- try {
- synchronized (workers) {
- workers.wait();
- }
- } catch (InterruptedException e) {
- // Ignore
- }
- workerThread = createWorkerThread();
- }
- return workerThread;
- }
-
-
- /**
- * Recycle the specified Processor so that it can be used again.
- *
- * @param workerThread The processor to be recycled
- */
- protected void recycleWorkerThread(Worker workerThread) {
- synchronized (workers) {
- workers.push(workerThread);
- curThreadsBusy--;
- workers.notify();
- }
- }
-
/**
* Allocate a new poller of the specified size.
@@ -1050,11 +989,10 @@
*/
protected boolean processSocketWithOptions(long socket) {
try {
- if (executor == null) {
- getWorkerThread().assignWithOptions(socket);
- } else {
- executor.execute(new SocketWithOptionsProcessor(socket));
- }
+ executor.execute(new SocketWithOptionsProcessor(socket));
+ } catch (RejectedExecutionException x) {
+ log.warn("Socket processing request was rejected for:"+socket,x);
+ return false;
} catch (Throwable t) {
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
@@ -1070,11 +1008,10 @@
*/
protected boolean processSocket(long socket) {
try {
- if (executor == null) {
- getWorkerThread().assign(socket);
- } else {
- executor.execute(new SocketProcessor(socket));
- }
+ executor.execute(new SocketProcessor(socket));
+ } catch (RejectedExecutionException x) {
+ log.warn("Socket processing request was rejected for:"+socket,x);
+ return false;
} catch (Throwable t) {
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
@@ -1090,11 +1027,10 @@
*/
protected boolean processSocket(long socket, SocketStatus status) {
try {
- if (executor == null) {
- getWorkerThread().assign(socket, status);
- } else {
- executor.execute(new SocketEventProcessor(socket, status));
- }
+ executor.execute(new SocketEventProcessor(socket, status));
+ } catch (RejectedExecutionException x) {
+ log.warn("Socket processing request was rejected for:"+socket,x);
+ return false;
} catch (Throwable t) {
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
@@ -1389,178 +1325,6 @@
// ----------------------------------------------------- Worker Inner Class
- /**
- * Server processor class.
- */
- protected class Worker implements Runnable {
-
-
- protected Thread thread = null;
- protected boolean available = false;
- protected long socket = 0;
- protected SocketStatus status = null;
- protected boolean options = false;
-
-
- /**
- * Process an incoming TCP/IP connection on the specified socket. Any
- * exception that occurs during processing must be logged and
swallowed.
- * <b>NOTE</b>: This method is called from our Connector's thread. We
- * must assign it to our own thread so that multiple simultaneous
- * requests can be handled.
- *
- * @param socket TCP socket to process
- */
- protected synchronized void assignWithOptions(long socket) {
-
- // Wait for the Processor to get the previous Socket
- while (available) {
- try {
- wait();
- } catch (InterruptedException e) {
- }
- }
-
- // Store the newly available Socket and notify our thread
- this.socket = socket;
- status = null;
- options = true;
- available = true;
- notifyAll();
-
- }
-
-
- /**
- * Process an incoming TCP/IP connection on the specified socket. Any
- * exception that occurs during processing must be logged and
swallowed.
- * <b>NOTE</b>: This method is called from our Connector's thread. We
- * must assign it to our own thread so that multiple simultaneous
- * requests can be handled.
- *
- * @param socket TCP socket to process
- */
- protected synchronized void assign(long socket) {
-
- // Wait for the Processor to get the previous Socket
- while (available) {
- try {
- wait();
- } catch (InterruptedException e) {
- }
- }
-
- // Store the newly available Socket and notify our thread
- this.socket = socket;
- status = null;
- options = false;
- available = true;
- notifyAll();
-
- }
-
-
- protected synchronized void assign(long socket, SocketStatus status) {
-
- // Wait for the Processor to get the previous Socket
- while (available) {
- try {
- wait();
- } catch (InterruptedException e) {
- }
- }
-
- // Store the newly available Socket and notify our thread
- this.socket = socket;
- this.status = status;
- options = false;
- available = true;
- notifyAll();
-
- }
-
-
- /**
- * Await a newly assigned Socket from our Connector, or
<code>null</code>
- * if we are supposed to shut down.
- */
- protected synchronized long await() {
-
- // Wait for the Connector to provide a new Socket
- while (!available) {
- try {
- wait();
- } catch (InterruptedException e) {
- }
- }
-
- // Notify the Connector that we have received this Socket
- long socket = this.socket;
- available = false;
- notifyAll();
-
- return (socket);
-
- }
-
-
- /**
- * The background thread that listens for incoming TCP/IP connections
and
- * hands them off to an appropriate processor.
- */
- public void run() {
-
- // Process requests until we receive a shutdown signal
- while (running) {
-
- // Wait for the next socket to be assigned
- long socket = await();
- if (socket == 0)
- continue;
-
- if (!deferAccept && options) {
- if (setSocketOptions(socket)) {
- getPoller().add(socket);
- } else {
- // Close socket and pool
- Socket.destroy(socket);
- socket = 0;
- }
- } else {
-
- // Process the request from this socket
- if ((status != null) && (handler.event(socket, status) ==
Handler.SocketState.CLOSED)) {
- // Close socket and pool
- Socket.destroy(socket);
- socket = 0;
- } else if ((status == null) && ((options &&
!setSocketOptions(socket))
- || handler.process(socket) ==
Handler.SocketState.CLOSED)) {
- // Close socket and pool
- Socket.destroy(socket);
- socket = 0;
- }
- }
-
- // Finish up this request
- recycleWorkerThread(this);
-
- }
-
- }
-
-
- /**
- * Start the background processing thread.
- */
- public void start() {
- thread = new Thread(this);
- thread.setName(getName() + "-" + (++curThreads));
- thread.setDaemon(true);
- thread.start();
- }
-
-
- }
// ----------------------------------------------- SendfileData Inner Class
@@ -1887,83 +1651,6 @@
}
- // ------------------------------------------------- WorkerStack Inner
Class
-
-
- public class WorkerStack {
-
- protected Worker[] workers = null;
- protected int end = 0;
-
- public WorkerStack(int size) {
- workers = new Worker[size];
- }
-
- /**
- * Put the object into the queue. If the queue is full (for example if
- * the queue has been reduced in size) the object will be dropped.
- *
- * @param object the object to be appended to the queue (first
- * element).
- */
- public void push(Worker worker) {
- if (end < workers.length) {
- workers[end++] = worker;
- } else {
- curThreads--;
- }
- }
-
- /**
- * Get the first object out of the queue. Return null if the queue
- * is empty.
- */
- public Worker pop() {
- if (end > 0) {
- return workers[--end];
- }
- return null;
- }
-
- /**
- * Get the first object out of the queue, Return null if the queue
- * is empty.
- */
- public Worker peek() {
- return workers[end];
- }
-
- /**
- * Is the queue empty?
- */
- public boolean isEmpty() {
- return (end == 0);
- }
-
- /**
- * How many elements are there in this queue?
- */
- public int size() {
- return (end);
- }
-
- /**
- * Resize the queue. If there are too many objects in the queue for the
- * new size, drop the excess.
- *
- * @param newSize
- */
- public void resize(int newSize) {
- Worker[] newWorkers = new Worker[newSize];
- int len = workers.length;
- if (newSize < len) {
- len = newSize;
- }
- System.arraycopy(workers, 0, newWorkers, 0, len);
- workers = newWorkers;
- }
- }
-
// ---------------------------------------------- SocketProcessor Inner
Class
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java?rev=807284&r1=807283&r2=807284&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java Mon Aug 24
15:33:48 2009
@@ -23,11 +23,17 @@
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.IntrospectionUtils;
import org.apache.tomcat.util.res.StringManager;
+import org.apache.tomcat.util.threads.ResizableExecutor;
+import org.apache.tomcat.util.threads.TaskQueue;
+import org.apache.tomcat.util.threads.TaskThreadFactory;
+import org.apache.tomcat.util.threads.ThreadPoolExecutor;
/**
* Handle incoming TCP connections.
@@ -45,50 +51,17 @@
* @author Yoav Shapira
* @author Remy Maucherat
*/
-public class JIoEndpoint {
+public class JIoEndpoint extends AbstractEndpoint {
// -------------------------------------------------------------- Constants
-
protected static Log log = LogFactory.getLog(JIoEndpoint.class);
- protected StringManager sm =
- StringManager.getManager("org.apache.tomcat.util.net.res");
-
-
- /**
- * The Request attribute key for the cipher suite.
- */
- public static final String CIPHER_SUITE_KEY =
"javax.servlet.request.cipher_suite";
-
- /**
- * The Request attribute key for the key size.
- */
- public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";
-
- /**
- * The Request attribute key for the client certificate chain.
- */
- public static final String CERTIFICATE_KEY =
"javax.servlet.request.X509Certificate";
-
- /**
- * The Request attribute key for the session id.
- * This one is a Tomcat extension to the Servlet spec.
- */
- public static final String SESSION_ID_KEY =
"javax.servlet.request.ssl_session";
-
-
// ----------------------------------------------------------------- Fields
/**
- * Available workers.
- */
- protected WorkerStack workers = null;
-
-
- /**
* Running state of the endpoint.
*/
protected volatile boolean running = false;
@@ -134,6 +107,10 @@
*/
protected SocketProperties socketProperties = new SocketProperties();
+ /**
+ * Are we using an internal executor
+ */
+ protected volatile boolean internalExecutor = false;
// ------------------------------------------------------------- Properties
@@ -177,13 +154,19 @@
public void setMaxThreads(int maxThreads) {
this.maxThreads = maxThreads;
if (running) {
- synchronized(workers) {
- workers.resize(maxThreads);
- }
+ //TODO Dynamic resize
+ log.error("Resizing executor dynamically is not possible at this
time.");
}
}
public int getMaxThreads() { return maxThreads; }
+ public int minSpareThreads = 10;
+ public int getMinSpareThreads() {
+ return Math.min(minSpareThreads,getMaxThreads());
+ }
+ public void setMinSpareThreads(int minSpareThreads) {
+ this.minSpareThreads = minSpareThreads;
+ }
/**
* Priority of the acceptor and poller threads.
@@ -304,9 +287,15 @@
*/
public int getCurrentThreadCount() {
if (executor!=null) {
- return -1;
+ if (executor instanceof ThreadPoolExecutor) {
+ return ((ThreadPoolExecutor)executor).getPoolSize();
+ } else if (executor instanceof ResizableExecutor) {
+ return ((ResizableExecutor)executor).getPoolSize();
+ } else {
+ return -1;
+ }
} else {
- return curThreads;
+ return -2;
}
}
@@ -317,9 +306,15 @@
*/
public int getCurrentThreadsBusy() {
if (executor!=null) {
- return -1;
+ if (executor instanceof ThreadPoolExecutor) {
+ return ((ThreadPoolExecutor)executor).getActiveCount();
+ } else if (executor instanceof ResizableExecutor) {
+ return ((ResizableExecutor)executor).getActiveCount();
+ } else {
+ return -1;
+ }
} else {
- return workers!=null?curThreads - workers.size():0;
+ return -2;
}
}
@@ -426,113 +421,6 @@
}
- // ----------------------------------------------------- Worker Inner Class
-
-
- protected class Worker implements Runnable {
-
- protected Thread thread = null;
- protected boolean available = false;
- protected Socket socket = null;
-
-
- /**
- * Process an incoming TCP/IP connection on the specified socket. Any
- * exception that occurs during processing must be logged and
swallowed.
- * <b>NOTE</b>: This method is called from our Connector's thread. We
- * must assign it to our own thread so that multiple simultaneous
- * requests can be handled.
- *
- * @param socket TCP socket to process
- */
- synchronized void assign(Socket socket) {
-
- // Wait for the Processor to get the previous Socket
- while (available) {
- try {
- wait();
- } catch (InterruptedException e) {
- }
- }
-
- // Store the newly available Socket and notify our thread
- this.socket = socket;
- available = true;
- notifyAll();
-
- }
-
-
- /**
- * Await a newly assigned Socket from our Connector, or
<code>null</code>
- * if we are supposed to shut down.
- */
- private synchronized Socket await() {
-
- // Wait for the Connector to provide a new Socket
- while (!available) {
- try {
- wait();
- } catch (InterruptedException e) {
- }
- }
-
- // Notify the Connector that we have received this Socket
- Socket socket = this.socket;
- available = false;
- notifyAll();
-
- return (socket);
-
- }
-
-
-
- /**
- * The background thread that listens for incoming TCP/IP connections
and
- * hands them off to an appropriate processor.
- */
- public void run() {
-
- // Process requests until we receive a shutdown signal
- while (running) {
-
- // Wait for the next socket to be assigned
- Socket socket = await();
- if (socket == null)
- continue;
-
- // Process the request from this socket
- if (!setSocketOptions(socket) || !handler.process(socket)) {
- // Close socket
- try {
- socket.close();
- } catch (IOException e) {
- }
- }
-
- // Finish up this request
- socket = null;
- recycleWorkerThread(this);
-
- }
-
- }
-
-
- /**
- * Start the background processing thread.
- */
- public void start() {
- thread = new Thread(this);
- thread.setName(getName() + "-" + (++curThreads));
- thread.setDaemon(true);
- thread.start();
- }
-
-
- }
-
// -------------------- Public methods --------------------
@@ -583,7 +471,11 @@
// Create worker collection
if (executor == null) {
- workers = new WorkerStack(maxThreads);
+ internalExecutor = true;
+ TaskQueue taskqueue = new TaskQueue();
+ TaskThreadFactory tf = new TaskThreadFactory(getName() +
"-exec-", daemon, getThreadPriority());
+ executor = new ThreadPoolExecutor(getMinSpareThreads(),
getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
+ taskqueue.setParent( (ThreadPoolExecutor) executor);
}
// Start acceptor threads
@@ -614,6 +506,16 @@
running = false;
unlockAccept();
}
+ if ( executor!=null && internalExecutor ) {
+ if ( executor instanceof ThreadPoolExecutor ) {
+ //this is our internal one, so we need to shut it down
+ ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
+ tpe.shutdownNow();
+ TaskQueue queue = (TaskQueue) tpe.getQueue();
+ queue.setParent(null);
+ }
+ executor = null;
+ }
}
/**
@@ -696,97 +598,16 @@
/**
- * Create (or allocate) and return an available processor for use in
- * processing a specific HTTP request, if possible. If the maximum
- * allowed processors have already been created and are in use, return
- * <code>null</code> instead.
- */
- protected Worker createWorkerThread() {
-
- synchronized (workers) {
- if (workers.size() > 0) {
- curThreadsBusy++;
- return workers.pop();
- }
- if ((maxThreads > 0) && (curThreads < maxThreads)) {
- curThreadsBusy++;
- if (curThreadsBusy == maxThreads) {
- log.info(sm.getString("endpoint.info.maxThreads",
- Integer.toString(maxThreads), address,
- Integer.toString(port)));
- }
- return (newWorkerThread());
- } else {
- if (maxThreads < 0) {
- curThreadsBusy++;
- return (newWorkerThread());
- } else {
- return (null);
- }
- }
- }
-
- }
-
-
- /**
- * Create and return a new processor suitable for processing HTTP
- * requests and returning the corresponding responses.
- */
- protected Worker newWorkerThread() {
-
- Worker workerThread = new Worker();
- workerThread.start();
- return (workerThread);
-
- }
-
-
- /**
- * Return a new worker thread, and block while to worker is available.
- */
- protected Worker getWorkerThread() {
- // Allocate a new worker thread
- Worker workerThread = createWorkerThread();
- while (workerThread == null) {
- try {
- synchronized (workers) {
- workers.wait();
- }
- } catch (InterruptedException e) {
- // Ignore
- }
- workerThread = createWorkerThread();
- }
- return workerThread;
- }
-
-
- /**
- * Recycle the specified Processor so that it can be used again.
- *
- * @param workerThread The processor to be recycled
- */
- protected void recycleWorkerThread(Worker workerThread) {
- synchronized (workers) {
- workers.push(workerThread);
- curThreadsBusy--;
- workers.notify();
- }
- }
-
-
- /**
* Process given socket.
*/
protected boolean processSocket(Socket socket) {
try {
- if (executor == null) {
- getWorkerThread().assign(socket);
- } else {
- executor.execute(new SocketProcessor(socket));
- }
+ executor.execute(new SocketProcessor(socket));
+ } catch (RejectedExecutionException x) {
+ log.warn("Socket processing request was rejected for:"+socket,x);
+ return false;
} catch (Throwable t) {
+
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
log.error(sm.getString("endpoint.process.fail"), t);
@@ -796,81 +617,4 @@
}
- // ------------------------------------------------- WorkerStack Inner
Class
-
-
- public class WorkerStack {
-
- protected Worker[] workers = null;
- protected int end = 0;
-
- public WorkerStack(int size) {
- workers = new Worker[size];
- }
-
- /**
- * Put the object into the queue. If the queue is full (for example if
- * the queue has been reduced in size) the object will be dropped.
- *
- * @param object the object to be appended to the queue (first
- * element).
- */
- public void push(Worker worker) {
- if (end < workers.length) {
- workers[end++] = worker;
- } else {
- curThreads--;
- }
- }
-
- /**
- * Get the first object out of the queue. Return null if the queue
- * is empty.
- */
- public Worker pop() {
- if (end > 0) {
- return workers[--end];
- }
- return null;
- }
-
- /**
- * Get the first object out of the queue, Return null if the queue
- * is empty.
- */
- public Worker peek() {
- return workers[end];
- }
-
- /**
- * Is the queue empty?
- */
- public boolean isEmpty() {
- return (end == 0);
- }
-
- /**
- * How many elements are there in this queue?
- */
- public int size() {
- return (end);
- }
-
- /**
- * Resize the queue. If there are too many objects in the queue for the
- * new size, drop the excess.
- *
- * @param newSize
- */
- public void resize(int newSize) {
- Worker[] newWorkers = new Worker[newSize];
- int len = workers.length;
- if (newSize < len) {
- len = newSize;
- }
- System.arraycopy(workers, 0, newWorkers, 0, len);
- workers = newWorkers;
- }
- }
-
}
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=807284&r1=807283&r2=807284&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Mon Aug 24
15:33:48 2009
@@ -58,6 +58,7 @@
import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler;
import org.apache.tomcat.util.net.jsse.NioX509KeyManager;
import org.apache.tomcat.util.res.StringManager;
+import org.apache.tomcat.util.threads.ResizableExecutor;
import org.apache.tomcat.util.threads.TaskQueue;
import org.apache.tomcat.util.threads.TaskThreadFactory;
import org.apache.tomcat.util.threads.ThreadPoolExecutor;
@@ -77,7 +78,7 @@
* @author Remy Maucherat
* @author Filip Hanik
*/
-public class NioEndpoint {
+public class NioEndpoint extends AbstractEndpoint {
// -------------------------------------------------------------- Constants
@@ -85,30 +86,6 @@
protected static Log log = LogFactory.getLog(NioEndpoint.class);
- protected static StringManager sm =
- StringManager.getManager("org.apache.tomcat.util.net.res");
-
-
- /**
- * The Request attribute key for the cipher suite.
- */
- public static final String CIPHER_SUITE_KEY =
"javax.servlet.request.cipher_suite";
-
- /**
- * The Request attribute key for the key size.
- */
- public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";
-
- /**
- * The Request attribute key for the client certificate chain.
- */
- public static final String CERTIFICATE_KEY =
"javax.servlet.request.X509Certificate";
-
- /**
- * The Request attribute key for the session id.
- * This one is a Tomcat extension to the Servlet spec.
- */
- public static final String SESSION_ID_KEY =
"javax.servlet.request.ssl_session";
public static final int OP_REGISTER = 0x100; //register interest op
public static final int OP_CALLBACK = 0x200; //callback interest op
@@ -333,7 +310,7 @@
/**
* Are we using an internal executor
*/
- protected boolean internalExecutor = true;
+ protected volatile boolean internalExecutor = false;
protected boolean useExecutor = true;
/**
@@ -518,13 +495,16 @@
/**
* Dummy maxSpareThreads property.
*/
- public int getMaxSpareThreads() { return Math.min(getMaxThreads(),5); }
+ public int getMaxSpareThreads() { return
Math.min(getMaxThreads(),getMinSpareThreads()); }
- /**
- * Dummy minSpareThreads property.
- */
- public int getMinSpareThreads() { return Math.min(getMaxThreads(),5); }
+ public int minSpareThreads = 10;
+ public int getMinSpareThreads() {
+ return Math.min(minSpareThreads,getMaxThreads());
+ }
+ public void setMinSpareThreads(int minSpareThreads) {
+ this.minSpareThreads = minSpareThreads;
+ }
/**
* Generic properties, introspected
@@ -733,6 +713,8 @@
if (executor!=null) {
if (executor instanceof ThreadPoolExecutor) {
return ((ThreadPoolExecutor)executor).getPoolSize();
+ } else if (executor instanceof ResizableExecutor) {
+ return ((ResizableExecutor)executor).getPoolSize();
} else {
return -1;
}
@@ -750,6 +732,8 @@
if (executor!=null) {
if (executor instanceof ThreadPoolExecutor) {
return ((ThreadPoolExecutor)executor).getActiveCount();
+ } else if (executor instanceof ResizableExecutor) {
+ return ((ResizableExecutor)executor).getActiveCount();
} else {
return -1;
}
@@ -1142,9 +1126,7 @@
if ( dispatch && executor!=null ) executor.execute(sc);
else sc.run();
} catch (RejectedExecutionException rx) {
- if (log.isDebugEnabled()) {
- log.debug("Unable to process socket, executor rejected the
task.",rx);
- }
+ log.warn("Socket processing request was rejected for:"+socket,rx);
return false;
} catch (Throwable t) {
// This means we got an OOM or similar creating a thread, or that
Added: tomcat/trunk/java/org/apache/tomcat/util/threads/ResizableExecutor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/threads/ResizableExecutor.java?rev=807284&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/threads/ResizableExecutor.java
(added)
+++ tomcat/trunk/java/org/apache/tomcat/util/threads/ResizableExecutor.java Mon
Aug 24 15:33:48 2009
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.threads;
+
+import java.util.concurrent.Executor;
+
+public interface ResizableExecutor extends Executor {
+ /**
+ * {@link java.util.concurrent.ThreadPoolExecutor#getPoolSize()}
+ * @return {@link java.util.concurrent.ThreadPoolExecutor#getPoolSize()}
+ */
+ public int getPoolSize();
+
+ /**
+ * {@link java.util.concurrent.ThreadPoolExecutor#getActiveCount()}
+ * @return {@link java.util.concurrent.ThreadPoolExecutor#getActiveCount()}
+ */
+ public int getActiveCount();
+
+ public boolean resizePool(int corePoolSize, int maximumPoolSize);
+
+ public boolean resizeQueue(int capacity);
+
+}
Propchange:
tomcat/trunk/java/org/apache/tomcat/util/threads/ResizableExecutor.java
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]