Author: markt Date: Thu Sep 30 00:28:59 2010 New Revision: 1002911 URL: http://svn.apache.org/viewvc?rev=1002911&view=rev Log: Get AJP working after async refactoring. Some issues remain with AJP/APR connector which still has a handful of TCK failures.
Modified: tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java tomcat/trunk/java/org/apache/coyote/ajp/AjpProtocol.java Modified: tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java?rev=1002911&r1=1002910&r2=1002911&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java Thu Sep 30 00:28:59 2010 @@ -22,11 +22,16 @@ import java.io.IOException; import java.net.InetAddress; import java.security.cert.CertificateFactory; import java.security.cert.X509Certificate; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.catalina.core.AsyncContextImpl; import org.apache.coyote.ActionCode; import org.apache.coyote.ActionHook; import org.apache.coyote.Adapter; +import org.apache.coyote.AsyncStateMachine; import org.apache.coyote.InputBuffer; +import org.apache.coyote.Processor; import org.apache.coyote.Request; import org.apache.coyote.Response; import org.apache.juli.logging.Log; @@ -36,12 +41,13 @@ import org.apache.tomcat.util.buf.Messag import org.apache.tomcat.util.http.HttpMessages; import org.apache.tomcat.util.http.MimeHeaders; import org.apache.tomcat.util.net.AbstractEndpoint; +import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; import org.apache.tomcat.util.res.StringManager; /** * Base class for AJP Processor implementations. */ -public abstract class AbstractAjpProcessor implements ActionHook { +public abstract class AbstractAjpProcessor implements ActionHook, Processor { protected abstract Log getLog(); @@ -55,12 +61,6 @@ public abstract class AbstractAjpProcess /** - * Async used - */ - protected boolean async = false; - - - /** * Associated adapter. */ protected Adapter adapter = null; @@ -170,6 +170,12 @@ public abstract class AbstractAjpProcess protected boolean finished = false; + /** + * Track changes in state for async requests. + */ + protected AsyncStateMachine asyncStateMachine = new AsyncStateMachine(this); + + // ------------------------------------------------------------- Properties @@ -248,7 +254,6 @@ public abstract class AbstractAjpProcess } else if (actionCode == ActionCode.CLOSE) { // Close - async = false; // End the processing of the current request, and stop any further // transactions with the client @@ -320,6 +325,25 @@ public abstract class AbstractAjpProcess empty = false; replay = true; + } else if (actionCode == ActionCode.ASYNC_START) { + asyncStateMachine.asyncStart((AsyncContextImpl) param); + } else if (actionCode == ActionCode.ASYNC_DISPATCHED) { + asyncStateMachine.asyncDispatched(); + } else if (actionCode == ActionCode.ASYNC_TIMEOUT) { + AtomicBoolean result = (AtomicBoolean) param; + result.set(asyncStateMachine.asyncTimeout()); + } else if (actionCode == ActionCode.ASYNC_RUN) { + asyncStateMachine.asyncRun((Runnable) param); + } else if (actionCode == ActionCode.ASYNC_ERROR) { + asyncStateMachine.asyncError(); + } else if (actionCode == ActionCode.ASYNC_IS_STARTED) { + ((AtomicBoolean) param).set(asyncStateMachine.isAsyncStarted()); + } else if (actionCode == ActionCode.ASYNC_IS_DISPATCHING) { + ((AtomicBoolean) param).set(asyncStateMachine.isAsyncDispatching()); + } else if (actionCode == ActionCode.ASYNC_IS_ASYNC) { + ((AtomicBoolean) param).set(asyncStateMachine.isAsync()); + } else if (actionCode == ActionCode.ASYNC_IS_TIMINGOUT) { + ((AtomicBoolean) param).set(asyncStateMachine.isAsyncTimingOut()); } else { actionInternal(actionCode, param); } @@ -331,6 +355,24 @@ public abstract class AbstractAjpProcess protected abstract void finish() throws IOException; + @Override + public abstract Executor getExecutor(); + + + public void recycle() { + asyncStateMachine.recycle(); + + // Recycle Request object + first = true; + endOfStream = false; + empty = true; + replay = false; + finished = false; + request.recycle(); + response.recycle(); + certificates.recycle(); + } + // ------------------------------------------------------ Connector Methods @@ -735,6 +777,14 @@ public abstract class AbstractAjpProcess throws IOException; + protected boolean isAsync() { + return asyncStateMachine.isAsync(); + } + + protected SocketState asyncPostProcess() { + return asyncStateMachine.asyncPostProcess(); + } + // ------------------------------------- InputStreamInputBuffer Inner Class Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java?rev=1002911&r1=1002910&r2=1002911&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java Thu Sep 30 00:28:59 2010 @@ -20,7 +20,7 @@ package org.apache.coyote.ajp; import java.io.IOException; import java.io.InterruptedIOException; import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.Executor; import org.apache.coyote.ActionCode; import org.apache.coyote.OutputBuffer; @@ -204,7 +204,7 @@ public class AjpAprProcessor extends Abs * * @throws IOException error during an I/O operation */ - public boolean process(SocketWrapper<Long> socket) + public SocketState process(SocketWrapper<Long> socket) throws IOException { RequestInfo rp = request.getRequestProcessor(); rp.setStage(org.apache.coyote.Constants.STAGE_PARSE); @@ -217,7 +217,6 @@ public class AjpAprProcessor extends Abs // Error flag error = false; - async = false; boolean openSocket = true; boolean keptAlive = false; @@ -295,7 +294,7 @@ public class AjpAprProcessor extends Abs } } - if (async && !error) { + if (isAsync() && !error) { break; } @@ -323,17 +322,24 @@ public class AjpAprProcessor extends Abs // Add the socket to the poller if (!error && !endpoint.isPaused()) { - ((AprEndpoint)endpoint).getPoller().add(socketRef); + if (!isAsync()) { + ((AprEndpoint)endpoint).getPoller().add(socketRef); + } } else { openSocket = false; } rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); - if (!async || error || endpoint.isPaused()) - recycle(); - - return openSocket; - + + if (error || endpoint.isPaused()) { + recycle(); + return SocketState.CLOSED; + } else if (isAsync()) { + return SocketState.LONG; + } else { + recycle(); + return (openSocket) ? SocketState.OPEN : SocketState.CLOSED; + } } @@ -360,9 +366,11 @@ public class AjpAprProcessor extends Abs rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); - if (async) { + if (error) { + response.setStatus(500); + } + if (isAsync()) { if (error) { - response.setStatus(500); request.updateCounters(); recycle(); return SocketState.CLOSED; @@ -370,16 +378,24 @@ public class AjpAprProcessor extends Abs return SocketState.LONG; } } else { - if (error) { - response.setStatus(500); - } request.updateCounters(); recycle(); - return SocketState.CLOSED; + if (error) { + return SocketState.CLOSED; + } else { + return SocketState.OPEN; + } } } + + @Override + public Executor getExecutor() { + return endpoint.getExecutor(); + } + + // ----------------------------------------------------- ActionHook Methods @@ -392,32 +408,19 @@ public class AjpAprProcessor extends Abs @Override protected void actionInternal(ActionCode actionCode, Object param) { - long socketRef = socket.getSocket().longValue(); - - if (actionCode == ActionCode.ASYNC_START) { - async = true; - } else if (actionCode == ActionCode.ASYNC_COMPLETE) { - AtomicBoolean dispatch = (AtomicBoolean)param; - RequestInfo rp = request.getRequestProcessor(); - if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) { //async handling - dispatch.set(true); - ((AprEndpoint)endpoint).getHandler().asyncDispatch(this.socket, SocketStatus.STOP); - } else { - dispatch.set(false); - } + if (actionCode == ActionCode.ASYNC_COMPLETE) { + if (asyncStateMachine.asyncComplete()) { + ((AprEndpoint)endpoint).processSocketAsync(this.socket, + SocketStatus.OPEN); + } } else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) { if (param==null) return; - if (socketRef==0) return; long timeout = ((Long)param).longValue(); - Socket.timeoutSet(socketRef, timeout * 1000); + socket.setTimeout(timeout); } else if (actionCode == ActionCode.ASYNC_DISPATCH) { - RequestInfo rp = request.getRequestProcessor(); - AtomicBoolean dispatch = (AtomicBoolean)param; - if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {//async handling - ((AprEndpoint)endpoint).getPoller().add(socketRef); - dispatch.set(true); - } else { - dispatch.set(true); + if (asyncStateMachine.asyncDispatch()) { + ((AprEndpoint)endpoint).processSocketAsync(this.socket, + SocketStatus.OPEN); } } @@ -637,17 +640,9 @@ public class AjpAprProcessor extends Abs /** * Recycle the processor. */ + @Override public void recycle() { - - // Recycle Request object - first = true; - endOfStream = false; - empty = true; - replay = false; - finished = false; - request.recycle(); - response.recycle(); - certificates.recycle(); + super.recycle(); inputBuffer.clear(); inputBuffer.limit(0); @@ -693,6 +688,7 @@ public class AjpAprProcessor extends Abs /** * Write chunk. */ + @Override public int doWrite(ByteChunk chunk, Response res) throws IOException { Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java?rev=1002911&r1=1002910&r2=1002911&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java (original) +++ tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java Thu Sep 30 00:28:59 2010 @@ -119,6 +119,7 @@ public class AjpAprProtocol /** * Pass config info */ + @Override public void setAttribute(String name, Object value) { if (log.isTraceEnabled()) { log.trace(sm.getString("ajpprotocol.setattribute", name, value)); @@ -126,6 +127,7 @@ public class AjpAprProtocol attributes.put(name, value); } + @Override public Object getAttribute(String key) { if (log.isTraceEnabled()) { log.trace(sm.getString("ajpprotocol.getattribute", key)); @@ -134,6 +136,7 @@ public class AjpAprProtocol } + @Override public Iterator<String> getAttributeNames() { return attributes.keySet().iterator(); } @@ -142,11 +145,13 @@ public class AjpAprProtocol /** * The adapter, used to call the connector */ + @Override public void setAdapter(Adapter adapter) { this.adapter = adapter; } + @Override public Adapter getAdapter() { return adapter; } @@ -154,6 +159,7 @@ public class AjpAprProtocol /** Start the protocol */ + @Override public void init() throws Exception { endpoint.setName(getName()); endpoint.setHandler(cHandler); @@ -171,6 +177,7 @@ public class AjpAprProtocol } + @Override public void start() throws Exception { if (this.domain != null ) { try { @@ -197,6 +204,7 @@ public class AjpAprProtocol log.info(sm.getString("ajpprotocol.start", getName())); } + @Override public void pause() throws Exception { try { endpoint.pause(); @@ -208,6 +216,7 @@ public class AjpAprProtocol log.info(sm.getString("ajpprotocol.pause", getName())); } + @Override public void resume() throws Exception { try { endpoint.resume(); @@ -219,6 +228,7 @@ public class AjpAprProtocol log.info(sm.getString("ajpprotocol.resume", getName())); } + @Override public void stop() throws Exception { try { endpoint.stop(); @@ -230,6 +240,7 @@ public class AjpAprProtocol log.info(sm.getString("ajpprotocol.stop", getName())); } + @Override public void destroy() throws Exception { if (log.isInfoEnabled()) log.info(sm.getString("ajpprotocol.destroy", getName())); @@ -259,6 +270,7 @@ public class AjpAprProtocol public int getProcessorCache() { return this.processorCache; } public void setProcessorCache(int processorCache) { this.processorCache = processorCache; } + @Override public Executor getExecutor() { return endpoint.getExecutor(); } public void setExecutor(Executor executor) { endpoint.setExecutor(executor); } @@ -321,7 +333,9 @@ public class AjpAprProtocol public void setKeepAliveTimeout(int timeout) { endpoint.setKeepAliveTimeout(timeout); } public boolean getUseSendfile() { return endpoint.getUseSendfile(); } - public void setUseSendfile(boolean useSendfile) { /* No sendfile for AJP */ } + public void setUseSendfile(@SuppressWarnings("unused") boolean useSendfile) { + /* No sendfile for AJP */ + } public int getPollTime() { return endpoint.getPollTime(); } public void setPollTime(int pollTime) { endpoint.setPollTime(pollTime); } @@ -343,6 +357,7 @@ public class AjpAprProtocol protected ConcurrentLinkedQueue<AjpAprProcessor> recycledProcessors = new ConcurrentLinkedQueue<AjpAprProcessor>() { + private static final long serialVersionUID = 1L; protected AtomicInteger size = new AtomicInteger(0); @Override public boolean offer(AjpAprProcessor processor) { @@ -385,36 +400,44 @@ public class AjpAprProtocol } // FIXME: Support for this could be added in AJP as well + @Override public SocketState event(SocketWrapper<Long> socket, SocketStatus status) { return SocketState.CLOSED; } + @Override public SocketState process(SocketWrapper<Long> socket) { AjpAprProcessor processor = recycledProcessors.poll(); try { - if (processor == null) { processor = createProcessor(); } - if (processor.process(socket)) { + SocketState state = processor.process(socket); + if (state == SocketState.LONG) { + // Check if the post processing is going to change the state + state = processor.asyncPostProcess(); + } + if (state == SocketState.LONG || state == SocketState.ASYNC_END) { + // Need to make socket available for next processing cycle + // but no need for the poller connections.put(socket, processor); - return SocketState.OPEN; } else { - // recycledProcessors.offer(processor); - return SocketState.CLOSED; + if (state == SocketState.OPEN) { + connections.put(socket, processor); + } + recycledProcessors.offer(processor); } + return state; } catch(java.net.SocketException e) { // SocketExceptions are normal - AjpAprProtocol.log.debug - (sm.getString - ("ajpprotocol.proto.socketexception.debug"), e); + log.debug(sm.getString( + "ajpprotocol.proto.socketexception.debug"), e); } catch (java.io.IOException e) { // IOExceptions are normal - AjpAprProtocol.log.debug - (sm.getString - ("ajpprotocol.proto.ioexception.debug"), e); + log.debug(sm.getString( + "ajpprotocol.proto.ioexception.debug"), e); } // Future developers: if you discover any other // rare-but-nonfatal exceptions, catch them here, and log as @@ -424,15 +447,14 @@ public class AjpAprProtocol // any other exception or error is odd. Here we log it // with "ERROR" level, so it will show up even on // less-than-verbose logs. - AjpAprProtocol.log.error - (sm.getString("ajpprotocol.proto.error"), e); - } finally { - recycledProcessors.offer(processor); + log.error(sm.getString("ajpprotocol.proto.error"), e); } + recycledProcessors.offer(processor); return SocketState.CLOSED; } // FIXME: Support for this could be added in AJP as well + @Override public SocketState asyncDispatch(SocketWrapper<Long> socket, SocketStatus status) { AjpAprProcessor result = connections.get(socket); @@ -454,7 +476,10 @@ public class AjpAprProtocol AjpAprProtocol.log.error (sm.getString("ajpprotocol.proto.error"), e); } finally { - if (state != SocketState.LONG) { + if (state == SocketState.LONG && result.isAsync()) { + state = result.asyncPostProcess(); + } + if (state != SocketState.LONG && state != SocketState.ASYNC_END) { connections.remove(socket); recycledProcessors.offer(result); if (state == SocketState.OPEN) { @@ -534,6 +559,7 @@ public class AjpAprProtocol return domain; } + @Override public ObjectName preRegister(MBeanServer server, ObjectName name) throws Exception { oname=name; @@ -542,14 +568,18 @@ public class AjpAprProtocol return name; } + @Override public void postRegister(Boolean registrationDone) { + // NOOP } + @Override public void preDeregister() throws Exception { + // NOOP } + @Override public void postDeregister() { + // NOOP } - - } Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java?rev=1002911&r1=1002910&r2=1002911&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java Thu Sep 30 00:28:59 2010 @@ -22,7 +22,7 @@ import java.io.InputStream; import java.io.InterruptedIOException; import java.io.OutputStream; import java.net.Socket; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.Executor; import org.apache.coyote.ActionCode; import org.apache.coyote.OutputBuffer; @@ -304,7 +304,7 @@ public class AjpProcessor extends Abstra } } - if (async && !error) { + if (isAsync() && !error) { break; } @@ -329,11 +329,12 @@ public class AjpProcessor extends Abstra recycle(); } - if (async && !error && !endpoint.isPaused()) { - rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); + + rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); + + if (isAsync() && !error && !endpoint.isPaused()) { return SocketState.LONG; } else { - rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); recycle(); input = null; output = null; @@ -361,7 +362,7 @@ public class AjpProcessor extends Abstra rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); - if (async) { + if (isAsync()) { if (error) { response.setStatus(500); request.updateCounters(); @@ -386,6 +387,13 @@ public class AjpProcessor extends Abstra } + + @Override + public Executor getExecutor() { + return endpoint.getExecutor(); + } + + // ----------------------------------------------------- ActionHook Methods @@ -398,37 +406,22 @@ public class AjpProcessor extends Abstra @Override protected void actionInternal(ActionCode actionCode, Object param) { - if (actionCode == ActionCode.ASYNC_START) { - //TODO SERVLET3 - async - async = true; - } else if (actionCode == ActionCode.ASYNC_COMPLETE) { - //TODO SERVLET3 - async - AtomicBoolean dispatch = (AtomicBoolean)param; - RequestInfo rp = request.getRequestProcessor(); - if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) { //async handling - dispatch.set(true); - ((JIoEndpoint)endpoint).processSocketAsync(this.socket, SocketStatus.OPEN); - } else { - dispatch.set(false); + if (actionCode == ActionCode.ASYNC_COMPLETE) { + if (asyncStateMachine.asyncComplete()) { + ((JIoEndpoint)endpoint).processSocketAsync(this.socket, + SocketStatus.OPEN); } } else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) { - //TODO SERVLET3 - async - if (param==null) return; + if (param == null) return; long timeout = ((Long)param).longValue(); - //if we are not piggy backing on a worker thread, set the timeout + // if we are not piggy backing on a worker thread, set the timeout socket.setTimeout(timeout); } else if (actionCode == ActionCode.ASYNC_DISPATCH) { - RequestInfo rp = request.getRequestProcessor(); - AtomicBoolean dispatch = (AtomicBoolean)param; - if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {//async handling - ((JIoEndpoint)endpoint).processSocketAsync(this.socket, SocketStatus.OPEN); - dispatch.set(true); - } else { - dispatch.set(true); + if (asyncStateMachine.asyncDispatch()) { + ((JIoEndpoint)endpoint).processSocketAsync(this.socket, + SocketStatus.OPEN); } } - - } @@ -581,25 +574,6 @@ public class AjpProcessor extends Abstra /** - * Recycle the processor. - */ - public void recycle() { - - // Recycle Request object - first = true; - endOfStream = false; - empty = true; - replay = false; - finished = false; - request.recycle(); - response.recycle(); - certificates.recycle(); - async = false; - - } - - - /** * Callback to write data from the buffer. */ @Override @@ -623,6 +597,7 @@ public class AjpProcessor extends Abstra /** * Write chunk. */ + @Override public int doWrite(ByteChunk chunk, Response res) throws IOException { Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProtocol.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpProtocol.java?rev=1002911&r1=1002910&r2=1002911&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/ajp/AjpProtocol.java (original) +++ tomcat/trunk/java/org/apache/coyote/ajp/AjpProtocol.java Thu Sep 30 00:28:59 2010 @@ -121,6 +121,7 @@ public class AjpProtocol /** * Pass config info */ + @Override public void setAttribute(String name, Object value) { if (log.isTraceEnabled()) { log.trace(sm.getString("ajpprotocol.setattribute", name, value)); @@ -128,6 +129,7 @@ public class AjpProtocol attributes.put(name, value); } + @Override public Object getAttribute(String key) { if (log.isTraceEnabled()) { log.trace(sm.getString("ajpprotocol.getattribute", key)); @@ -136,6 +138,7 @@ public class AjpProtocol } + @Override public Iterator<String> getAttributeNames() { return attributes.keySet().iterator(); } @@ -144,11 +147,13 @@ public class AjpProtocol /** * The adapter, used to call the connector */ + @Override public void setAdapter(Adapter adapter) { this.adapter = adapter; } + @Override public Adapter getAdapter() { return adapter; } @@ -156,6 +161,7 @@ public class AjpProtocol /** Start the protocol */ + @Override public void init() throws Exception { endpoint.setName(getName()); endpoint.setHandler(cHandler); @@ -172,6 +178,7 @@ public class AjpProtocol } + @Override public void start() throws Exception { if (this.domain != null ) { try { @@ -198,6 +205,7 @@ public class AjpProtocol log.info(sm.getString("ajpprotocol.start", getName())); } + @Override public void pause() throws Exception { try { endpoint.pause(); @@ -209,6 +217,7 @@ public class AjpProtocol log.info(sm.getString("ajpprotocol.pause", getName())); } + @Override public void resume() throws Exception { try { endpoint.resume(); @@ -220,6 +229,7 @@ public class AjpProtocol log.info(sm.getString("ajpprotocol.resume", getName())); } + @Override public void stop() throws Exception { try { endpoint.stop(); @@ -231,6 +241,7 @@ public class AjpProtocol log.info(sm.getString("ajpprotocol.stop", getName())); } + @Override public void destroy() throws Exception { if (log.isInfoEnabled()) log.info(sm.getString("ajpprotocol.destroy", getName())); @@ -260,6 +271,7 @@ public class AjpProtocol public int getProcessorCache() { return this.processorCache; } public void setProcessorCache(int processorCache) { this.processorCache = processorCache; } + @Override public Executor getExecutor() { return endpoint.getExecutor(); } public void setExecutor(Executor executor) { endpoint.setExecutor(executor); } @@ -337,6 +349,7 @@ public class AjpProtocol protected ConcurrentLinkedQueue<AjpProcessor> recycledProcessors = new ConcurrentLinkedQueue<AjpProcessor>() { + private static final long serialVersionUID = 1L; protected AtomicInteger size = new AtomicInteger(0); @Override public boolean offer(AjpProcessor processor) { @@ -378,10 +391,12 @@ public class AjpProtocol this.proto = proto; } + @Override public SocketState process(SocketWrapper<Socket> socket) { return process(socket,SocketStatus.OPEN); } + @Override public SocketState process(SocketWrapper<Socket> socket, SocketStatus status) { AjpProcessor processor = connections.remove(socket); try { @@ -396,21 +411,23 @@ public class AjpProtocol if (state == SocketState.LONG) { connections.put(socket, processor); socket.setAsync(true); + // longPoll may change socket state (e.g. to trigger a + // complete or dispatch) + return processor.asyncPostProcess(); } else { connections.remove(socket); socket.setAsync(false); + recycledProcessors.offer(processor); } return state; } catch(java.net.SocketException e) { // SocketExceptions are normal - AjpProtocol.log.debug - (sm.getString - ("ajpprotocol.proto.socketexception.debug"), e); + log.debug(sm.getString( + "ajpprotocol.proto.socketexception.debug"), e); } catch (java.io.IOException e) { // IOExceptions are normal - AjpProtocol.log.debug - (sm.getString - ("ajpprotocol.proto.ioexception.debug"), e); + log.debug(sm.getString( + "ajpprotocol.proto.ioexception.debug"), e); } // Future developers: if you discover any other // rare-but-nonfatal exceptions, catch them here, and log as @@ -420,11 +437,9 @@ public class AjpProtocol // any other exception or error is odd. Here we log it // with "ERROR" level, so it will show up even on // less-than-verbose logs. - AjpProtocol.log.error - (sm.getString("ajpprotocol.proto.error"), e); - } finally { - recycledProcessors.offer(processor); + log.error(sm.getString("ajpprotocol.proto.error"), e); } + recycledProcessors.offer(processor); return SocketState.CLOSED; } @@ -497,6 +512,7 @@ public class AjpProtocol return domain; } + @Override public ObjectName preRegister(MBeanServer server, ObjectName name) throws Exception { oname=name; @@ -505,14 +521,18 @@ public class AjpProtocol return name; } + @Override public void postRegister(Boolean registrationDone) { + // NOOP } + @Override public void preDeregister() throws Exception { + // NOOP } + @Override public void postDeregister() { + // NOOP } - - } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org