Author: fhanik Date: Mon Dec 3 16:06:24 2007 New Revision: 600737 URL: http://svn.apache.org/viewvc?rev=600737&view=rev Log: implemented buffered non blocking write for comet events
Modified: tomcat/sandbox/gdev6x/java/org/apache/catalina/CometEvent.java tomcat/sandbox/gdev6x/java/org/apache/catalina/connector/CometEventImpl.java tomcat/sandbox/gdev6x/java/org/apache/catalina/core/StandardWrapperValve.java tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProcessor.java tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProtocol.java tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioInputBuffer.java tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioOutputBuffer.java tomcat/sandbox/gdev6x/java/org/apache/tomcat/util/net/NioEndpoint.java tomcat/sandbox/gdev6x/webapps/docs/aio.xml tomcat/sandbox/gdev6x/webapps/docs/changelog.xml tomcat/sandbox/gdev6x/webapps/docs/config/http.xml Modified: tomcat/sandbox/gdev6x/java/org/apache/catalina/CometEvent.java URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/java/org/apache/catalina/CometEvent.java?rev=600737&r1=600736&r2=600737&view=diff ============================================================================== --- tomcat/sandbox/gdev6x/java/org/apache/catalina/CometEvent.java (original) +++ tomcat/sandbox/gdev6x/java/org/apache/catalina/CometEvent.java Mon Dec 3 16:06:24 2007 @@ -131,20 +131,22 @@ * client a notice that the server has no more data to send as part of this * request. The servlet should perform any needed cleanup as if it had recieved * an END or ERROR event. - * Invoking this method during a event, will cause the session to close + * Invoking this method during a event, will cause the Comet session to close * immediately after the event method has finished. - * Invoking this method asynchrously will not cause the session to close - * until another event occurred, most likely a timeout. - * If you wish to signal to the container - * that the session should end sooner rather than later when this method is invoked - * asycnhronously, then issue a - * register(OP_CALLBACK) immediately after this method has been invoked. 
+ * Invoking this method asynchronously will cause the Comet session to end after the + * END event has been processed * * @see #register(int) */ public void close() throws IOException; /** + * Returns true if #close() has been invoked + * @return boolean + */ + public boolean isClosed(); + + /** * Sets the timeout for this Comet connection. Please NOTE, that the implementation * of a per connection timeout is OPTIONAL and MAY NOT be implemented.<br/> * This method sets the timeout in milliseconds of idle time on the connection. @@ -171,11 +173,14 @@ * a) Blocking IO - standard servlet usage<br/> * b) Register for READ events when data arrives<br/> * Tomcat Comet allows you to configure for additional options:<br/> - * the <code>configureBlocking(false)</code> bit signals whether writing and reading from the request - * or writing to the response will be non blocking.<br/> - * the <code>configureBlocking(true)</code> bit signals the container you wish for read and write to be done in a blocking fashion - * @param blocking - true to make read and writes blocking - * @throws IllegalStateException - if this method is invoked outside of the BEGIN event + * the <code>configureBlocking(false)</code> bit signals whether writing to the response will be non blocking.<br/> + * the <code>configureBlocking(true)</code> bit signals the container you wish for read and write to be done in a blocking fashion<br/> + * when parameter is set to false, writes will be buffered and dispatched to the servlet container + * to complete the write asynchronously. The size of the write buffer can be configured. + * If ServletResponse.(getOutputStream/getWriter) is invoked with more data than + * it can handle, an IO Exception will be thrown. 
+ * @param blocking - true to make writes blocking, false to make writes non blocking + * @throws IllegalStateException - if this method is invoked outside of the BEGIN event or if blocking has already been configured * @see #isReadable() * @see #isWriteable() */ @@ -188,10 +193,10 @@ public boolean isBlocking(); /** - * OP_CALLBACK - receive a CALLBACK event from the container + * OP_CALLBACK - receive a CALLBACK event from the container, on a Tomcat worker thread * OP_READ - receive a READ event when the connection has data to be read * OP_WRITE - receive a WRITE event when the connection is able to receive data to be written - * @see #register(int) + * @see #interestOps(int) */ public static class CometOperation { //currently map these to the same values as org.apache.tomcat.util.net.PollerInterest @@ -204,6 +209,7 @@ * Registers the Comet connection with the container for IO and event notifications. * Each time this method is invoked, the operations are reset to the operations parameter value. * To unregister an operation, simple do interestOps(interestOps() & (~CometOperation.OP_WRITE)) + * This method can be invoked synchronously or asynchronously (by a non Tomcat worker thread) to change the operations * @param operations * @throws IllegalStateException - if you are trying to register with a socket that already is registered * or if the operation you are trying to register is invalid. 
@@ -243,5 +249,5 @@ * @return boolean */ public boolean isReadable(); - + } Modified: tomcat/sandbox/gdev6x/java/org/apache/catalina/connector/CometEventImpl.java URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/java/org/apache/catalina/connector/CometEventImpl.java?rev=600737&r1=600736&r2=600737&view=diff ============================================================================== --- tomcat/sandbox/gdev6x/java/org/apache/catalina/connector/CometEventImpl.java (original) +++ tomcat/sandbox/gdev6x/java/org/apache/catalina/connector/CometEventImpl.java Mon Dec 3 16:06:24 2007 @@ -83,7 +83,13 @@ * Blocking or not blocking */ protected boolean blocking = true; - + + /** + * Closed? + */ + protected boolean closed = false; + + // --------------------------------------------------------- Public Methods /** @@ -94,6 +100,7 @@ response = null; blocking = true; cometOperations = 0; + closed = false; } public void setEventType(EventType eventType) { @@ -104,12 +111,20 @@ this.eventSubType = eventSubType; } + public boolean isClosed() { + return closed; + } + public void close() throws IOException { + if (!closed) closed = true; if (request == null) { throw new IllegalStateException(sm.getString("cometEvent.nullRequest")); } request.setComet(false); response.finishResponse(); + //if this is a worker thread, the comet operation will be reset + //otherwise, we are signaling to end the request + interestOps(CometEvent.CometOperation.OP_CALLBACK); } public EventSubType getEventSubType() { Modified: tomcat/sandbox/gdev6x/java/org/apache/catalina/core/StandardWrapperValve.java URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/java/org/apache/catalina/core/StandardWrapperValve.java?rev=600737&r1=600736&r2=600737&view=diff ============================================================================== --- tomcat/sandbox/gdev6x/java/org/apache/catalina/core/StandardWrapperValve.java (original) +++ 
tomcat/sandbox/gdev6x/java/org/apache/catalina/core/StandardWrapperValve.java Mon Dec 3 16:06:24 2007 @@ -213,8 +213,8 @@ try { SystemLogHandler.startCapture(); if (comet) { - filterChain.doFilterEvent(request.getEvent()); request.setComet(true); + filterChain.doFilterEvent(request.getEvent()); } else { filterChain.doFilter(request.getRequest(), response.getResponse()); @@ -227,8 +227,8 @@ } } else { if (comet) { - filterChain.doFilterEvent(request.getEvent()); request.setComet(true); + filterChain.doFilterEvent(request.getEvent()); } else { filterChain.doFilter (request.getRequest(), response.getResponse()); Modified: tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProcessor.java URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProcessor.java?rev=600737&r1=600736&r2=600737&view=diff ============================================================================== --- tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProcessor.java (original) +++ tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProcessor.java Mon Dec 3 16:06:24 2007 @@ -692,6 +692,14 @@ public int getSocketBuffer() { return socketBuffer; } + + public void setBufferedWriteSize(int size) { + outputBuffer.setBufferedWriteSize(size); + } + + public int getBufferedWriteSize() { + return outputBuffer.getBufferedWriteSize(); + } /** * Set the upload timeout. 
@@ -744,20 +752,40 @@ public SocketState event(SocketStatus status) throws IOException { - RequestInfo rp = request.getRequestProcessor(); + NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) socket.getAttachment(false); + if (status == SocketStatus.OPEN_WRITE) { + //flush out buffered write data + if (outputBuffer.hasDataToWrite()) { + int cnt = 0; + do { + cnt = outputBuffer.flushBuffer(false); + }while (cnt>0); + } + //return if we have more data to write + if (outputBuffer.hasDataToWrite()) return SocketState.LONG; + + //return if the comet processor wasn't registered for WRITE + if (attach!=null && (attach.getCometOps()&PollerInterest.WRITE)!=PollerInterest.WRITE) { + return SocketState.LONG; + } + } + RequestInfo rp = request.getRequestProcessor(); try { rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE); error = !adapter.event(request, response, status); if ( !error ) { - NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getAttachment(false); - if (attach != null) { - attach.setComet(comet); - if (!comet) { - //reset the timeout - attach.setTimeout(endpoint.getSocketProperties().getSoTimeout()); - } + try { + + if (attach != null) { + attach.setComet(comet); + if (!comet) { + //reset the timeout + attach.setTimeout(endpoint.getSocketProperties().getSoTimeout()); + } + } + } catch (Exception ex) { } } } catch (InterruptedIOException e) { @@ -992,16 +1020,6 @@ localName = null; remotePort = -1; localPort = -1; - //fix the synchronization scenario due to - //dual comet flags. 
- //while the response/request - //might already be recycled, this circumvents the bug - //and should not be an expensive operation - //however, this is a TODO and FIXME - //as it would be better coordinate the recycling of the request/response - //instead - response.recycle(); - request.recycle(); } @@ -1242,10 +1260,11 @@ socket.getPoller().cometInterest(socket); } else if (actionCode == ActionCode.ACTION_COMET_CONFIGURE_BLOCKING) { MutableBoolean bool = (MutableBoolean)param; - if ( bool.get() ) throw new IllegalStateException("Not yet implemented"); RequestInfo rp = request.getRequestProcessor(); if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) throw new IllegalStateException("Can only be configured during an event."); + inputBuffer.setBlocking(bool.get()); + outputBuffer.setBlocking(bool.get()); } else if (actionCode == ActionCode.ACTION_COMET_READABLE) { MutableBoolean bool = (MutableBoolean)param; try { Modified: tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProtocol.java URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProtocol.java?rev=600737&r1=600736&r2=600737&view=diff ============================================================================== --- tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProtocol.java (original) +++ tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/Http11NioProtocol.java Mon Dec 3 16:06:24 2007 @@ -39,6 +39,7 @@ import org.apache.tomcat.util.net.NioChannel; import org.apache.tomcat.util.net.NioEndpoint; import org.apache.tomcat.util.net.NioEndpoint.Handler; +import org.apache.tomcat.util.net.PollerInterest; import org.apache.tomcat.util.net.SSLImplementation; import org.apache.tomcat.util.net.SecureNioChannel; import org.apache.tomcat.util.net.SocketStatus; @@ -219,7 +220,7 @@ private int socketCloseDelay=-1; private boolean disableUploadTimeout = true; private int socketBuffer = 9000; - + private int bufferedWriteSize = 64*1024; 
private Adapter adapter; private Http11ConnectionHandler cHandler; @@ -533,6 +534,10 @@ this.processorCache = processorCache; } + public void setBufferedWriteSize(int bufferedWriteSize) { + this.bufferedWriteSize = bufferedWriteSize; + } + public void setOomParachute(int oomParachute) { ep.setOomParachute(oomParachute); setAttribute("oomParachute",oomParachute); @@ -682,7 +687,9 @@ if (log.isDebugEnabled()) log.debug("Keeping processor["+result); //add correct poller events here based on Comet stuff NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false); - socket.getPoller().add(socket,att.getCometOps()); + int ops = att.getCometOps(); + if (result.outputBuffer.hasDataToWrite()) ops = ops|PollerInterest.WRITE; + socket.getPoller().add(socket,ops); } } } @@ -725,7 +732,9 @@ connections.put(socket, processor); if (processor.comet) { NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false); - socket.getPoller().add(socket,att.getCometOps()); + int ops = att.getCometOps(); + if (processor.outputBuffer.hasDataToWrite()) ops = ops|PollerInterest.WRITE; + socket.getPoller().add(socket,ops); } else { socket.getPoller().add(socket); } @@ -775,6 +784,7 @@ processor.setCompressableMimeTypes(proto.compressableMimeTypes); processor.setRestrictedUserAgents(proto.restrictedUserAgents); processor.setSocketBuffer(proto.socketBuffer); + processor.setBufferedWriteSize(proto.bufferedWriteSize); processor.setMaxSavePostSize(proto.maxSavePostSize); processor.setServer(proto.server); register(processor); @@ -842,6 +852,10 @@ public int getProcessorCache() { return processorCache; + } + + public int getBufferedWriteSize() { + return bufferedWriteSize; } public int getOomParachute() { Modified: tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioInputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioInputBuffer.java?rev=600737&r1=600736&r2=600737&view=diff 
============================================================================== --- tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioInputBuffer.java (original) +++ tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioInputBuffer.java Mon Dec 3 16:06:24 2007 @@ -28,9 +28,9 @@ import org.apache.tomcat.util.buf.MessageBytes; import org.apache.tomcat.util.http.MimeHeaders; import org.apache.tomcat.util.net.NioChannel; +import org.apache.tomcat.util.net.NioEndpoint; import org.apache.tomcat.util.net.NioSelectorPool; import org.apache.tomcat.util.res.StringManager; -import org.apache.tomcat.util.net.NioEndpoint; /** * Implementation of InputBuffer which provides HTTP request header parsing as @@ -188,7 +188,10 @@ */ protected int lastActiveFilter; - + /** + * Flag used only for Comet requests/responses + */ + protected boolean blocking = true; // ------------------------------------------------------------- Properties @@ -206,6 +209,10 @@ return socket; } + public boolean isBlocking() { + return blocking; + } + public void setSelectorPool(NioSelectorPool pool) { this.pool = pool; } @@ -283,6 +290,10 @@ this.swallowInput = swallowInput; } + public void setBlocking(boolean blocking) { + this.blocking = blocking; + } + // --------------------------------------------------------- Public Methods /** * Returns true if there are bytes available from the socket layer @@ -328,7 +339,7 @@ parsingRequestLineQPos = -1; headerData.recycle(); swallowInput = true; - + blocking = true; } @@ -373,7 +384,7 @@ parsingRequestLineQPos = -1; headerData.recycle(); swallowInput = true; - + blocking = true; } @@ -889,7 +900,10 @@ throws IOException { if (pos >= lastValid) { - if (!fill(true,true)) //read body, must be blocking, as the thread is inside the app + //since the filters are not stateful + //we can't issue non blocking reads. + //It simply doesn't work. 
+ if (!fill(true,true)) return -1; } Modified: tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioOutputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioOutputBuffer.java?rev=600737&r1=600736&r2=600737&view=diff ============================================================================== --- tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioOutputBuffer.java (original) +++ tomcat/sandbox/gdev6x/java/org/apache/coyote/http11/InternalNioOutputBuffer.java Mon Dec 3 16:06:24 2007 @@ -25,6 +25,7 @@ import org.apache.coyote.ActionCode; import org.apache.coyote.OutputBuffer; import org.apache.coyote.Response; +import org.apache.tomcat.util.MutableInteger; import org.apache.tomcat.util.buf.ByteChunk; import org.apache.tomcat.util.buf.CharChunk; import org.apache.tomcat.util.buf.MessageBytes; @@ -34,8 +35,7 @@ import org.apache.tomcat.util.net.NioEndpoint; import org.apache.tomcat.util.net.NioSelectorPool; import org.apache.tomcat.util.res.StringManager; -import java.io.EOFException; -import org.apache.tomcat.util.MutableInteger; +import java.nio.BufferOverflowException; /** * Output buffer. 
@@ -77,8 +77,6 @@ } else { bbufLimit = (headerBufferSize / 1500 + 1) * 1500; } - //bbuf = ByteBuffer.allocateDirect(bbufLimit); - outputStreamOutputBuffer = new SocketOutputBuffer(); filterLibrary = new OutputFilter[0]; @@ -183,6 +181,31 @@ */ protected int lastActiveFilter; + /** + * Flag used only for Comet requests/responses + */ + protected boolean blocking = true; + + /** + * Track if the byte buffer is flipped + */ + protected boolean flipped = false; + + /** + * For "non-blocking" writes use an external buffer + */ + protected ByteBuffer bufferedWrite = null; + + /** + * The max size of the buffered write buffer + */ + protected int bufferedWriteSize = 64*1024; //64k default write buffer + + /** + * track if buffered buffer is flipped + */ + protected boolean bufflipped = false; + // ------------------------------------------------------------- Properties @@ -193,6 +216,19 @@ this.socket = socket; } + public void setBlocking(boolean blocking) { + this.blocking = blocking; + bufflipped = false; + if (blocking) + bufferedWrite = null; + else + bufferedWrite = ByteBuffer.allocate(bufferedWriteSize); + } + + public void setBufferedWriteSize(int bufferedWriteSize) { + this.bufferedWriteSize = bufferedWriteSize; + } + /** * Get the underlying socket input stream. 
*/ @@ -200,6 +236,36 @@ return socket; } + public boolean isBlocking() { + return blocking; + } + + public ByteBuffer getBufferedWrite() { + return bufferedWrite; + } + + public boolean hasBufferedData() { + if (getBufferedWrite()!=null) { + if (bufflipped) return getBufferedWrite().hasRemaining(); + else return getBufferedWrite().position()>0; + }else { + return false; + } + } + + public boolean hasDataToWrite() { + if (!hasBufferedData()) { + if (flipped) return socket.getBufHandler().getWriteBuffer().hasRemaining(); + else return socket.getBufHandler().getWriteBuffer().position()>0; + }else { + return true; + } + } + + public int getBufferedWriteSize() { + return bufferedWriteSize; + } + public void setSelectorPool(NioSelectorPool pool) { this.pool = pool; } @@ -286,19 +352,14 @@ */ public void flush() throws IOException { - if (!committed) { // Send the connector a request for commit. The connector should // then validate the headers, send them (using sendHeader) and // set the filters accordingly. response.action(ActionCode.ACTION_COMMIT, null); - } - - // Flush the current buffer - flushBuffer(); - + flushBuffer(isBlocking()); } @@ -322,6 +383,10 @@ * connection. */ public void recycle() { + recycle(true); + } + public void recycle(boolean clearbuf) { + // Recycle filters for (int i = 0; i <= lastActiveFilter; i++) { activeFilters[i].recycle(); @@ -329,7 +394,7 @@ // Recycle Request object response.recycle(); - socket.getBufHandler().getWriteBuffer().clear(); + if (clearbuf && socket!=null) socket.getBufHandler().getWriteBuffer().clear(); socket = null; pos = 0; @@ -337,7 +402,8 @@ committed = false; finished = false; lastWrite.set(1); - + setBlocking(true); + flipped = false; } @@ -348,21 +414,7 @@ * to parse the next HTTP request. 
*/ public void nextRequest() { - - // Recycle Request object - response.recycle(); - - // Recycle filters - for (int i = 0; i <= lastActiveFilter; i++) { - activeFilters[i].recycle(); - } - - // Reset pointers - pos = 0; - lastActiveFilter = -1; - committed = false; - finished = false; - + recycle(false);//proper pipeline support? } @@ -389,14 +441,18 @@ if (lastActiveFilter != -1) activeFilters[lastActiveFilter].end(); - flushBuffer(); + flushBuffer(true); //dont return upon call of close() finished = true; } public boolean isWritable() { - return lastWrite.get()>0; + if (lastWrite.get()>0) { + return !hasDataToWrite(); + }else { + return false; + } } // ------------------------------------------------ HTTP/1.1 Output Methods @@ -408,9 +464,8 @@ throws IOException { if (!committed) { - //Socket.send(socket, Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length) < 0 socket.getBufHandler() .getWriteBuffer().put(Constants.ACK_BYTES,0,Constants.ACK_BYTES.length); - writeToSocket(socket.getBufHandler() .getWriteBuffer(),true,true); + writeToSocket(socket.getBufHandler() .getWriteBuffer(),true,true);//ack is always blocking } } @@ -424,8 +479,10 @@ * @todo Fix non blocking write properly */ private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean block, boolean flip) throws IOException { - if ( flip ) bytebuffer.flip(); - + if ( flip ) { + bytebuffer.flip(); + flipped = true; + } int written = 0; NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false); if ( att == null ) throw new IOException("Key must be cancelled"); @@ -440,12 +497,18 @@ written = getSelectorPool().write(bytebuffer, socket, selector, writeTimeout, block,lastWrite); //make sure we are flushed do { - if (socket.flush(true,selector,writeTimeout,lastWrite)) break; + //force writing of the net buffer on SSL + if (socket.flush(true,selector,writeTimeout,lastWrite)) break; }while ( true ); }finally { if ( selector != null ) getSelectorPool().put(selector); } - if ( 
block ) bytebuffer.clear(); //only clear + if ( block || bytebuffer.remaining()==0) { + //blocking writes must empty the buffer + //and if remaining==0 then we did empty it + bytebuffer.clear(); + flipped = false; + } this.total = 0; return written; } @@ -612,13 +675,20 @@ int total = 0; private synchronized void addToBB(byte[] buf, int offset, int length) throws IOException { - while (socket.getBufHandler().getWriteBuffer().remaining() < length) { - flushBuffer(); + if (isBlocking()) { + while (socket.getBufHandler().getWriteBuffer().remaining() < length) { + flushBuffer(true); + } + socket.getBufHandler().getWriteBuffer().put(buf, offset, length); + total += length; + NioEndpoint.KeyAttachment ka = (NioEndpoint.KeyAttachment) socket.getAttachment(false); + if (ka != null) + ka.access(); //prevent timeouts for just doing client writes + } else { + if (bufferedWrite.remaining()<length) throw new IOException("BufferOverflowException:Unable to fit buffered write data in buffer."); + if (bufflipped) throw new IOException("Invalid write attempt, previous buffered write not completed."); + bufferedWrite.put(buf, offset, length); } - socket.getBufHandler().getWriteBuffer().put(buf, offset, length); - total += length; - NioEndpoint.KeyAttachment ka = (NioEndpoint.KeyAttachment)socket.getAttachment(false); - if ( ka!= null ) ka.access();//prevent timeouts for just doing client writes } @@ -753,25 +823,58 @@ /** * Callback to write data from the buffer. 
+ * @return the number of bytes written */ - protected void flushBuffer() + protected int flushBuffer(boolean block) throws IOException { - + int result = 0; //prevent timeout for async, SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); if (key != null) { NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment(); attach.access(); } - + //write to the socket, if there is anything to write - if (socket.getBufHandler().getWriteBuffer().position() > 0) { - socket.getBufHandler().getWriteBuffer().flip(); - writeToSocket(socket.getBufHandler().getWriteBuffer(),true, false); + if ((flipped && socket.getBufHandler().getWriteBuffer().remaining()>0) || + (!flipped && socket.getBufHandler().getWriteBuffer().position() > 0) ) { + result = writeToSocket(socket.getBufHandler().getWriteBuffer(),block, !flipped); + } else if (bufferedWrite!=null) { + if ((bufflipped && bufferedWrite.remaining()>0)||(!bufflipped && bufferedWrite.position()>0)) { + //transfer to the socket buffer + if (!bufflipped) { + bufferedWrite.flip(); + bufflipped = true; + } + transfer(bufferedWrite, socket.getBufHandler().getWriteBuffer()); + if (bufferedWrite.remaining() == 0) { + bufferedWrite.clear(); + bufflipped = false; + } + result = writeToSocket(socket.getBufHandler().getWriteBuffer(),block, true); + } + } + return result; + } + + protected int transfer(ByteBuffer from, ByteBuffer to) { + int remaining = from.remaining(); + int toRemaining = to.remaining(); + if (toRemaining >= remaining) { + to.put(from); + return remaining; + } else { + int limit = from.limit(); + int position = from.position(); + from.limit(position + toRemaining); + to.put(from); + from.limit(limit); + return toRemaining; } } + // ----------------------------------- OutputStreamOutputBuffer Inner Class @@ -792,19 +895,24 @@ int len = chunk.getLength(); int start = chunk.getStart(); byte[] b = chunk.getBuffer(); - while (len > 0) { - int thisTime = len; - if 
(socket.getBufHandler().getWriteBuffer().position() == socket.getBufHandler().getWriteBuffer().capacity()) { - flushBuffer(); - } - if (thisTime > socket.getBufHandler().getWriteBuffer().remaining()) { - thisTime = socket.getBufHandler().getWriteBuffer().remaining(); + if (isBlocking()) { + while (len > 0) { + int thisTime = len; + if (socket.getBufHandler().getWriteBuffer().position() == socket.getBufHandler().getWriteBuffer().capacity()) { + flushBuffer(true); + } + if (thisTime > socket.getBufHandler().getWriteBuffer().remaining()) { + thisTime = socket.getBufHandler().getWriteBuffer().remaining(); + } + addToBB(b,start,thisTime); + len = len - thisTime; + start = start + thisTime; } - addToBB(b,start,thisTime); - len = len - thisTime; - start = start + thisTime; + return chunk.getLength(); + }else { + addToBB(b,start,len); + return len; } - return chunk.getLength(); } Modified: tomcat/sandbox/gdev6x/java/org/apache/tomcat/util/net/NioEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=600737&r1=600736&r2=600737&view=diff ============================================================================== --- tomcat/sandbox/gdev6x/java/org/apache/tomcat/util/net/NioEndpoint.java (original) +++ tomcat/sandbox/gdev6x/java/org/apache/tomcat/util/net/NioEndpoint.java Mon Dec 3 16:06:24 2007 @@ -1419,8 +1419,12 @@ //the comet event takes care of clean up //processSocket(ka.getChannel(), status, dispatch); ka.setComet(false);//to avoid a loop - processSocket(ka.getChannel(), status, false);//don't dispatch if the lines below are cancelling the key - if (status == SocketStatus.TIMEOUT ) return; // don't close on comet timeout + if (status == SocketStatus.TIMEOUT ) { + processSocket(ka.getChannel(), status, true); + return; // don't close on comet timeout + } else { + processSocket(ka.getChannel(), status, false); //don't dispatch if the lines below are cancelling the key + } } 
handler.release(ka.getChannel()); if (key.isValid()) key.cancel(); @@ -1545,12 +1549,12 @@ //set interest ops to 0 so we don't get multiple //invokations for both read and write on separate threads reg(sk, attachment, 0); - //read goes before write - if (sk.isReadable()) { - if (!processSocket(channel, SocketStatus.OPEN_READ)) + //write goes before read + if (sk.isWritable()) { + if (!processSocket(channel, SocketStatus.OPEN_WRITE)) processSocket(channel, SocketStatus.DISCONNECT); } else { - if (!processSocket(channel, SocketStatus.OPEN_WRITE)) + if (!processSocket(channel, SocketStatus.OPEN_READ)) processSocket(channel, SocketStatus.DISCONNECT); } } else { Modified: tomcat/sandbox/gdev6x/webapps/docs/aio.xml URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/webapps/docs/aio.xml?rev=600737&r1=600736&r2=600737&view=diff ============================================================================== --- tomcat/sandbox/gdev6x/webapps/docs/aio.xml (original) +++ tomcat/sandbox/gdev6x/webapps/docs/aio.xml Mon Dec 3 16:06:24 2007 @@ -865,6 +865,16 @@ </ul> </section> + <section name="Non blocking buffered writes"> + <p> + Another feature of Comet is that you can enable buffered non blocking writes. + This is not non blocking as in the NIO sense, instead tomcat will buffer your response + and when you invoke response.flushBuffer() it will try to flush the buffer to the socket in + a non blocking fashion. If the buffer wrote out completely, CometEvent.isWriteable() will return true, + if there is more in the buffer to write, CometEvent.isWriteable() will return false, and you should not attempt + further writes, instead register for CometEvent.CometOperation.OP_WRITE to be notified when you can write again. 
+ </p> + </section> </body> </document> Modified: tomcat/sandbox/gdev6x/webapps/docs/changelog.xml URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/webapps/docs/changelog.xml?rev=600737&r1=600736&r2=600737&view=diff ============================================================================== --- tomcat/sandbox/gdev6x/webapps/docs/changelog.xml (original) +++ tomcat/sandbox/gdev6x/webapps/docs/changelog.xml Mon Dec 3 16:06:24 2007 @@ -17,6 +17,11 @@ <body> <section name="Tomcat g6.xdev(unknown)"> <subsection name="Catalina"> + <update> + Implement buffered write. + This means that a Comet servlet can write to the buffer without blocking. + Writing actually happens upon flushBuffer in a non blocking fashion. + </update> <fix><bug>43653</bug> Fix for SSL buffer mixup </fix> Modified: tomcat/sandbox/gdev6x/webapps/docs/config/http.xml URL: http://svn.apache.org/viewvc/tomcat/sandbox/gdev6x/webapps/docs/config/http.xml?rev=600737&r1=600736&r2=600737&view=diff ============================================================================== --- tomcat/sandbox/gdev6x/webapps/docs/config/http.xml (original) +++ tomcat/sandbox/gdev6x/webapps/docs/config/http.xml Mon Dec 3 16:06:24 2007 @@ -556,6 +556,11 @@ If you have an OOM outside of the Java Heap, then this parachute trick will not help. </p> </attribute> + <attribute name="bufferedWriteSize" required="false"> + <p>(int) The size in bytes that should be used when Comet servlets use buffered/non blocking write logic. + The default is <code>64kb</code> or <code>64*1024 bytes</code>. + </p> + </attribute> </attributes> </subsection> --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]