Author: fhanik Date: Mon Jul 20 01:20:02 2009 New Revision: 795669 URL: http://svn.apache.org/viewvc?rev=795669&view=rev Log: Start implementing concurrency around the state machine. startAsync() and complete() are to be thread-safe operations.
Modified: tomcat/trunk/java/org/apache/catalina/connector/AsyncContextImpl.java tomcat/trunk/java/org/apache/catalina/connector/Request.java Modified: tomcat/trunk/java/org/apache/catalina/connector/AsyncContextImpl.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/AsyncContextImpl.java?rev=795669&r1=795668&r2=795669&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/connector/AsyncContextImpl.java (original) +++ tomcat/trunk/java/org/apache/catalina/connector/AsyncContextImpl.java Mon Jul 20 01:20:02 2009 @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import javax.servlet.AsyncContext; import javax.servlet.AsyncListener; @@ -30,6 +31,7 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.apache.catalina.Context; import org.apache.coyote.ActionCode; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; @@ -39,15 +41,20 @@ * */ public class AsyncContextImpl implements AsyncContext { + + public static enum AsyncState { + NOT_STARTED, STARTED, DISPATCHING, DISPATCHED, COMPLETING + }; + protected static Log log = LogFactory.getLog(AsyncContextImpl.class); - private boolean started = false; private ServletRequest servletRequest = null; private ServletResponse servletResponse = null; private List<AsyncListenerWrapper> listeners = new ArrayList<AsyncListenerWrapper>(); private boolean hasOriginalRequestAndResponse = true; - private boolean completed = true; private volatile Runnable dispatch = null; + private Context context = null; + private AtomicReference<AsyncState> state = new AtomicReference<AsyncState>(); private Request request; @@ -77,31 +84,35 @@ @Override public void dispatch(ServletContext context, String path) { // TODO SERVLET3 - async - // TODO SERVLET3 - async - 
if (request.getAttribute(ASYNC_REQUEST_URI)==null) { - request.setAttribute(ASYNC_REQUEST_URI, request.getRequestURI()); - request.setAttribute(ASYNC_CONTEXT_PATH, request.getContextPath()); - request.setAttribute(ASYNC_SERVLET_PATH, request.getServletPath()); - request.setAttribute(ASYNC_QUERY_STRING, request.getQueryString()); - } - final RequestDispatcher requestDispatcher = context.getRequestDispatcher(path); - final HttpServletRequest servletRequest = (HttpServletRequest)getRequest(); - final HttpServletResponse servletResponse = (HttpServletResponse)getResponse(); - Runnable run = new Runnable() { - public void run() { - try { - //piggy back on the request dispatcher to ensure that filters etc get called. - //TODO SERVLET3 - async should this be include/forward or a new dispatch type - //javadoc suggests include with the type of DispatcherType.ASYNC - requestDispatcher.include(servletRequest, servletResponse); - }catch (Exception x) { - //log.error("Async.dispatch",x); - throw new RuntimeException(x); - } + if (this.state.compareAndSet(AsyncState.STARTED, AsyncState.DISPATCHING)) { + if (request.getAttribute(ASYNC_REQUEST_URI)==null) { + request.setAttribute(ASYNC_REQUEST_URI, request.getRequestURI()); + request.setAttribute(ASYNC_CONTEXT_PATH, request.getContextPath()); + request.setAttribute(ASYNC_SERVLET_PATH, request.getServletPath()); + request.setAttribute(ASYNC_QUERY_STRING, request.getQueryString()); } - }; - this.dispatch = run; - request.coyoteRequest.action(ActionCode.ACTION_ASYNC_DISPATCH, null ); + final RequestDispatcher requestDispatcher = context.getRequestDispatcher(path); + final HttpServletRequest servletRequest = (HttpServletRequest)getRequest(); + final HttpServletResponse servletResponse = (HttpServletResponse)getResponse(); + Runnable run = new Runnable() { + public void run() { + try { + //piggy back on the request dispatcher to ensure that filters etc get called. 
+ //TODO SERVLET3 - async should this be include/forward or a new dispatch type + //javadoc suggests include with the type of DispatcherType.ASYNC + requestDispatcher.include(servletRequest, servletResponse); + }catch (Exception x) { + //log.error("Async.dispatch",x); + throw new RuntimeException(x); + } + } + }; + this.dispatch = run; + request.coyoteRequest.action(ActionCode.ACTION_ASYNC_DISPATCH, null ); + + } else { + throw new IllegalStateException("Dispatch not allowed. Invalid state:"+state.get()); + } } @Override @@ -139,20 +150,24 @@ protected void recycle() { - started = false; servletRequest = null; servletResponse = null; listeners.clear(); hasOriginalRequestAndResponse = true; - completed = true; + state.set(AsyncState.NOT_STARTED); + context = null; } public boolean isStarted() { - return started; + return (state.get()!=AsyncState.NOT_STARTED); } - public void setStarted(boolean started) { - this.started = started; + public void setStarted(Context context) { + if (state.compareAndSet(AsyncState.NOT_STARTED, AsyncState.STARTED)) { + this.context = context; + } else { + throw new IllegalStateException("Already started."); + } } public ServletRequest getServletRequest() { @@ -181,47 +196,59 @@ } public boolean isCompleted() { - return completed; + return (state.get()==AsyncState.NOT_STARTED); } - public void setCompleted(boolean completed) { - this.completed = completed; + public void setCompleted() { + this.state.set(AsyncState.NOT_STARTED); } public void doInternalDispatch() throws ServletException, IOException { - if (this.dispatch!=null) { - try { - dispatch.run(); - } catch (RuntimeException x) { - doInternalComplete(true); - if (x.getCause() instanceof ServletException) throw (ServletException)x.getCause(); - if (x.getCause() instanceof IOException) throw (IOException)x.getCause(); - else throw new ServletException(x); - } finally { - dispatch = null; + if (this.state.compareAndSet(AsyncState.DISPATCHING, AsyncState.DISPATCHED)) { + if 
(this.dispatch!=null) { + try { + dispatch.run(); + } catch (RuntimeException x) { + doInternalComplete(true); + if (x.getCause() instanceof ServletException) throw (ServletException)x.getCause(); + if (x.getCause() instanceof IOException) throw (IOException)x.getCause(); + else throw new ServletException(x); + } finally { + dispatch = null; + } } + } else { + throw new IllegalStateException("Dispatch illegal. Invalid state: "+state.get()); } } public void doInternalComplete(boolean error) { if (isCompleted()) return; - for (AsyncListenerWrapper wrapper : listeners) { + if (state.compareAndSet(AsyncState.STARTED, AsyncState.NOT_STARTED)) { + //this is the same as + //request.startAsync().complete(); + recycle(); + } else if (state.compareAndSet(AsyncState.DISPATCHED, AsyncState.NOT_STARTED)) { + for (AsyncListenerWrapper wrapper : listeners) { + try { + wrapper.fireOnComplete(); + }catch (IOException x) { + //how does this propagate, or should it? + //TODO SERVLET3 - async + log.error("",x); + } + } try { - wrapper.fireOnComplete(); - }catch (IOException x) { - //how does this propagate, or should it? - //TODO SERVLET3 - async + if (!error) getResponse().flushBuffer(); + + }catch (Exception x) { log.error("",x); } + request.coyoteRequest.action(ActionCode.ACTION_ASYNC_COMPLETE,null); + recycle(); + } else { + throw new IllegalStateException("Complete illegal. 
Invalid state:"+state.get()); } - try { - if (!error) getResponse().flushBuffer(); - - }catch (Exception x) { - log.error("",x); - } - request.coyoteRequest.action(ActionCode.ACTION_ASYNC_COMPLETE,null); - recycle(); } } Modified: tomcat/trunk/java/org/apache/catalina/connector/Request.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/Request.java?rev=795669&r1=795668&r2=795669&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/connector/Request.java (original) +++ tomcat/trunk/java/org/apache/catalina/connector/Request.java Mon Jul 20 01:20:02 2009 @@ -1466,8 +1466,7 @@ else if (asyncContext.isStarted()) throw new IllegalStateException("Already started."); asyncContext.setServletRequest(getRequest()); asyncContext.setServletResponse(response.getResponse()); - asyncContext.setStarted(true); - asyncContext.setCompleted(false); + asyncContext.setStarted(getContext()); return asyncContext; } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org