Author: costin
Date: Sat Jun 14 08:30:48 2008
New Revision: 667815
URL: http://svn.apache.org/viewvc?rev=667815&view=rev
Log:
Server side. As with the client side, most is actually cut&pasted from the apr
and nio connectors, and heavily refactored.
Added:
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/AsyncHttpProcessor.java
(with props)
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/AsyncProtocolHandler.java
(with props)
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/BlockingCoyoteBuffers.java
(with props)
Added:
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/AsyncHttpProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/AsyncHttpProcessor.java?rev=667815&view=auto
==============================================================================
---
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/AsyncHttpProcessor.java
(added)
+++
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/AsyncHttpProcessor.java
Sat Jun 14 08:30:48 2008
@@ -0,0 +1,840 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.coyote.http11.async;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+import org.apache.coyote.ActionCode;
+import org.apache.coyote.ActionHook;
+import org.apache.coyote.adapters.CoyoteServer;
+import org.apache.coyote.client.AsyncHttp;
+import org.apache.coyote.http11.Constants;
+import org.apache.tomcat.util.buf.ByteChunk;
+import org.apache.tomcat.util.buf.HexUtils;
+import org.apache.tomcat.util.buf.MessageBytes;
+import org.apache.tomcat.util.http.FastHttpDateFormat;
+import org.apache.tomcat.util.http.MimeHeaders;
+import org.apache.tomcat.util.net.SelectorCallback;
+import org.apache.tomcat.util.net.SelectorThread.SelectorData;
+
+/*
+ * Cut&pasted from nio and apr connectors, replacing the
+ * IO with the AsyncHttp model.
+ */
+
+/**
+ * Handles a single http transaction, for Coyote connector.
+ * Created ( or get from pool ) when the request is received,
+ * released when service is done and body is sent/received.
+ *
+ * TODO: more on the async part - move mapping and more to the IO thread,
+ * right now it still require a dispatch to the thread pool
+ *
+ * @author Costin Manolache
+ */
+public class AsyncHttpProcessor extends AsyncHttp
+ implements ActionHook, Runnable {
+
+ public static final byte[] ACK_BYTES =
+ ByteChunk.convertToBytes("HTTP/1.1 100 Continue\r\n\r\n");
+
+ static ByteChunk CLOSE = ByteChunk.fromString("close");
+ static ByteChunk KEEPALIVE = ByteChunk.fromString("keep-alive");
+ public static final String HTTP_11 = "HTTP/1.1";
+ public static final String HTTP_10 = "HTTP/1.0";
+ public static final String GET = "GET";
+ public static final String POST = "POST";
+
+
+ // Empty string to not send identification
+ String serverHeader = "ApacheTomcat";
+ AsyncProtocolHandler simpleProtocolHandler;
+
+ boolean http11 = false;
+ boolean http09 = false;
+ boolean error = false;
+ // don't return to keep alive.
+ boolean comet = false;
+ boolean cometClose = false;
+
+ // TODO: use CharChunk
+ protected char[] hostNameC = new char[0];
+ boolean ssl;
+ BlockingCoyoteBuffers blockingIO = new BlockingCoyoteBuffers(this);
+
+ /**
+ * Implements blocking write and buffering of the response.
+ * Uses the 'output' ByteBuffer.
+ *
+ * Since the socket is non-blocking, there are few special things:
+ * - flush() and endRequest() will cause the callback to register for WRITE
+ * - write will keep sending data until the buffer is empty.
+ * - flush() will wait for the buffer to be empty, endRequest will not.
+ * - when the buffer is full, a flush() will be called
+ * - TODO: it should be possible to add to the buffer while data is
+ * written - tricky for sync
+ *
+ * A comet request could bypass the OutputBuffer and work directly
+ * with the selector and the real buffers, this is just a wrapper.
+ */
+
+ public AsyncHttpProcessor(AsyncProtocolHandler simpleProtocolHandler) {
+ super(null, true);
+ this.simpleProtocolHandler = simpleProtocolHandler;
+ res.setHook(this);
+ res.setOutputBuffer(blockingIO.outputBuffer);
+ req.setInputBuffer(blockingIO.inputBuffer);
+ debug = true;
+ }
+
+
+ public void recycle() {
+ super.recycle();
+ http11 = false;
+ http09 = false;
+ error = false;
+ comet = false;
+ cometClose = false;
+ allDone = false;
+ }
+
+ // Call the servlet...
+ @Override
+ protected void headersReceived() throws IOException {
+ processReadTransferHeaders(req.getMimeHeaders());
+
+ if (!isReadContentDelimited()) {
+ if (req.method().equals("GET") ||
+ req.method().equalsIgnoreCase("HEAD")) {
+ // No body.
+ handleEndReceive(false);
+ } else {
+ closeOnEndSend = true;
+ closeOnEndRecv = true; // needBody
+ }
+ } // else: delimited, need body
+
+ // TODO: map, detect 'async' handlers
+ if (httpCallback != null) {
+ httpCallback.headersReceived(this, req, res);
+ }
+
+ simpleProtocolHandler.tp.execute(this);
+ }
+
+ @Override
+ protected int parseHead() throws IOException {
+ boolean ok = parser.parseRequestLine(req.method(),
+ req.unparsedURI(), req.query(),
+ req.requestURI(), req.protocol());
+ if (!ok) {
+ return NEED_MORE;
+ }
+
+ int dataStart = parser.parseHeaders(req.getMimeHeaders());
+ return (dataStart < 0) ? NEED_MORE : dataStart;
+ }
+
+ /**
+ * This part is run in the thread pool, after headers received.
+ */
+ public void run() {
+ try {
+ // Make ourself available to comet requests.
+ req.setNote(CoyoteServer.COMET_REQ_NOTE, this);
+ prepareRequest();
+
+ String uri = req.unparsedURI().toString(); // will be recycled
+ if (debug ) {
+ log.info("Server: SERVICE START ---- " + uri);
+ }
+
+ try {
+ this.simpleProtocolHandler.adapter.service(req, res);
+ // at this point the response is in process of sending or sent
+ // the client may already start pipeline-ing the next req.
+ } catch (IOException ex) {
+ if ("Broken pipe".equals(ex.getMessage())) {
+ log.warning("Connection interrupted while writting");
+ }
+ } catch (Throwable t) {
+ t.printStackTrace();
+ }
+
+ if (res.getNote(CoyoteServer.COMET_RES_NOTE) == null) {
+ // request done - let's return this processor
+ // TODO: special pool !! release(); // by the time
updateCallback ends, we may start
+ // receiving the next request already
+ log.info("Server: SERVICE DONE: ---- " + uri);
+ release();
+ } else {
+ log.info("Server: coyote async req: ---- " + uri);
+ // Comet - don't release
+ }
+ } catch (IOException ex) {
+ if ("Broken pipe".equals(ex.getMessage())) {
+ log.warning("Connection interrupted while writting");
+ } else {
+ ex.printStackTrace();
+ }
+ } catch (Throwable t) {
+ // TODO Auto-generated catch block
+ t.printStackTrace();
+ }
+ }
+
+ @Override
+ protected void handleEndSendReceive(SelectorData selT)
+ throws IOException {
+ // Called when both send and receive are done. Either from IO or
+ // regular thread.
+ if (httpCallback != null) {
+ httpCallback.done(this, false, null);
+ }
+ if (closeOnEndSend) {
+ selT.sel.close(selT);
+ log.info("server send/receive done, close=" + closeOnEndSend);
+ maybeRelease();
+ } else {
+ // If write is not done in the IO thread, we may have the last
+ // packet and endSendReceive called in the thread pool.
+ // If this happens, read can happen while we are doing this method.
+ maybeRelease();
+ }
+ }
+
+
+ /**
+ * Finalize the sending. Indicates servlet processing is done, some
+ * IO may still be in flight.
+ *
+ * MUST be called - either after service or from the commet app.
+ *
+ * The tricky part is if this gets called before or after or at same
+ * time with endSendReceive and with next request getting pipelined.
+ *
+ * @throws IOException
+ */
+ public void release() throws IOException {
+ endSend(this); // make sure the final marker is sent.
+ serviceDone = true;
+ maybeRelease();
+ }
+
+ @Override
+ protected void handleReceivePastEnd(SelectorData selT) throws IOException {
+ }
+
+ @Override
+ public void ioThreadRun(SelectorData selT) throws IOException {
+ // Currently used only to set keep alive.
+ // If we need more, need to use state or pass a param.
+
+ if (rawRecvBuf.length() > 0) {
+ // what if I'm not done ?
+ if (debug) {
+ log.info("Done, pipelined next request \n" + rawRecvBuf + " "
+ + rawRecvBuf.length());
+ }
+ headRecvBuf.recycle();
+ headRecvBuf.append(rawRecvBuf);
+ recycle(true);
+ // No release - next req in process
+
+ // In case it was disabled
+ selector.readInterest(selT, true);
+ // Notify that data was received. The callback must deal with
+ // pre-existing data.
+ selectorCallback.dataReceived(selT);
+ return;
+ }
+
+ synchronized (this) {
+ if (keepAliveCallback == null) {
+ keepAliveCallback =
simpleProtocolHandler.getKeepAliveCallback();
+ }
+ selector.readInterest(selT, true);
+
+ selector.updateCallback(selT, this.selectorCallback,
keepAliveCallback);
+
+ if (debug) {
+ log.info("SERVER send/receive done, KEEP_ALIVE");
+ }
+ }
+
+ recycle();
+ returnToPool();
+ }
+
+ public void fromKeepAlive(SelectorData selThread, SelectorCallback ka)
+ throws IOException {
+ synchronized (this) {
+ keepAliveCallback = ka;
+
+ selThread.sel.updateCallback(selThread, ka, selectorCallback);
+ }
+ try {
+ selectorCallback.dataReceived(selThread);
+ } catch (IOException ex) {
+ selThread.sel.close(selThread);
+ }
+ }
+
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+
sb.append("ServerHttpProcessor-").append(ser).append(",rs=").append(recvState)
+ .append(",ss=");
+ sb.append(sendState).append(")");
+ return sb.toString();
+ }
+
+ private String[] actionNames = {
+ "", "ACK", "CLOSE", "COMMIT", "CLIENT_FLUSH", "CUSTOM",
+ "RESET", "START", "STOP" /*8*/,
+ "WEBAPP", "POST_REQ", "REQ_HOST_ATTRIBUTE",
+ "REQ_HOST_ADDR_ATTRIBUTE", "REQ_SSL_ATTRIBUTE",
+ "NEW_REQ", "REQ_SSL_CERT", "REQ_REMOTEPORT",
+ "REQ_LOCALPORT",
+ };
+
+ public void action(ActionCode actionCode, Object param) {
+ if (debug) {
+ log.info("ServerHttpProcessor: Action: " + actionCode.getCode()
+ + " " + ((actionCode.getCode() < actionNames.length) ?
+ actionNames[actionCode.getCode()] : "") );
+ }
+ if (actionCode == ActionCode.ACTION_COMMIT) {
+ maybeCommitResponse();
+ return;
+ } else if (actionCode == ActionCode.ACTION_ACK) {
+ send100Continue();
+ } else if (actionCode == ActionCode.ACTION_CLIENT_FLUSH) {
+ maybeCommitResponse();
+ try {
+ blockingIO.outputBuffer.flush();
+ } catch (IOException e) {
+ // Set error flag
+ error = true;
+ res.setErrorException(e);
+ }
+ } else if (actionCode == ActionCode.ACTION_CLOSE) {
+ // Close - from commet or post request == res.finish()
+ // End the processing of the current request, and stop any further
+ // transactions with the client
+
+ comet = false;
+ cometClose = true;
+
+ try {
+ blockingIO.outputBuffer.endRequest();
+ } catch (IOException e) {
+ // Set error flag
+ error = true;
+ }
+ } else if (actionCode == ActionCode.ACTION_RESET) {
+ // Reset response
+ // Note: This must be called before the response is committed
+ blockingIO.outputBuffer.reset();
+ } else if (actionCode == ActionCode.ACTION_CUSTOM) {
+ // Do nothing
+ } else if (actionCode == ActionCode.ACTION_REQ_SET_BODY_REPLAY) {
+// ByteChunk body = (ByteChunk) param;
+
+// InputFilter savedBody = new SavedRequestInputFilter(body);
+// savedBody.setRequest(request);
+
+// InternalNioInputBuffer internalBuffer = (InternalNioInputBuffer)
+// request.getInputBuffer();
+// internalBuffer.addActiveFilter(savedBody);
+
+ } else if (actionCode == ActionCode.ACTION_AVAILABLE) {
+ req.setAvailable(blockingHttp.available());
+ } else if (actionCode == ActionCode.ACTION_COMET_BEGIN) {
+ res.setNote(CoyoteServer.COMET_RES_NOTE, res);
+ comet = true;
+ } else if (actionCode == ActionCode.ACTION_COMET_END) {
+ comet = false;
+ } // --------------------- Address/port --------------------------
+ else if (actionCode ==
ActionCode.ACTION_REQ_LOCAL_NAME_ATTRIBUTE) {
+
+ InetAddress addr =
+ selectorData.sel.getAddress(selectorData, false);
+
+ String host = addr.getHostName();
+ req.localName().setString((host == null) ?
+ addr.getHostAddress() : host);
+
+ } else if (actionCode == ActionCode.ACTION_REQ_HOST_ATTRIBUTE) {
+ InetAddress addr =
+ selectorData.sel.getAddress(selectorData, true);
+ String host = addr.getHostName();
+ req.remoteHost().setString((host == null) ?
+ addr.getHostAddress() : host);
+ // old code had checks for addr == null
+
+ } else if (actionCode == ActionCode.ACTION_REQ_HOST_ADDR_ATTRIBUTE) {
+ InetAddress localAddr =
+ selectorData.sel.getAddress(selectorData, true);
+ req.remoteAddr().setString(localAddr.getHostAddress());
+
+ } else if (actionCode == ActionCode.ACTION_REQ_LOCAL_ADDR_ATTRIBUTE) {
+ // TODO: cache it ?
+ InetAddress localAddr =
+ selectorData.sel.getAddress(selectorData, false);
+ req.localAddr().setString(localAddr.getHostAddress());
+
+ } else if (actionCode == ActionCode.ACTION_REQ_REMOTEPORT_ATTRIBUTE) {
+ int remotePort =
+ selectorData.sel.getPort(selectorData, true);
+ req.setRemotePort(remotePort);
+
+ } else if (actionCode == ActionCode.ACTION_REQ_LOCALPORT_ATTRIBUTE) {
+ int port =
+ selectorData.sel.getPort(selectorData, false);
+ req.setLocalPort(port);
+
+ // --------------------- SSL --------------------------
+ } else if (actionCode == ActionCode.ACTION_REQ_SSL_ATTRIBUTE ) {
+
+// try {
+// if (sslSupport != null) {
+// Object sslO = sslSupport.getCipherSuite();
+// if (sslO != null)
+// request.setAttribute
+// (SSLSupport.CIPHER_SUITE_KEY, sslO);
+// sslO = sslSupport.getPeerCertificateChain(false);
+// if (sslO != null)
+// request.setAttribute
+// (SSLSupport.CERTIFICATE_KEY, sslO);
+// sslO = sslSupport.getKeySize();
+// if (sslO != null)
+// request.setAttribute
+// (SSLSupport.KEY_SIZE_KEY, sslO);
+// sslO = sslSupport.getSessionId();
+// if (sslO != null)
+// request.setAttribute
+// (SSLSupport.SESSION_ID_KEY, sslO);
+// }
+// } catch (Exception e) {
+// log.warn(sm.getString("http11processor.socket.ssl"), e);
+// }
+
+ } else if (actionCode == ActionCode.ACTION_REQ_SSL_CERTIFICATE) {
+
+// if( sslSupport != null) {
+// /*
+// * Consume and buffer the request body, so that it does not
+// * interfere with the client's handshake messages
+// */
+// InputFilter[] inputFilters = inputBuffer.getFilters();
+// ((BufferedInputFilter) inputFilters[Constants.BUFFERED_FILTER])
+// .setLimit(maxSavePostSize);
+// inputBuffer.addActiveFilter
+// (inputFilters[Constants.BUFFERED_FILTER]);
+// try {
+// Object sslO = sslSupport.getPeerCertificateChain(true);
+// if( sslO != null) {
+// request.setAttribute
+// (SSLSupport.CERTIFICATE_KEY, sslO);
+// }
+// } catch (Exception e) {
+// log.warn(sm.getString("http11processor.socket.ssl"), e);
+// }
+// }
+
+ }
+
+ }
+
+ /**
+ * Acknowlege request ( Expect: 100 Continue )
+ */
+ public void send100Continue() {
+ if ((res.isCommitted()) || !expectation)
+ return;
+ expectation = false;
+ ByteChunk bc = new ByteChunk();
+ bc.setBytes(ACK_BYTES, 0, ACK_BYTES.length);
+ try {
+ startWrite(bc);
+ } catch (IOException e) {
+ error = true;
+ }
+ }
+
+ public void maybeCommitResponse() {
+ // Commit current response
+ if (res.isCommitted())
+ return;
+
+ // Validate and write response headers
+ try {
+ sendResponseHeaders();
+ } catch (IOException e) {
+ // Set error flag
+ error = true;
+ }
+ }
+
+ /**
+ * After reading the request headers, we have to setup the request filters.
+ */
+ protected void prepareRequest() {
+
+ http11 = true;
+ http09 = false;
+ contentDelimitationRecv = false;
+ expectation = false;
+// sendfileData = null;
+// if (ssl) {
+// req.scheme().setString("https");
+// }
+ MessageBytes protocolMB = req.protocol();
+ if (protocolMB.equals(HTTP_11)) {
+ http11 = true;
+ protocolMB.setString(HTTP_11);
+ } else if (protocolMB.equals(HTTP_10)) {
+ http11 = false;
+ closeOnEndSend = true;
+ protocolMB.setString(HTTP_10);
+ } else if (protocolMB.equals("")) {
+ // HTTP/0.9
+ http09 = true;
+ http11 = false;
+ closeOnEndSend = true;
+ } else {
+ // Unsupported protocol
+ http11 = false;
+ error = true;
+ // Send 505; Unsupported HTTP version
+ res.setStatus(505);
+ }
+
+ MessageBytes methodMB = req.method();
+ if (methodMB.equals(GET)) {
+ methodMB.setString(GET);
+ } else if (methodMB.equals(POST)) {
+ methodMB.setString(POST);
+ }
+
+ MimeHeaders headers = req.getMimeHeaders();
+
+ // Check connection header
+ MessageBytes connectionValueMB = headers.getValue("connection");
+ if (connectionValueMB != null) {
+ ByteChunk connectionValueBC = connectionValueMB.getByteChunk();
+ if (connectionValueBC.indexOfIngoreCase(CLOSE, 0) != -1) {
+ closeOnEndSend = true;
+ } else if (connectionValueBC.indexOfIngoreCase(KEEPALIVE, 0) !=
-1) {
+ closeOnEndSend = false;
+ }
+ }
+
+ MessageBytes expectMB = null;
+ if (http11) {
+ expectMB = headers.getValue("expect");
+ }
+ if ((expectMB != null)
+ && (expectMB.indexOfIgnoreCase("100-continue", 0) != -1)) {
+ expectation = true;
+ }
+
+ // Check user-agent header
+// if ((restrictedUserAgents != null) && ((http11) || (keepAlive))) {
+// MessageBytes userAgentValueMB = headers.getValue("user-agent");
+// // Check in the restricted list, and adjust the http11
+// // and keepAlive flags accordingly
+// if(userAgentValueMB != null) {
+// String userAgentValue = userAgentValueMB.toString();
+// for (int i = 0; i < restrictedUserAgents.length; i++) {
+// if (restrictedUserAgents[i].matcher(userAgentValue).matches()) {
+// http11 = false;
+// keepAlive = false;
+// break;
+// }
+// }
+// }
+// }
+
+ // Check for a full URI (including protocol://host:port/)
+ ByteChunk uriBC = req.requestURI().getByteChunk();
+ if (uriBC.startsWithIgnoreCase("http", 0)) {
+
+ int pos = uriBC.indexOf("://", 0, 3, 4);
+ int uriBCStart = uriBC.getStart();
+ int slashPos = -1;
+ if (pos != -1) {
+ byte[] uriB = uriBC.getBytes();
+ slashPos = uriBC.indexOf('/', pos + 3);
+ if (slashPos == -1) {
+ slashPos = uriBC.getLength();
+ // Set URI as "/"
+ req.requestURI().setBytes
+ (uriB, uriBCStart + pos + 1, 1);
+ } else {
+ req.requestURI().setBytes
+ (uriB, uriBCStart + slashPos,
+ uriBC.getLength() - slashPos);
+ }
+ MessageBytes hostMB = headers.setValue("host");
+ hostMB.setBytes(uriB, uriBCStart + pos + 3,
+ slashPos - pos - 3);
+ }
+
+ }
+
+ // Parse transfer-encoding header
+ processReadTransferHeaders(headers);
+
+ MessageBytes valueMB = headers.getValue("host");
+ // Check host header
+ if (http11 && (valueMB == null)) {
+ error = true;
+ // 400 - Bad request
+ res.setStatus(400);
+ }
+
+ parseHost(valueMB);
+
+ // Advertise sendfile support through a request attribute
+// if (endpoint.getUseSendfile())
+// request.setAttribute("org.apache.tomcat.sendfile.support",
Boolean.TRUE);
+// // Advertise comet support through a request attribute
+// request.setAttribute("org.apache.tomcat.comet.support", Boolean.TRUE);
+// // Advertise comet timeout support
+// request.setAttribute("org.apache.tomcat.comet.timeout.support",
Boolean.TRUE);
+
+ }
+
+
+ /**
+ * Parse host.
+ */
+ public void parseHost(MessageBytes valueMB) {
+
+ if (valueMB == null || valueMB.isNull()) {
+ // HTTP/1.0
+ // Default is what the socket tells us. Overriden if a host is
+ // found/parsed
+// req.setServerPort(endpoint.getPort());
+ return;
+ }
+
+ ByteChunk valueBC = valueMB.getByteChunk();
+ byte[] valueB = valueBC.getBytes();
+ int valueL = valueBC.getLength();
+ int valueS = valueBC.getStart();
+ int colonPos = -1;
+ if (hostNameC.length < valueL) {
+ hostNameC = new char[valueL];
+ }
+
+ boolean ipv6 = (valueB[valueS] == '[');
+ boolean bracketClosed = false;
+ for (int i = 0; i < valueL; i++) {
+ char b = (char) valueB[i + valueS];
+ hostNameC[i] = b;
+ if (b == ']') {
+ bracketClosed = true;
+ } else if (b == ':') {
+ if (!ipv6 || bracketClosed) {
+ colonPos = i;
+ break;
+ }
+ }
+ }
+
+ if (colonPos < 0) {
+ if (!ssl) {
+ // 80 - Default HTTP port
+ req.setServerPort(80);
+ } else {
+ // 443 - Default HTTPS port
+ req.setServerPort(443);
+ }
+ req.serverName().setChars(hostNameC, 0, valueL);
+ } else {
+
+ req.serverName().setChars(hostNameC, 0, colonPos);
+
+ int port = 0;
+ int mult = 1;
+ for (int i = valueL - 1; i > colonPos; i--) {
+ int charValue = HexUtils.DEC[(int) valueB[i + valueS]];
+ if (charValue == -1) {
+ // Invalid character
+ error = true;
+ // 400 - Bad request
+ res.setStatus(400);
+ break;
+ }
+ port = port + (charValue * mult);
+ mult = 10 * mult;
+ }
+ req.setServerPort(port);
+
+ }
+
+ }
+
+ /**
+ * Called after the headers are sent.
+ *
+ */
+ @Override
+ public void headersSent() throws IOException {
+ sendState = State.BODY_DATA;
+ int statusCode = res.getStatus();
+ if ((statusCode == 204) || (statusCode == 205)
+ || (statusCode == 304)) {
+ noBodySend = true;
+ }
+
+ MessageBytes methodMB = req.method();
+ if (methodMB.equals("HEAD")) {
+ // No entity body
+ noBodySend = true;
+ }
+
+ }
+
+ /**
+ * When committing the response, we have to validate the set of headers, as
+ * well as setup the response filters.
+ */
+ public void sendResponseHeaders() throws IOException {
+
+ boolean entityBody = true;
+ contentDelimitationRecv = false;
+
+ //List<OutputFilter> outputFilters = outputBuffer.getFilters();
+
+ if (http09 == true) {
+ // HTTP/0.9 - no headers
+ closeOnEndSend = true;
+ return;
+ }
+
+ int statusCode = res.getStatus();
+ if ((statusCode == 204) || (statusCode == 205)
+ || (statusCode == 304)) {
+ entityBody = false;
+ contentDelimitationRecv = true;
+ }
+
+ MessageBytes methodMB = req.method();
+ if (methodMB.equals("HEAD")) {
+ // No entity body
+ contentDelimitationRecv = true;
+ }
+
+// // Sendfile support
+// if (this.endpoint.getUseSendfile()) {
+// String fileName = (String)
request.getAttribute("org.apache.tomcat.sendfile.filename");
+// if (fileName != null) {
+// // No entity body sent here
+// outputBuffer.addActiveFilter(outputFilters[Constants.VOID_FILTER]);
+// contentDelimitation = true;
+// sendfileData = new NioEndpoint.SendfileData();
+// sendfileData.fileName = fileName;
+// sendfileData.pos = ((Long)
request.getAttribute("org.apache.tomcat.sendfile.start")).longValue();
+// sendfileData.length = ((Long)
request.getAttribute("org.apache.tomcat.sendfile.end")).longValue() -
sendfileData.pos;
+// }
+// }
+
+ // Check for compression
+// boolean useCompression = false;
+// if (entityBody && (compressionLevel > 0) && (sendfileData == null)) {
+// useCompression = isCompressable();
+// // Change content-length to -1 to force chunking
+// if (useCompression) {
+// response.setContentLength(-1);
+// }
+// }
+
+ MimeHeaders headers = res.getMimeHeaders();
+ if (!entityBody) {
+ res.setContentLength(-1);
+ } else {
+ String contentType = res.getContentType();
+ if (contentType != null) {
+ headers.setValue("Content-Type").setString(contentType);
+ }
+ String contentLanguage = res.getContentLanguage();
+ if (contentLanguage != null) {
+ headers.setValue("Content-Language")
+ .setString(contentLanguage);
+ }
+ }
+
+ long contentLength = res.getContentLengthLong();
+ if (contentLength != -1) {
+ headers.setValue("Content-Length").setLong(contentLength);
+ remainingSend = contentLength;
+ contentDelimitationRecv = true;
+ } else {
+ if (entityBody && http11 && !closeOnEndSend) {
+ chunkedSend = true;
+ contentDelimitationRecv = true;
+
headers.addValue(Constants.TRANSFERENCODING).setString(Constants.CHUNKED);
+ } else {
+ closeOnEndSend = true;
+ }
+ }
+
+// if (useCompression) {
+// outputBuffer.addActiveFilter(outputFilters[Constants.GZIP_FILTER]);
+// headers.setValue("Content-Encoding").setString("gzip");
+// // Make Proxies happy via Vary (from mod_deflate)
+// headers.setValue("Vary").setString("Accept-Encoding");
+// }
+
+ // Add date header
+
headers.setValue("Date").setString(FastHttpDateFormat.getCurrentDate());
+
+ // FIXME: Add transfer encoding header
+
+ if ((entityBody) && (!contentDelimitationRecv)) {
+ // Mark as close the connection after the request, and add the
+ // connection: close header
+ closeOnEndSend = true;
+ }
+
+ // If we know that the request is bad this early, add the
+ // Connection: close header.
+ closeOnEndSend = closeOnEndSend || statusDropsConnection(statusCode);
+ if (closeOnEndSend) {
+ headers.addValue(Constants.CONNECTION).setString(Constants.CLOSE);
+ } else if (!http11 && !error) {
+
headers.addValue(Constants.CONNECTION).setString(Constants.KEEPALIVE);
+ }
+
+ // Add server header
+ if (serverHeader.length() > 0) {
+ headers.setValue("Server").setString(serverHeader);
+ }
+
+ serializeResponse(res, headSendBuf);
+
+ startWrite(headSendBuf);
+ }
+
+ SelectorCallback getCallback() {
+ return selectorCallback;
+ }
+
+}
\ No newline at end of file
Propchange:
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/AsyncHttpProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/AsyncProtocolHandler.java
URL:
http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/AsyncProtocolHandler.java?rev=667815&view=auto
==============================================================================
---
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/AsyncProtocolHandler.java
(added)
+++
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/AsyncProtocolHandler.java
Sat Jun 14 08:30:48 2008
@@ -0,0 +1,180 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.coyote.http11.async;
+
+import java.io.IOException;
+import java.nio.channels.Channel;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import org.apache.coyote.Adapter;
+import org.apache.coyote.ProtocolHandler;
+import org.apache.coyote.client.AsyncHttp;
+import org.apache.coyote.client.AsyncHttpPool;
+import org.apache.tomcat.util.net.SelectorCallback;
+import org.apache.tomcat.util.net.SelectorPool;
+import org.apache.tomcat.util.net.SelectorThread.SelectorData;
+
+/**
+ * New protocol handler using SelectorThread to abstract NIO and
+ * APR, with more flexibility on processing async requests ( coyote++)
+ *
+ * The second goal is to keep it as simple as possible, and reuse
+ * code with the async http client.
+ *
+ *
+ * @author Costin Manolache
+ */
+public class AsyncProtocolHandler implements ProtocolHandler {
+ public static final String A_DAEMON = "daemon";
+ public static final String A_PORT = "port";
+ Adapter adapter;
+ Map<String, Object> attributes = new HashMap<String, Object>();
+ AsyncHttpPool serverPool = new AsyncHttpPool("Server", null) {
+ @Override
+ protected AsyncHttp create() {
+ return new AsyncHttpProcessor(AsyncProtocolHandler.this);
+ }
+ };
+ Executor tp = Executors.newCachedThreadPool();
+
+ public void destroy() throws Exception {
+ }
+
+ public Adapter getAdapter() {
+ return adapter;
+ }
+
+ public Object getAttribute(String name) {
+ return attributes.get(name);
+ }
+
+ public Iterator getAttributeNames() {
+ return attributes.keySet().iterator();
+ }
+
+ public void setAttribute(String name, Object value) {
+ attributes.put(name, value);
+ }
+
+ public void setAdapter(Adapter adapter) {
+ this.adapter = adapter;
+ }
+
+ public void init() throws Exception {
+ }
+
+ public void pause() throws Exception {
+ }
+
+ public void resume() throws Exception {
+ }
+
+ public void setPort(int port) {
+ setAttribute(A_PORT, Integer.toString(port));
+ }
+
+ public void setDaemon(boolean t) {
+ if (t) {
+ setAttribute("daemon", "1");
+ }
+ }
+
+ public void start() throws Exception {
+ SelectorPool pool = SelectorPool.defaultPool();
+ pool.setDaemon(getAttribute(A_DAEMON) != null);
+ String portS = (String) getAttribute(A_PORT);
+ int port = (portS == null) ? 8080 : Integer.parseInt(portS);
+ if (port == -1) {
+ // TODO: find an unused port
+ }
+ if (port == 0) {
+ pool.getSelector().inetdAcceptor(new AcceptorSelectorCallback());
+ } else {
+ pool.getSelector().acceptor(new AcceptorSelectorCallback(),
+ port, null, 100, 2000);
+ }
+ }
+
+ public SelectorCallback getKeepAliveCallback() {
+ return new KeepAliveCallback();
+ }
+
+ /**
+ * Callback for requests waiting in keep-alive.
+ * Minimal state - no buffers or request objects.
+ *
+ * TODO: we may use a small buffer to get the head before
+ * dispatching.
+ */
+ public class KeepAliveCallback extends SelectorCallback {
+ // TODO: keep headBuffer here, wait until the header is received.
+ // There is no reason to create the whole AsyncHttp before we have
+ // the request
+
+ /**
+ * Data available for read.
+ * If you consume all data ( readNB() returns 0 ), you can re-enable
this
+ * callback.
+ */
+ @Override
+ public void dataReceived(SelectorData selThread)
+ throws IOException {
+ AsyncHttp shttp = serverPool.get();
+ if (shttp == null) {
+ shttp = new AsyncHttpProcessor(AsyncProtocolHandler.this);
+ shttp.setClientPool(serverPool);
+ }
+ ((AsyncHttpProcessor) shttp).fromKeepAlive(selThread, this);
+ }
+
+
+ /**
+ * Close was detected.
+ */
+ @Override
+ public void channelClosed(SelectorData selThread, Throwable ex) {
+ if (ex != null ) {
+ System.err.println("Closed");
+ ex.printStackTrace();
+ }
+ }
+
+ }
+
+ /**
+ * When a connection is accepted:
+ * - try to parse a HTTP request
+ * - when we have the head, dispatch the processing - first
+ * to async handlers, then to a thread pool. This part is
+ * shared with the 'keep-alive' handler
+ *
+ * @author Costin Manolache
+ */
+ public class AcceptorSelectorCallback extends SelectorCallback {
+ @Override
+ public SelectorCallback connectionAccepted(SelectorData selThread,
+ Channel sockC) {
+ AsyncHttpProcessor shttp = (AsyncHttpProcessor) serverPool.get();
+ shttp.setSelectorData(selThread);
+ return shttp.getCallback();
+ }
+ }
+
+}
Propchange:
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/AsyncProtocolHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/BlockingCoyoteBuffers.java
URL:
http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/BlockingCoyoteBuffers.java?rev=667815&view=auto
==============================================================================
---
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/BlockingCoyoteBuffers.java
(added)
+++
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/BlockingCoyoteBuffers.java
Sat Jun 14 08:30:48 2008
@@ -0,0 +1,296 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.coyote.http11.async;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.coyote.InputBuffer;
+import org.apache.coyote.OutputBuffer;
+import org.apache.coyote.Request;
+import org.apache.coyote.Response;
+import org.apache.coyote.client.AsyncHttp;
+import org.apache.coyote.client.AsyncHttpCallback;
+import org.apache.coyote.client.BlockingHttp;
+import org.apache.coyote.http11.InputFilter;
+import org.apache.coyote.http11.OutputFilter;
+import org.apache.coyote.http11.filters.GzipInputFilter;
+import org.apache.coyote.http11.filters.GzipOutputFilter;
+import org.apache.tomcat.util.buf.ByteChunk;
+
+/**
+ * Support for blocking coyote Input/Output Filter and Buffer.
+ *
+ * The client implements basic transfer encoding directly, non-blocking - this
+ * can be used to support compression filters ( content-encoding ) and custom
+ * filters. It is also needed to integrated with tomcat.
+ *
+ * The main problem with this class is the data copy - between buffers.
+ *
+ * @author Costin Manolache
+ */
+class BlockingCoyoteBuffers {
+
+ AsyncHttp aprocessor;
+ BlockingHttp blockingHttp;
+
+ // Tansfer-decoded, blocking input buffer
+ InputBuffer rawReadBuf;
+ // Tansfer-decoded, blocking output buffer
+ OutputBuffer rawWriteBuf;
+
+ // Blocking, content filtered buffers
+ FilteredInputBuffer inputBuffer;
+ FilteredOutputBuffer outputBuffer;
+
+ // this will have to be blocking for now
+ List<String> readEncodings = new ArrayList();
+ List<String> writeEncodings = new ArrayList();
+
+ public BlockingCoyoteBuffers(AsyncHttp aprocessor) {
+ this.aprocessor = aprocessor;
+ this.blockingHttp = aprocessor.getBlockingHttp();
+ rawReadBuf = new SelectorInputBuffer();
+ rawWriteBuf = new SelectorOutputBuffer();
+ inputBuffer = new FilteredInputBuffer(rawReadBuf);
+ outputBuffer = new FilteredOutputBuffer(rawWriteBuf);
+
+ }
+
+ public void recycle() {
+ inputBuffer.recycle();
+ readEncodings.clear();
+ }
+
+ public List<String> getEncodings() {
+ return readEncodings;
+ }
+
+
+ /**
+ * Initialize standard input and output filters.
+ */
+ protected void initializeFilters() {
+ // Create and add the chunked filters.
+ inputBuffer.addFilter(new GzipInputFilter());
+ outputBuffer.addFilter(new GzipOutputFilter());
+ }
+
+ // ----------- Implementation classes --------------
+
+ /**
+ * InputBuffer interface, will use filters to decode.
+ */
+ protected static class FilteredInputBuffer implements InputBuffer {
+
+ InputBuffer rawBuffer; // needed for filters
+
+ protected List<InputFilter> filterLibrary = new ArrayList();
+ protected List<InputFilter> activeFilters = new ArrayList();
+
+ public FilteredInputBuffer(InputBuffer rawBuffer) {
+ this.rawBuffer = rawBuffer;
+ }
+
+ public void recycle() {
+ activeFilters.clear();
+ }
+
+ public int doRead(ByteChunk chunk, Request request)
+ throws IOException {
+ if (activeFilters.size() == 0) {
+ return rawBuffer.doRead(chunk, request);
+ } else {
+ return activeFilters.get(activeFilters.size() -
1).doRead(chunk, request);
+ }
+ }
+
+ public void setSwallowInput(boolean b) {
+ }
+
+ public List<InputFilter> getFilters() {
+ return filterLibrary;
+ }
+
+ /**
+ * Add an input filter to the filter library.
+ */
+ public void addActiveFilter(InputFilter filter) {
+ if (activeFilters.size() == 0) {
+ filter.setBuffer(rawBuffer);
+ } else {
+ if (activeFilters.contains(filter)) {
+ return;
+ }
+ filter.setBuffer(activeFilters.get(activeFilters.size() - 1));
+ }
+ activeFilters.add(filter);
+ }
+
+ public void addFilter(InputFilter filter) {
+ if (filterLibrary.contains(filter)) {
+ return;
+ }
+ filterLibrary.add(filter);
+ }
+
+ public boolean addActiveFilter(String name) {
+ for (InputFilter f : filterLibrary) {
+ if (f.getEncodingName().toString().equals(name)) {
+ addActiveFilter(f);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ }
+
+ /**
+ * InputBuffer interface, will use filters to decode.
+ */
+ public class FilteredOutputBuffer implements OutputBuffer,
+ AsyncHttpCallback.AsyncHttpHolder {
+
+ OutputBuffer rawBuffer; // needed for filters
+
+ protected List<OutputFilter> filterLibrary = new ArrayList();
+ protected List<OutputFilter> activeFilters = new ArrayList();
+
+ public FilteredOutputBuffer(OutputBuffer rawBuffer) {
+ this.rawBuffer = rawBuffer;
+ }
+
+ public void recycle() {
+ activeFilters.clear();
+ }
+
+ /**
+ * Blocking write.
+ * @param chunk contains data, will not be modified
+ */
+ public int doWrite(ByteChunk chunk, Response res)
+ throws IOException {
+ if (!res.isCommitted()) {
+ res.sendHeaders(); // process headers, serialize head
+ }
+
+ if (activeFilters.size() == 0) {
+ return rawBuffer.doWrite(chunk, res);
+ } else {
+ return activeFilters.get(activeFilters.size() -
1).doWrite(chunk, res);
+ }
+ }
+
+ public List<OutputFilter> getFilters() {
+ return filterLibrary;
+ }
+
+// public void addFilter(OutputFilter f) {
+// filterLibrary.add(f);
+// }
+
+ /**
+ * Add an Output filter to the filter library.
+ */
+ public void addActiveFilter(OutputFilter filter) {
+ if (activeFilters.size() == 0) {
+ filter.setBuffer(rawBuffer);
+ } else {
+ if (activeFilters.contains(filter)) {
+ return;
+ }
+ filter.setBuffer(activeFilters.get(activeFilters.size() - 1));
+ }
+ activeFilters.add(filter);
+ }
+
+ public void addFilter(OutputFilter filter) {
+ if (filterLibrary.contains(filter)) {
+ return;
+ }
+ filterLibrary.add(filter);
+ }
+
+ public boolean addActiveFilter(String name) {
+ for (OutputFilter f : filterLibrary) {
+ if (f.getEncodingName().toString().equals(name)) {
+ addActiveFilter(f);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public void flush() throws IOException {
+
+ }
+
+ public void sendAck() throws IOException {
+ }
+
+ public void endRequest() throws IOException {
+ if (activeFilters.size() > 0) {
+ activeFilters.get(activeFilters.size() - 1).end();
+ }
+ }
+
+ public void reset() {
+ }
+
+ public AsyncHttp getAsyncHttp() {
+ return aprocessor;
+ }
+
+ }
+
+ /**
+ * BLOCKING InputBuffer for the raw data received.
+ *
+ * Since tomcat buffer is blocking, this method will block
+ * as well until more data is received, if the input is empty.
+ *
+ */
+ class SelectorInputBuffer implements InputBuffer {
+
+ // Blocking read method
+ public int doRead(ByteChunk chunk, Request request)
+ throws IOException {
+ ByteChunk next = blockingHttp.readBlocking(0);
+ if (next == null) {
+ return -1;
+ }
+ chunk.setBytes(next.getBuffer(), next.getStart(), next.length());
+ return chunk.length();
+ }
+ }
+
+
+ class SelectorOutputBuffer implements OutputBuffer {
+
+ /**
+ * Blocking write.
+ * @param chunk contains data, will not be modified
+ */
+ public int doWrite(ByteChunk chunk, Response res)
+ throws IOException {
+ return blockingHttp.doWrite(chunk);
+ }
+ }
+
+
+}
\ No newline at end of file
Propchange:
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/BlockingCoyoteBuffers.java
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]