Author: fhanik
Date: Thu Mar 25 16:41:05 2010
New Revision: 927490

URL: http://svn.apache.org/viewvc?rev=927490&view=rev
Log:
Enable async behavior for the AJP connector

Modified:
    tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java
    tomcat/trunk/java/org/apache/coyote/ajp/AjpProtocol.java

Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java?rev=927490&r1=927489&r2=927490&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java Thu Mar 25 
16:41:05 2010
@@ -26,6 +26,7 @@ import java.net.InetAddress;
 import java.net.Socket;
 import java.security.cert.CertificateFactory;
 import java.security.cert.X509Certificate;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.coyote.ActionCode;
 import org.apache.coyote.ActionHook;
@@ -44,6 +45,9 @@ import org.apache.tomcat.util.http.HttpM
 import org.apache.tomcat.util.http.MimeHeaders;
 import org.apache.tomcat.util.net.AbstractEndpoint;
 import org.apache.tomcat.util.net.JIoEndpoint;
+import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.SocketWrapper;
+import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
 import org.apache.tomcat.util.res.StringManager;
 
 
@@ -181,7 +185,7 @@ public class AjpProcessor implements Act
     /**
      * Socket associated with the current connection.
      */
-    protected Socket socket;
+    protected SocketWrapper<Socket> socket;
 
     
     /**
@@ -271,6 +275,11 @@ public class AjpProcessor implements Act
      * Flush message array.
      */
     protected static final byte[] flushMessageArray;
+    
+    /**
+     * Async used
+     */
+    protected boolean async = false;
 
 
     // ----------------------------------------------------- Static Initializer
@@ -357,18 +366,18 @@ public class AjpProcessor implements Act
      *
      * @throws IOException error during an I/O operation
      */
-    public void process(Socket socket)
+    public SocketState process(SocketWrapper<Socket> socket)
         throws IOException {
         RequestInfo rp = request.getRequestProcessor();
         rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
 
         // Setting up the socket
         this.socket = socket;
-        input = socket.getInputStream();
-        output = socket.getOutputStream();
+        input = socket.getSocket().getInputStream();
+        output = socket.getSocket().getOutputStream();
         int soTimeout = -1;
         if (keepAliveTimeout > 0) {
-            soTimeout = socket.getSoTimeout();
+            soTimeout = socket.getSocket().getSoTimeout();
         }
 
         // Error flag
@@ -380,7 +389,7 @@ public class AjpProcessor implements Act
             try {
                 // Set keep alive timeout if enabled
                 if (keepAliveTimeout > 0) {
-                    socket.setSoTimeout(keepAliveTimeout);
+                    socket.getSocket().setSoTimeout(keepAliveTimeout);
                 }
                 // Get first message of the request
                 if (!readMessage(requestHeaderMessage)) {
@@ -390,7 +399,7 @@ public class AjpProcessor implements Act
                 }
                 // Set back timeout if keep alive timeout is enabled
                 if (keepAliveTimeout > 0) {
-                    socket.setSoTimeout(soTimeout);
+                    socket.getSocket().setSoTimeout(soTimeout);
                 }
                 // Check message type, process right away and break if
                 // not regular request processing
@@ -446,6 +455,10 @@ public class AjpProcessor implements Act
                     error = true;
                 }
             }
+            
+            if (async && !error) {
+                break;
+            }
 
             // Finish the response if not done yet
             if (!finished) {
@@ -467,15 +480,66 @@ public class AjpProcessor implements Act
             recycle();
 
         }
+        if (async && !error) {
+            rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
+            socket.setAsync(true);
+            return SocketState.LONG;
+        } else {
+            rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
+            recycle();
+            input = null;
+            output = null;
+            return SocketState.CLOSED;
+        }
+        
+    }
+
+    public SocketState asyncDispatch(SocketStatus status) throws IOException {
+
+        RequestInfo rp = request.getRequestProcessor();
+        try {
+            rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
+            error = !adapter.asyncDispatch(request, response, status);
+        } catch (InterruptedIOException e) {
+            error = true;
+        } catch (Throwable t) {
+            log.error(sm.getString("http11processor.request.process"), t);
+            // 500 - Internal Server Error
+            response.setStatus(500);
+            error = true;
+        }
 
         rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
-        recycle();
-        input = null;
-        output = null;
+
+        if (async) {
+            if (error) {
+                socket.setAsync(false);
+                response.setStatus(500);
+                request.updateCounters();
+                recycle();
+                input = null;
+                output = null;
+                return SocketState.CLOSED;
+            } else {
+                return SocketState.LONG;
+            }
+        } else {
+            socket.setAsync(false);
+            if (error) {
+                response.setStatus(500);
+            }
+            request.updateCounters();
+            recycle();
+            input = null;
+            output = null;
+            return SocketState.CLOSED;
+        }
+        
+        
+        
         
     }
 
