Author: fhanik
Date: Mon Jul 20 01:20:02 2009
New Revision: 795669

URL: http://svn.apache.org/viewvc?rev=795669&view=rev
Log:
Start implementing concurrency around the state machine.
startAsync() and complete() are to be thread-safe operations.

Modified:
    tomcat/trunk/java/org/apache/catalina/connector/AsyncContextImpl.java
    tomcat/trunk/java/org/apache/catalina/connector/Request.java

Modified: tomcat/trunk/java/org/apache/catalina/connector/AsyncContextImpl.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/AsyncContextImpl.java?rev=795669&r1=795668&r2=795669&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/AsyncContextImpl.java (original)
+++ tomcat/trunk/java/org/apache/catalina/connector/AsyncContextImpl.java Mon Jul 20 01:20:02 2009
@@ -19,6 +19,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.servlet.AsyncContext;
 import javax.servlet.AsyncListener;
@@ -30,6 +31,7 @@
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.catalina.Context;
 import org.apache.coyote.ActionCode;
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
@@ -39,15 +41,20 @@
  *
  */
 public class AsyncContextImpl implements AsyncContext {
+    
+    public static enum AsyncState {
+        NOT_STARTED, STARTED, DISPATCHING, DISPATCHED, COMPLETING
+    };
+    
     protected static Log log = LogFactory.getLog(AsyncContextImpl.class);
     
-    private boolean started = false;
     private ServletRequest servletRequest = null;
     private ServletResponse servletResponse = null;
     private List<AsyncListenerWrapper> listeners = new 
ArrayList<AsyncListenerWrapper>();
     private boolean hasOriginalRequestAndResponse = true;
-    private boolean completed = true;
     private volatile Runnable dispatch = null;
+    private Context context = null;
+    private AtomicReference<AsyncState> state = new 
AtomicReference<AsyncState>();
     
     private Request request;
     
@@ -77,31 +84,35 @@
     @Override
     public void dispatch(ServletContext context, String path) {
         // TODO SERVLET3 - async
-     // TODO SERVLET3 - async
-        if (request.getAttribute(ASYNC_REQUEST_URI)==null) {
-            request.setAttribute(ASYNC_REQUEST_URI, request.getRequestURI());
-            request.setAttribute(ASYNC_CONTEXT_PATH, request.getContextPath());
-            request.setAttribute(ASYNC_SERVLET_PATH, request.getServletPath());
-            request.setAttribute(ASYNC_QUERY_STRING, request.getQueryString());
-        }
-        final RequestDispatcher requestDispatcher = 
context.getRequestDispatcher(path);
-        final HttpServletRequest servletRequest = 
(HttpServletRequest)getRequest();
-        final HttpServletResponse servletResponse = 
(HttpServletResponse)getResponse();
-        Runnable run = new Runnable() {
-            public void run() {
-                try {
-                    //piggy back on the request dispatcher to ensure that 
filters etc get called.
-                    //TODO SERVLET3 - async should this be include/forward or 
a new dispatch type
-                    //javadoc suggests include with the type of 
DispatcherType.ASYNC
-                    requestDispatcher.include(servletRequest, servletResponse);
-                }catch (Exception x) {
-                    //log.error("Async.dispatch",x);
-                    throw new RuntimeException(x);
-                }
+        if (this.state.compareAndSet(AsyncState.STARTED, 
AsyncState.DISPATCHING)) {
+            if (request.getAttribute(ASYNC_REQUEST_URI)==null) {
+                request.setAttribute(ASYNC_REQUEST_URI, 
request.getRequestURI());
+                request.setAttribute(ASYNC_CONTEXT_PATH, 
request.getContextPath());
+                request.setAttribute(ASYNC_SERVLET_PATH, 
request.getServletPath());
+                request.setAttribute(ASYNC_QUERY_STRING, 
request.getQueryString());
             }
-        };
-        this.dispatch = run;
-        request.coyoteRequest.action(ActionCode.ACTION_ASYNC_DISPATCH, null );
+            final RequestDispatcher requestDispatcher = 
context.getRequestDispatcher(path);
+            final HttpServletRequest servletRequest = 
(HttpServletRequest)getRequest();
+            final HttpServletResponse servletResponse = 
(HttpServletResponse)getResponse();
+            Runnable run = new Runnable() {
+                public void run() {
+                    try {
+                        //piggy back on the request dispatcher to ensure that 
filters etc get called.
+                        //TODO SERVLET3 - async should this be include/forward 
or a new dispatch type
+                        //javadoc suggests include with the type of 
DispatcherType.ASYNC
+                        requestDispatcher.include(servletRequest, 
servletResponse);
+                    }catch (Exception x) {
+                        //log.error("Async.dispatch",x);
+                        throw new RuntimeException(x);
+                    }
+                }
+            };
+            this.dispatch = run;
+            request.coyoteRequest.action(ActionCode.ACTION_ASYNC_DISPATCH, 
null );
+
+        } else {
+            throw new IllegalStateException("Dispatch not allowed. Invalid 
state:"+state.get());
+        }
     }
 
     @Override
@@ -139,20 +150,24 @@
     
     
     protected void recycle() {
-        started = false;
         servletRequest = null;
         servletResponse = null;
         listeners.clear();
         hasOriginalRequestAndResponse = true;
-        completed = true;
+        state.set(AsyncState.NOT_STARTED);
+        context = null;
     }
 
     public boolean isStarted() {
-        return started;
+        return (state.get()!=AsyncState.NOT_STARTED);
     }
 
-    public void setStarted(boolean started) {
-        this.started = started;
+    public void setStarted(Context context) {
+        if (state.compareAndSet(AsyncState.NOT_STARTED, AsyncState.STARTED)) {
+            this.context = context;
+        } else {
+            throw new IllegalStateException("Already started.");
+        }
     }
 
     public ServletRequest getServletRequest() {
@@ -181,47 +196,59 @@
     }
 
     public boolean isCompleted() {
-        return completed;
+        return (state.get()==AsyncState.NOT_STARTED);
     }
 
-    public void setCompleted(boolean completed) {
-        this.completed = completed;
+    public void setCompleted() {
+        this.state.set(AsyncState.NOT_STARTED);
     }
     
     public void doInternalDispatch() throws ServletException, IOException {
-        if (this.dispatch!=null) {
-            try {
-                dispatch.run();
-            } catch (RuntimeException x) {
-                doInternalComplete(true);
-                if (x.getCause() instanceof ServletException) throw 
(ServletException)x.getCause();
-                if (x.getCause() instanceof IOException) throw 
(IOException)x.getCause();
-                else throw new ServletException(x);
-            } finally {
-                dispatch = null;
+        if (this.state.compareAndSet(AsyncState.DISPATCHING, 
AsyncState.DISPATCHED)) {
+            if (this.dispatch!=null) {
+                try {
+                    dispatch.run();
+                } catch (RuntimeException x) {
+                    doInternalComplete(true);
+                    if (x.getCause() instanceof ServletException) throw 
(ServletException)x.getCause();
+                    if (x.getCause() instanceof IOException) throw 
(IOException)x.getCause();
+                    else throw new ServletException(x);
+                } finally {
+                    dispatch = null;
+                }
             }
+        } else {
+            throw new IllegalStateException("Dispatch illegal. Invalid state: 
"+state.get());
         }
     }
     
     public void doInternalComplete(boolean error) {
         if (isCompleted()) return;
-        for (AsyncListenerWrapper wrapper : listeners) {
+        if (state.compareAndSet(AsyncState.STARTED, AsyncState.NOT_STARTED)) {
+            //this is the same as
+            //request.startAsync().complete();
+            recycle();
+        } else if (state.compareAndSet(AsyncState.DISPATCHED, 
AsyncState.NOT_STARTED)) {
+            for (AsyncListenerWrapper wrapper : listeners) {
+                try {
+                    wrapper.fireOnComplete();
+                }catch (IOException x) {
+                    //how does this propagate, or should it?
+                    //TODO SERVLET3 - async 
+                    log.error("",x);
+                }
+            }
             try {
-                wrapper.fireOnComplete();
-            }catch (IOException x) {
-                //how does this propagate, or should it?
-               //TODO SERVLET3 - async 
+                if (!error) getResponse().flushBuffer();
+
+            }catch (Exception x) {
                 log.error("",x);
             }
+            
request.coyoteRequest.action(ActionCode.ACTION_ASYNC_COMPLETE,null);
+            recycle();
+        } else { 
+            throw new IllegalStateException("Complete illegal. Invalid 
state:"+state.get());
         }
-        try {
-            if (!error) getResponse().flushBuffer();
-            
-        }catch (Exception x) {
-            log.error("",x);
-        }
-        request.coyoteRequest.action(ActionCode.ACTION_ASYNC_COMPLETE,null);
-        recycle();
     }
 
 }

Modified: tomcat/trunk/java/org/apache/catalina/connector/Request.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/Request.java?rev=795669&r1=795668&r2=795669&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/Request.java (original)
+++ tomcat/trunk/java/org/apache/catalina/connector/Request.java Mon Jul 20 01:20:02 2009
@@ -1466,8 +1466,7 @@
         else if (asyncContext.isStarted()) throw new 
IllegalStateException("Already started.");
         asyncContext.setServletRequest(getRequest());
         asyncContext.setServletResponse(response.getResponse());
-        asyncContext.setStarted(true);
-        asyncContext.setCompleted(false);
+        asyncContext.setStarted(getContext());
         return asyncContext;
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to