Author: fhanik Date: Thu Mar 25 16:41:05 2010 New Revision: 927490 URL: http://svn.apache.org/viewvc?rev=927490&view=rev Log: Enable async behavior for the AJP connector
Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java tomcat/trunk/java/org/apache/coyote/ajp/AjpProtocol.java Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java?rev=927490&r1=927489&r2=927490&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java Thu Mar 25 16:41:05 2010 @@ -26,6 +26,7 @@ import java.net.InetAddress; import java.net.Socket; import java.security.cert.CertificateFactory; import java.security.cert.X509Certificate; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.coyote.ActionCode; import org.apache.coyote.ActionHook; @@ -44,6 +45,9 @@ import org.apache.tomcat.util.http.HttpM import org.apache.tomcat.util.http.MimeHeaders; import org.apache.tomcat.util.net.AbstractEndpoint; import org.apache.tomcat.util.net.JIoEndpoint; +import org.apache.tomcat.util.net.SocketStatus; +import org.apache.tomcat.util.net.SocketWrapper; +import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; import org.apache.tomcat.util.res.StringManager; @@ -181,7 +185,7 @@ public class AjpProcessor implements Act /** * Socket associated with the current connection. */ - protected Socket socket; + protected SocketWrapper<Socket> socket; /** @@ -271,6 +275,11 @@ public class AjpProcessor implements Act * Flush message array. */ protected static final byte[] flushMessageArray; + + /** + * Async used + */ + protected boolean async = false; // ----------------------------------------------------- Static Initializer @@ -357,18 +366,18 @@ public class AjpProcessor implements Act * * @throws IOException error during an I/O operation */ - public void process(Socket socket) + public SocketState process(SocketWrapper<Socket> socket) throws IOException { RequestInfo rp = request.getRequestProcessor(); rp.setStage(org.apache.coyote.Constants.STAGE_PARSE); // Setting up the socket this.socket = socket; - input = socket.getInputStream(); - output = socket.getOutputStream(); + input = socket.getSocket().getInputStream(); + output = socket.getSocket().getOutputStream(); int soTimeout = -1; if (keepAliveTimeout > 0) { - soTimeout = socket.getSoTimeout(); + soTimeout = socket.getSocket().getSoTimeout(); } // Error flag @@ -380,7 +389,7 @@ public class AjpProcessor implements Act try { // Set keep alive timeout if enabled if (keepAliveTimeout > 0) { - socket.setSoTimeout(keepAliveTimeout); + socket.getSocket().setSoTimeout(keepAliveTimeout); } // Get first message of the request if (!readMessage(requestHeaderMessage)) { @@ -390,7 +399,7 @@ public class AjpProcessor implements Act } // Set back timeout if keep alive timeout is enabled if (keepAliveTimeout > 0) { - socket.setSoTimeout(soTimeout); + socket.getSocket().setSoTimeout(soTimeout); } // Check message type, process right away and break if // not regular request processing @@ -446,6 +455,10 @@ public class AjpProcessor implements Act error = true; } } + + if (async && !error) { + break; + } // Finish the response if not done yet if (!finished) { @@ -467,15 +480,66 @@ public class AjpProcessor implements Act recycle(); } + if (async && !error) { + rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); + socket.setAsync(true); + return SocketState.LONG; + } else { + rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); + recycle(); + input = null; + output = null; + return SocketState.CLOSED; + } + + } + + public SocketState asyncDispatch(SocketStatus status) throws IOException { + + RequestInfo rp = request.getRequestProcessor(); + try { + rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE); + error = !adapter.asyncDispatch(request, response, status); + } catch (InterruptedIOException e) { + error = true; + } catch (Throwable t) { + log.error(sm.getString("http11processor.request.process"), t); + // 500 - Internal Server Error + response.setStatus(500); + error = true; + } rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); - recycle(); - input = null; - output = null; + + if (async) { + if (error) { + socket.setAsync(false); + response.setStatus(500); + request.updateCounters(); + recycle(); + input = null; + output = null; + return SocketState.CLOSED; + } else { + return SocketState.LONG; + } + } else { + socket.setAsync(false); + if (error) { + response.setStatus(500); + } + request.updateCounters(); + recycle(); + input = null; + output = null; + return SocketState.CLOSED; + } + + + } - // ----------------------------------------------------- ActionHook Methods @@ -522,7 +586,7 @@ public class AjpProcessor implements Act } else if (actionCode == ActionCode.ACTION_CLOSE) { // Close - + async = false; // End the processing of the current request, and stop any further // transactions with the client @@ -602,6 +666,34 @@ public class AjpProcessor implements Act empty = false; replay = true; + } else if (actionCode == ActionCode.ACTION_ASYNC_START) { + //TODO SERVLET3 - async + async = true; + } else if (actionCode == ActionCode.ACTION_ASYNC_COMPLETE) { + //TODO SERVLET3 - async + AtomicBoolean dispatch = (AtomicBoolean)param; + RequestInfo rp = request.getRequestProcessor(); + if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) { //async handling + dispatch.set(true); + endpoint.processSocket(this.socket, SocketStatus.STOP); + } else { + dispatch.set(true); + } + } else if (actionCode == ActionCode.ACTION_ASYNC_SETTIMEOUT) { + //TODO SERVLET3 - async + if (param==null) return; + long timeout = ((Long)param).longValue(); + //if we are not piggy backing on a worker thread, set the timeout + socket.setTimeout(timeout); + } else if (actionCode == ActionCode.ACTION_ASYNC_DISPATCH) { + RequestInfo rp = request.getRequestProcessor(); + AtomicBoolean dispatch = (AtomicBoolean)param; + if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {//async handling + endpoint.processSocket(this.socket, SocketStatus.OPEN); + dispatch.set(true); + } else { + dispatch.set(true); + } } Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProtocol.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpProtocol.java?rev=927490&r1=927489&r2=927490&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/ajp/AjpProtocol.java (original) +++ tomcat/trunk/java/org/apache/coyote/ajp/AjpProtocol.java Thu Mar 25 16:41:05 2010 @@ -22,6 +22,7 @@ import java.net.Socket; import java.net.URLEncoder; import java.util.Hashtable; import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; @@ -320,6 +321,8 @@ public class AjpProtocol protected AjpProtocol proto; protected AtomicLong registerCount = new AtomicLong(0); protected RequestGroupInfo global = new RequestGroupInfo(); + protected ConcurrentHashMap<SocketWrapper, AjpProcessor> connections = + new ConcurrentHashMap<SocketWrapper, AjpProcessor>(); protected ConcurrentLinkedQueue<AjpProcessor> recycledProcessors = new ConcurrentLinkedQueue<AjpProcessor>() { @@ -364,23 +367,28 @@ public class AjpProtocol this.proto = proto; } - public SocketState process(SocketWrapper<Socket> socket, SocketStatus status) { - throw new UnsupportedOperationException(); + public SocketState process(SocketWrapper<Socket> socket) { + return process(socket,SocketStatus.OPEN); } - public SocketState process(SocketWrapper<Socket> socket) { - AjpProcessor processor = recycledProcessors.poll(); + public SocketState process(SocketWrapper<Socket> socket, SocketStatus status) { + AjpProcessor processor = connections.remove(socket); try { - + if (processor == null) { + processor = recycledProcessors.poll(); + } if (processor == null) { processor = createProcessor(); } - processor.action(ActionCode.ACTION_START, null); - processor.process(socket.getSocket()); - return SocketState.CLOSED; - + SocketState state = socket.isAsync()?processor.asyncDispatch(status):processor.process(socket); + if (state == SocketState.LONG) { + connections.put(socket, processor); + } else { + connections.remove(socket); + } + return state; } catch(java.net.SocketException e) { // SocketExceptions are normal AjpProtocol.log.debug --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org