-
     // ----------------------------------------------------- ActionHook Methods
 
 
@@ -522,7 +586,7 @@ public class AjpProcessor implements Act
 
         } else if (actionCode == ActionCode.ACTION_CLOSE) {
             // Close
-
+            async = false;
             // End the processing of the current request, and stop any further
             // transactions with the client
 
@@ -602,6 +666,34 @@ public class AjpProcessor implements Act
             empty = false;
             replay = true;
 
+        }  else if (actionCode == ActionCode.ACTION_ASYNC_START) {
+            //TODO SERVLET3 - async
+            async = true;
+        } else if (actionCode == ActionCode.ACTION_ASYNC_COMPLETE) {
+          //TODO SERVLET3 - async
+            AtomicBoolean dispatch = (AtomicBoolean)param;
+            RequestInfo rp = request.getRequestProcessor();
+            if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) 
{ //async handling
+                dispatch.set(true);
+                endpoint.processSocket(this.socket, SocketStatus.STOP);
+            } else {
+                dispatch.set(true);
+            }
+        } else if (actionCode == ActionCode.ACTION_ASYNC_SETTIMEOUT) {
+          //TODO SERVLET3 - async
+            if (param==null) return;
+            long timeout = ((Long)param).longValue();
+            //if we are not piggy backing on a worker thread, set the timeout
+            socket.setTimeout(timeout);
+        } else if (actionCode == ActionCode.ACTION_ASYNC_DISPATCH) {
+            RequestInfo rp = request.getRequestProcessor();
+            AtomicBoolean dispatch = (AtomicBoolean)param;
+            if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) 
{//async handling
+                endpoint.processSocket(this.socket, SocketStatus.OPEN);
+                dispatch.set(true);
+            } else { 
+                dispatch.set(true);
+            }
         }
 
 

Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProtocol.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpProtocol.java?rev=927490&r1=927489&r2=927490&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpProtocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpProtocol.java Thu Mar 25 
16:41:05 2010
@@ -22,6 +22,7 @@ import java.net.Socket;
 import java.net.URLEncoder;
 import java.util.Hashtable;
 import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -320,6 +321,8 @@ public class AjpProtocol 
         protected AjpProtocol proto;
         protected AtomicLong registerCount = new AtomicLong(0);
         protected RequestGroupInfo global = new RequestGroupInfo();
+        protected ConcurrentHashMap<SocketWrapper, AjpProcessor> connections =
+            new ConcurrentHashMap<SocketWrapper, AjpProcessor>();
 
         protected ConcurrentLinkedQueue<AjpProcessor> recycledProcessors = 
             new ConcurrentLinkedQueue<AjpProcessor>() {
@@ -364,23 +367,28 @@ public class AjpProtocol 
             this.proto = proto;
         }
         
-        public SocketState process(SocketWrapper<Socket> socket, SocketStatus 
status) {
-            throw new UnsupportedOperationException();
+        public SocketState process(SocketWrapper<Socket> socket) {
+            return process(socket,SocketStatus.OPEN);
         }
 
-        public SocketState process(SocketWrapper<Socket> socket) {
-            AjpProcessor processor = recycledProcessors.poll();
+        public SocketState process(SocketWrapper<Socket> socket, SocketStatus 
status) {
+            AjpProcessor processor = connections.remove(socket);
             try {
-
+                if (processor == null) {
+                    processor = recycledProcessors.poll();
+                }
                 if (processor == null) {
                     processor = createProcessor();
                 }
-
                 processor.action(ActionCode.ACTION_START, null);
 
-                processor.process(socket.getSocket());
-                return SocketState.CLOSED;
-
+                SocketState state = 
socket.isAsync()?processor.asyncDispatch(status):processor.process(socket);
+                if (state == SocketState.LONG) {
+                    connections.put(socket, processor);
+                } else {
+                    connections.remove(socket);
+                }
+                return state;
             } catch(java.net.SocketException e) {
                 // SocketExceptions are normal
                 AjpProtocol.log.debug



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to