Author: costin
Date: Sat Jun 14 08:30:48 2008
New Revision: 667815

URL: http://svn.apache.org/viewvc?rev=667815&view=rev
Log:
Server side. As with the client side, most is actually cut&pasted from the apr 
and nio connectors, and heavily refactored.


Added:
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/
    tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/
    
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/AsyncHttpProcessor.java
   (with props)
    
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/AsyncProtocolHandler.java
   (with props)
    
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/BlockingCoyoteBuffers.java
   (with props)

Added: 
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/AsyncHttpProcessor.java
URL: 
http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/AsyncHttpProcessor.java?rev=667815&view=auto
==============================================================================
--- 
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/AsyncHttpProcessor.java
 (added)
+++ 
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/AsyncHttpProcessor.java
 Sat Jun 14 08:30:48 2008
@@ -0,0 +1,840 @@
+/*  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.coyote.http11.async;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+import org.apache.coyote.ActionCode;
+import org.apache.coyote.ActionHook;
+import org.apache.coyote.adapters.CoyoteServer;
+import org.apache.coyote.client.AsyncHttp;
+import org.apache.coyote.http11.Constants;
+import org.apache.tomcat.util.buf.ByteChunk;
+import org.apache.tomcat.util.buf.HexUtils;
+import org.apache.tomcat.util.buf.MessageBytes;
+import org.apache.tomcat.util.http.FastHttpDateFormat;
+import org.apache.tomcat.util.http.MimeHeaders;
+import org.apache.tomcat.util.net.SelectorCallback;
+import org.apache.tomcat.util.net.SelectorThread.SelectorData;
+
+/*
+ * Cut&pasted from nio and apr connectors, replacing the 
+ * IO with the AsyncHttp model. 
+ */
+
+/**
+ * Handles a single http transaction, for Coyote connector.
+ * Created ( or get from pool ) when the request is received, 
+ * released when service is done and body is sent/received.
+ * 
+ * TODO: more on the async part - move mapping and more to the IO thread, 
+ *  right now it still require a dispatch to the thread pool
+ * 
+ * @author Costin Manolache
+ */
+public class AsyncHttpProcessor extends AsyncHttp 
+        implements ActionHook, Runnable {
+  
+    public static final byte[] ACK_BYTES = 
+        ByteChunk.convertToBytes("HTTP/1.1 100 Continue\r\n\r\n");
+    
+    static ByteChunk CLOSE = ByteChunk.fromString("close");
+    static ByteChunk KEEPALIVE = ByteChunk.fromString("keep-alive");
+    public static final String HTTP_11 = "HTTP/1.1";
+    public static final String HTTP_10 = "HTTP/1.0";
+    public static final String GET = "GET";
+    public static final String POST = "POST";
+
+    
+    // Empty string to not send identification
+    String serverHeader = "ApacheTomcat";
+    AsyncProtocolHandler simpleProtocolHandler;
+
+    boolean http11 = false;
+    boolean http09 = false;
+    boolean error = false;
+    // don't return to keep alive.
+    boolean comet = false;
+    boolean cometClose = false;
+    
+    // TODO: use CharChunk
+    protected char[] hostNameC = new char[0];
+    boolean ssl;
+    BlockingCoyoteBuffers blockingIO = new BlockingCoyoteBuffers(this);
+
+    /**
+     * Implements blocking write and buffering of the response.
+     * Uses the 'output' ByteBuffer. 
+     * 
+     * Since the socket is non-blocking, there are few special things:
+     * - flush() and endRequest() will cause the callback to register for WRITE
+     * - write will keep sending data until the buffer is empty. 
+     * - flush() will wait for the buffer to be empty, endRequest will not.
+     * - when the buffer is full, a flush() will be called
+     * - TODO: it should be possible to add to the buffer while data is 
+     * written - tricky for sync
+     * 
+     * A comet request could bypass the OutputBuffer and work directly
+     * with the selector and the real buffers, this is just a wrapper.
+     */
+
+    public AsyncHttpProcessor(AsyncProtocolHandler simpleProtocolHandler) {
+        super(null, true);
+        this.simpleProtocolHandler = simpleProtocolHandler;
+        res.setHook(this);
+        res.setOutputBuffer(blockingIO.outputBuffer);
+        req.setInputBuffer(blockingIO.inputBuffer);
+        debug = true;
+    }
+
+
+    public void recycle() {
+        super.recycle();
+        http11 = false;
+        http09 = false;
+        error = false;
+        comet = false;
+        cometClose = false;
+        allDone = false;
+    }
+    
+    // Call the servlet...
+    @Override
+    protected void headersReceived() throws IOException {
+        processReadTransferHeaders(req.getMimeHeaders());
+
+        if (!isReadContentDelimited()) {
+            if (req.method().equals("GET") ||
+                    req.method().equalsIgnoreCase("HEAD")) {
+                // No body.
+                handleEndReceive(false);
+            } else {
+                closeOnEndSend = true;
+                closeOnEndRecv = true; // needBody
+            }
+        } // else: delimited, need body
+        
+        // TODO: map, detect 'async' handlers
+        if (httpCallback != null) {
+            httpCallback.headersReceived(this, req, res);
+        }
+        
+        simpleProtocolHandler.tp.execute(this);
+    }
+
+    @Override
+    protected int parseHead() throws IOException {
+        boolean ok = parser.parseRequestLine(req.method(),
+                req.unparsedURI(), req.query(), 
+                req.requestURI(), req.protocol());
+        if (!ok) {
+            return NEED_MORE;
+        }
+
+        int dataStart = parser.parseHeaders(req.getMimeHeaders());
+        return (dataStart < 0) ? NEED_MORE : dataStart;
+    }
+
+    /** 
+     * This part is run in the thread pool, after headers received.
+     */
+    public void run() {
+        try {
+            // Make ourself available to comet requests.
+            req.setNote(CoyoteServer.COMET_REQ_NOTE, this);
+            prepareRequest();
+
+            String uri = req.unparsedURI().toString(); // will be recycled
+            if (debug ) {
+                log.info("Server: SERVICE START ---- " + uri);
+            }
+            
+            try {
+                this.simpleProtocolHandler.adapter.service(req, res);
+                // at this point the response is in process of sending or sent
+                // the client may already start pipeline-ing the next req.
+            } catch (IOException ex) {
+                if ("Broken pipe".equals(ex.getMessage())) { 
+                    log.warning("Connection interrupted while writting");
+                }
+            } catch (Throwable t) {
+                t.printStackTrace();
+            }
+
+            if (res.getNote(CoyoteServer.COMET_RES_NOTE) == null) {
+                // request done - let's return this processor
+                // TODO: special pool !! release(); // by the time 
updateCallback ends, we may start 
+                // receiving the next request already
+                log.info("Server: SERVICE DONE: ---- " + uri);
+                release();
+            } else {
+                log.info("Server: coyote async req: ---- " + uri);
+                // Comet - don't release
+            }
+        } catch (IOException ex) {
+            if ("Broken pipe".equals(ex.getMessage())) { 
+                log.warning("Connection interrupted while writting");
+            } else {
+                ex.printStackTrace();
+            }
+        } catch (Throwable t) {
+            // TODO Auto-generated catch block
+            t.printStackTrace();
+        }
+    }
+
+    @Override
+    protected void handleEndSendReceive(SelectorData selT) 
+            throws IOException {
+        // Called when both send and receive are done. Either from IO or 
+        // regular thread.
+        if (httpCallback != null) {
+            httpCallback.done(this, false, null);
+        }
+        if (closeOnEndSend) {
+            selT.sel.close(selT);
+            log.info("server send/receive done, close=" + closeOnEndSend);
+            maybeRelease();                
+        } else {
+            // If write is not done in the IO thread, we may have the last
+            // packet and endSendReceive called in the thread pool.
+            // If this happens, read can happen while we are doing this method.
+            maybeRelease();
+        }
+    }
+    
+    
+    /** 
+     * Finalize the sending. Indicates servlet processing is done, some
+     * IO may still be in flight.
+     *  
+     * MUST be called -  either after service or from the commet app.
+     * 
+     * The tricky part is if this gets called before or after or at same 
+     * time with endSendReceive and with next request getting pipelined.
+     * 
+     * @throws IOException 
+     */
+    public void release() throws IOException {
+        endSend(this); // make sure the final marker is sent.
+        serviceDone = true;
+        maybeRelease();
+    }
+    
+    @Override
+    protected void handleReceivePastEnd(SelectorData selT) throws IOException {
+    }
+    
+    @Override
+    public void ioThreadRun(SelectorData selT) throws IOException {
+        // Currently used only to set keep alive.
+        // If we need more, need to use state or pass a param.
+        
+        if (rawRecvBuf.length() > 0) {
+            // what if I'm not done ?
+            if (debug) {
+                log.info("Done, pipelined next request \n" + rawRecvBuf + " " 
+                    + rawRecvBuf.length());
+            }
+            headRecvBuf.recycle();
+            headRecvBuf.append(rawRecvBuf);
+            recycle(true);
+            // No release - next req in process
+            
+            // In case it was disabled
+            selector.readInterest(selT, true);
+            // Notify that data was received. The callback must deal with
+            // pre-existing data.
+            selectorCallback.dataReceived(selT);
+            return;
+        }
+        
+        synchronized (this) {
+            if (keepAliveCallback == null) {
+                keepAliveCallback = 
simpleProtocolHandler.getKeepAliveCallback();
+             }
+             selector.readInterest(selT, true);
+             
+             selector.updateCallback(selT, this.selectorCallback, 
keepAliveCallback);
+             
+             if (debug) {
+                 log.info("SERVER send/receive done, KEEP_ALIVE");
+             }
+        }
+        
+        recycle();
+        returnToPool();
+    }
+    
+    public void fromKeepAlive(SelectorData selThread, SelectorCallback ka)
+            throws IOException {
+        synchronized (this) {
+            keepAliveCallback = ka;
+             
+            selThread.sel.updateCallback(selThread, ka, selectorCallback);
+        }
+        try {
+            selectorCallback.dataReceived(selThread);
+        } catch (IOException ex) {
+            selThread.sel.close(selThread);
+        }
+    }
+    
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        
sb.append("ServerHttpProcessor-").append(ser).append(",rs=").append(recvState)
+            .append(",ss=");
+        sb.append(sendState).append(")");
+        return sb.toString();
+    }
+    
+    private String[] actionNames = {
+            "", "ACK", "CLOSE", "COMMIT", "CLIENT_FLUSH", "CUSTOM",
+            "RESET", "START", "STOP" /*8*/, 
+            "WEBAPP", "POST_REQ", "REQ_HOST_ATTRIBUTE", 
+            "REQ_HOST_ADDR_ATTRIBUTE", "REQ_SSL_ATTRIBUTE", 
+            "NEW_REQ", "REQ_SSL_CERT", "REQ_REMOTEPORT", 
+            "REQ_LOCALPORT",
+    };
+    
+    public void action(ActionCode actionCode, Object param) {
+        if (debug) {
+            log.info("ServerHttpProcessor: Action: " + actionCode.getCode()
+                    + " " + ((actionCode.getCode() < actionNames.length) ? 
+                actionNames[actionCode.getCode()] : "") );
+        }
+        if (actionCode == ActionCode.ACTION_COMMIT) {
+            maybeCommitResponse();
+            return;
+        } else if (actionCode == ActionCode.ACTION_ACK) {
+            send100Continue();
+        } else if (actionCode == ActionCode.ACTION_CLIENT_FLUSH) {
+            maybeCommitResponse();
+            try {
+                blockingIO.outputBuffer.flush();
+            } catch (IOException e) {
+                // Set error flag
+                error = true;
+                res.setErrorException(e);
+            }
+        } else if (actionCode == ActionCode.ACTION_CLOSE) {
+            // Close - from commet or post request == res.finish()
+            // End the processing of the current request, and stop any further
+            // transactions with the client
+
+            comet = false;
+            cometClose = true;
+
+            try {
+                blockingIO.outputBuffer.endRequest();
+            } catch (IOException e) {
+                // Set error flag
+                error = true;
+            }
+        } else if (actionCode == ActionCode.ACTION_RESET) {
+            // Reset response
+            // Note: This must be called before the response is committed
+            blockingIO.outputBuffer.reset();
+        } else if (actionCode == ActionCode.ACTION_CUSTOM) {
+            // Do nothing
+        } else if (actionCode == ActionCode.ACTION_REQ_SET_BODY_REPLAY) {
+//          ByteChunk body = (ByteChunk) param;
+
+//          InputFilter savedBody = new SavedRequestInputFilter(body);
+//          savedBody.setRequest(request);
+
+//          InternalNioInputBuffer internalBuffer = (InternalNioInputBuffer)
+//          request.getInputBuffer();
+//          internalBuffer.addActiveFilter(savedBody);
+
+        } else if (actionCode == ActionCode.ACTION_AVAILABLE) {
+          req.setAvailable(blockingHttp.available());
+        } else if (actionCode == ActionCode.ACTION_COMET_BEGIN) {
+            res.setNote(CoyoteServer.COMET_RES_NOTE, res);
+            comet = true;
+        } else if (actionCode == ActionCode.ACTION_COMET_END) {
+            comet = false;
+        }  // --------------------- Address/port --------------------------    
            
+             else if (actionCode == 
ActionCode.ACTION_REQ_LOCAL_NAME_ATTRIBUTE) {
+
+            InetAddress addr = 
+                selectorData.sel.getAddress(selectorData, false);
+
+            String host = addr.getHostName();
+            req.localName().setString((host == null) ? 
+                    addr.getHostAddress() : host);
+
+        } else if (actionCode == ActionCode.ACTION_REQ_HOST_ATTRIBUTE) {
+            InetAddress addr = 
+                selectorData.sel.getAddress(selectorData, true);
+            String host = addr.getHostName();
+            req.remoteHost().setString((host == null) ? 
+                    addr.getHostAddress() : host);
+            // old code had checks for addr == null
+
+        } else if (actionCode == ActionCode.ACTION_REQ_HOST_ADDR_ATTRIBUTE) {
+            InetAddress localAddr = 
+                selectorData.sel.getAddress(selectorData, true);
+            req.remoteAddr().setString(localAddr.getHostAddress());
+
+        } else if (actionCode == ActionCode.ACTION_REQ_LOCAL_ADDR_ATTRIBUTE) {
+            // TODO: cache it ? 
+            InetAddress localAddr = 
+                selectorData.sel.getAddress(selectorData, false);
+            req.localAddr().setString(localAddr.getHostAddress());
+
+        } else if (actionCode == ActionCode.ACTION_REQ_REMOTEPORT_ATTRIBUTE) {
+            int remotePort = 
+                selectorData.sel.getPort(selectorData, true);
+            req.setRemotePort(remotePort);
+
+        } else if (actionCode == ActionCode.ACTION_REQ_LOCALPORT_ATTRIBUTE) {
+            int port = 
+                selectorData.sel.getPort(selectorData, false);
+            req.setLocalPort(port);
+
+        // --------------------- SSL --------------------------    
+        } else if (actionCode == ActionCode.ACTION_REQ_SSL_ATTRIBUTE ) {
+
+//          try {
+//          if (sslSupport != null) {
+//          Object sslO = sslSupport.getCipherSuite();
+//          if (sslO != null)
+//          request.setAttribute
+//          (SSLSupport.CIPHER_SUITE_KEY, sslO);
+//          sslO = sslSupport.getPeerCertificateChain(false);
+//          if (sslO != null)
+//          request.setAttribute
+//          (SSLSupport.CERTIFICATE_KEY, sslO);
+//          sslO = sslSupport.getKeySize();
+//          if (sslO != null)
+//          request.setAttribute
+//          (SSLSupport.KEY_SIZE_KEY, sslO);
+//          sslO = sslSupport.getSessionId();
+//          if (sslO != null)
+//          request.setAttribute
+//          (SSLSupport.SESSION_ID_KEY, sslO);
+//          }
+//          } catch (Exception e) {
+//          log.warn(sm.getString("http11processor.socket.ssl"), e);
+//          }
+
+        } else if (actionCode == ActionCode.ACTION_REQ_SSL_CERTIFICATE) {
+
+//          if( sslSupport != null) {
+//          /*
+//          * Consume and buffer the request body, so that it does not
+//          * interfere with the client's handshake messages
+//          */
+//          InputFilter[] inputFilters = inputBuffer.getFilters();
+//          ((BufferedInputFilter) inputFilters[Constants.BUFFERED_FILTER])
+//          .setLimit(maxSavePostSize);
+//          inputBuffer.addActiveFilter
+//          (inputFilters[Constants.BUFFERED_FILTER]);
+//          try {
+//          Object sslO = sslSupport.getPeerCertificateChain(true);
+//          if( sslO != null) {
+//          request.setAttribute
+//          (SSLSupport.CERTIFICATE_KEY, sslO);
+//          }
+//          } catch (Exception e) {
+//          log.warn(sm.getString("http11processor.socket.ssl"), e);
+//          }
+//          }
+
+        }
+
+    }
+
+    /**
+     *  Acknowlege request ( Expect: 100 Continue )
+     */
+    public void send100Continue() {
+        if ((res.isCommitted()) || !expectation)
+            return;
+        expectation = false;
+        ByteChunk bc = new ByteChunk();
+        bc.setBytes(ACK_BYTES, 0, ACK_BYTES.length);
+        try {
+            startWrite(bc);
+        } catch (IOException e) {
+            error = true;
+        }
+    }
+
+    public void maybeCommitResponse() {
+        // Commit current response
+        if (res.isCommitted())
+            return;
+
+        // Validate and write response headers
+        try {
+            sendResponseHeaders();
+        } catch (IOException e) {
+            // Set error flag
+            error = true;
+        }
+    }
+
+    /**
+     * After reading the request headers, we have to setup the request filters.
+     */
+    protected void prepareRequest() {
+
+        http11 = true;
+        http09 = false;
+        contentDelimitationRecv = false;
+        expectation = false;
+//      sendfileData = null;
+//      if (ssl) {
+//      req.scheme().setString("https");
+//      }
+        MessageBytes protocolMB = req.protocol();
+        if (protocolMB.equals(HTTP_11)) {
+            http11 = true;
+            protocolMB.setString(HTTP_11);
+        } else if (protocolMB.equals(HTTP_10)) {
+            http11 = false;
+            closeOnEndSend = true;
+            protocolMB.setString(HTTP_10);
+        } else if (protocolMB.equals("")) {
+            // HTTP/0.9
+            http09 = true;
+            http11 = false;
+            closeOnEndSend = true;
+        } else {
+            // Unsupported protocol
+            http11 = false;
+            error = true;
+            // Send 505; Unsupported HTTP version
+            res.setStatus(505);
+        }
+
+        MessageBytes methodMB = req.method();
+        if (methodMB.equals(GET)) {
+            methodMB.setString(GET);
+        } else if (methodMB.equals(POST)) {
+            methodMB.setString(POST);
+        }
+
+        MimeHeaders headers = req.getMimeHeaders();
+
+        // Check connection header
+        MessageBytes connectionValueMB = headers.getValue("connection");
+        if (connectionValueMB != null) {
+            ByteChunk connectionValueBC = connectionValueMB.getByteChunk();
+            if (connectionValueBC.indexOfIngoreCase(CLOSE, 0) != -1) {
+                closeOnEndSend = true;
+            } else if (connectionValueBC.indexOfIngoreCase(KEEPALIVE, 0) != 
-1) {
+                closeOnEndSend = false;
+            }
+        }
+
+        MessageBytes expectMB = null;
+        if (http11) {
+            expectMB = headers.getValue("expect");
+        }
+        if ((expectMB != null)
+                && (expectMB.indexOfIgnoreCase("100-continue", 0) != -1)) {
+            expectation = true;
+        }
+
+        // Check user-agent header
+//      if ((restrictedUserAgents != null) && ((http11) || (keepAlive))) {
+//      MessageBytes userAgentValueMB = headers.getValue("user-agent");
+//      // Check in the restricted list, and adjust the http11
+//      // and keepAlive flags accordingly
+//      if(userAgentValueMB != null) {
+//      String userAgentValue = userAgentValueMB.toString();
+//      for (int i = 0; i < restrictedUserAgents.length; i++) {
+//      if (restrictedUserAgents[i].matcher(userAgentValue).matches()) {
+//      http11 = false;
+//      keepAlive = false;
+//      break;
+//      }
+//      }
+//      }
+//      }
+
+        // Check for a full URI (including protocol://host:port/)
+        ByteChunk uriBC = req.requestURI().getByteChunk();
+        if (uriBC.startsWithIgnoreCase("http", 0)) {
+
+            int pos = uriBC.indexOf("://", 0, 3, 4);
+            int uriBCStart = uriBC.getStart();
+            int slashPos = -1;
+            if (pos != -1) {
+                byte[] uriB = uriBC.getBytes();
+                slashPos = uriBC.indexOf('/', pos + 3);
+                if (slashPos == -1) {
+                    slashPos = uriBC.getLength();
+                    // Set URI as "/"
+                    req.requestURI().setBytes
+                    (uriB, uriBCStart + pos + 1, 1);
+                } else {
+                    req.requestURI().setBytes
+                    (uriB, uriBCStart + slashPos,
+                            uriBC.getLength() - slashPos);
+                }
+                MessageBytes hostMB = headers.setValue("host");
+                hostMB.setBytes(uriB, uriBCStart + pos + 3,
+                        slashPos - pos - 3);
+            }
+
+        }
+
+        // Parse transfer-encoding header
+        processReadTransferHeaders(headers);
+
+        MessageBytes valueMB = headers.getValue("host");
+        // Check host header
+        if (http11 && (valueMB == null)) {
+            error = true;
+            // 400 - Bad request
+            res.setStatus(400);
+        }
+
+        parseHost(valueMB);
+
+        // Advertise sendfile support through a request attribute
+//      if (endpoint.getUseSendfile()) 
+//      request.setAttribute("org.apache.tomcat.sendfile.support", 
Boolean.TRUE);
+//      // Advertise comet support through a request attribute
+//      request.setAttribute("org.apache.tomcat.comet.support", Boolean.TRUE);
+//      // Advertise comet timeout support
+//      request.setAttribute("org.apache.tomcat.comet.timeout.support", 
Boolean.TRUE);
+
+    }
+
+
+    /**
+     * Parse host.
+     */
+    public void parseHost(MessageBytes valueMB) {
+
+        if (valueMB == null || valueMB.isNull()) {
+            // HTTP/1.0
+            // Default is what the socket tells us. Overriden if a host is
+            // found/parsed
+//          req.setServerPort(endpoint.getPort());
+            return;
+        }
+
+        ByteChunk valueBC = valueMB.getByteChunk();
+        byte[] valueB = valueBC.getBytes();
+        int valueL = valueBC.getLength();
+        int valueS = valueBC.getStart();
+        int colonPos = -1;
+        if (hostNameC.length < valueL) {
+            hostNameC = new char[valueL];
+        }
+
+        boolean ipv6 = (valueB[valueS] == '[');
+        boolean bracketClosed = false;
+        for (int i = 0; i < valueL; i++) {
+            char b = (char) valueB[i + valueS];
+            hostNameC[i] = b;
+            if (b == ']') {
+                bracketClosed = true;
+            } else if (b == ':') {
+                if (!ipv6 || bracketClosed) {
+                    colonPos = i;
+                    break;
+                }
+            }
+        }
+
+        if (colonPos < 0) {
+            if (!ssl) {
+                // 80 - Default HTTP port
+                req.setServerPort(80);
+            } else {
+                // 443 - Default HTTPS port
+                req.setServerPort(443);
+            }
+            req.serverName().setChars(hostNameC, 0, valueL);
+        } else {
+
+            req.serverName().setChars(hostNameC, 0, colonPos);
+
+            int port = 0;
+            int mult = 1;
+            for (int i = valueL - 1; i > colonPos; i--) {
+                int charValue = HexUtils.DEC[(int) valueB[i + valueS]];
+                if (charValue == -1) {
+                    // Invalid character
+                    error = true;
+                    // 400 - Bad request
+                    res.setStatus(400);
+                    break;
+                }
+                port = port + (charValue * mult);
+                mult = 10 * mult;
+            }
+            req.setServerPort(port);
+
+        }
+
+    }
+
+    /** 
+     * Called after the headers are sent.
+     * 
+     */
+    @Override
+    public void headersSent() throws IOException {
+        sendState = State.BODY_DATA;      
+        int statusCode = res.getStatus();
+        if ((statusCode == 204) || (statusCode == 205)
+                || (statusCode == 304)) {
+            noBodySend = true;
+        }
+
+        MessageBytes methodMB = req.method();
+        if (methodMB.equals("HEAD")) {
+            // No entity body
+            noBodySend = true;
+        }
+        
+    }
+
+    /**
+     * When committing the response, we have to validate the set of headers, as
+     * well as setup the response filters.
+     */
+    public void sendResponseHeaders() throws IOException {
+
+        boolean entityBody = true;
+        contentDelimitationRecv = false;
+
+        //List<OutputFilter> outputFilters = outputBuffer.getFilters();
+
+        if (http09 == true) {
+            // HTTP/0.9 - no headers
+            closeOnEndSend = true;
+            return;
+        }
+
+        int statusCode = res.getStatus();
+        if ((statusCode == 204) || (statusCode == 205)
+                || (statusCode == 304)) {
+            entityBody = false;
+            contentDelimitationRecv = true;
+        }
+
+        MessageBytes methodMB = req.method();
+        if (methodMB.equals("HEAD")) {
+            // No entity body
+            contentDelimitationRecv = true;
+        }
+
+//      // Sendfile support
+//      if (this.endpoint.getUseSendfile()) {
+//      String fileName = (String) 
request.getAttribute("org.apache.tomcat.sendfile.filename");
+//      if (fileName != null) {
+//      // No entity body sent here
+//      outputBuffer.addActiveFilter(outputFilters[Constants.VOID_FILTER]);
+//      contentDelimitation = true;
+//      sendfileData = new NioEndpoint.SendfileData();
+//      sendfileData.fileName = fileName;
+//      sendfileData.pos = ((Long) 
request.getAttribute("org.apache.tomcat.sendfile.start")).longValue();
+//      sendfileData.length = ((Long) 
request.getAttribute("org.apache.tomcat.sendfile.end")).longValue() - 
sendfileData.pos;
+//      }
+//      }
+
+        // Check for compression
+//      boolean useCompression = false;
+//      if (entityBody && (compressionLevel > 0) && (sendfileData == null)) {
+//      useCompression = isCompressable();
+//      // Change content-length to -1 to force chunking
+//      if (useCompression) {
+//      response.setContentLength(-1);
+//      }
+//      }
+
+        MimeHeaders headers = res.getMimeHeaders();
+        if (!entityBody) {
+            res.setContentLength(-1);
+        } else {
+            String contentType = res.getContentType();
+            if (contentType != null) {
+                headers.setValue("Content-Type").setString(contentType);
+            }
+            String contentLanguage = res.getContentLanguage();
+            if (contentLanguage != null) {
+                headers.setValue("Content-Language")
+                .setString(contentLanguage);
+            }
+        }
+
+        long contentLength = res.getContentLengthLong();
+        if (contentLength != -1) {
+            headers.setValue("Content-Length").setLong(contentLength);
+            remainingSend = contentLength;
+            contentDelimitationRecv = true;
+        } else {
+            if (entityBody && http11 && !closeOnEndSend) {
+                chunkedSend = true;
+                contentDelimitationRecv = true;
+                
headers.addValue(Constants.TRANSFERENCODING).setString(Constants.CHUNKED);
+            } else {
+                closeOnEndSend = true;
+            }
+        }
+
+//      if (useCompression) {
+//      outputBuffer.addActiveFilter(outputFilters[Constants.GZIP_FILTER]);
+//      headers.setValue("Content-Encoding").setString("gzip");
+//      // Make Proxies happy via Vary (from mod_deflate)
+//      headers.setValue("Vary").setString("Accept-Encoding");
+//      }
+
+        // Add date header
+        
headers.setValue("Date").setString(FastHttpDateFormat.getCurrentDate());
+
+        // FIXME: Add transfer encoding header
+
+        if ((entityBody) && (!contentDelimitationRecv)) {
+            // Mark as close the connection after the request, and add the
+            // connection: close header
+            closeOnEndSend = true;
+        }
+
+        // If we know that the request is bad this early, add the
+        // Connection: close header.
+        closeOnEndSend = closeOnEndSend || statusDropsConnection(statusCode);
+        if (closeOnEndSend) {
+            headers.addValue(Constants.CONNECTION).setString(Constants.CLOSE);
+        } else if (!http11 && !error) {
+            
headers.addValue(Constants.CONNECTION).setString(Constants.KEEPALIVE);
+        }
+
+        // Add server header
+        if (serverHeader.length() > 0) {
+            headers.setValue("Server").setString(serverHeader);
+        }
+
+        serializeResponse(res, headSendBuf);
+
+        startWrite(headSendBuf);
+    }
+
+    SelectorCallback getCallback() {
+        return selectorCallback;
+    }
+
+}
\ No newline at end of file

Propchange: 
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/AsyncHttpProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/AsyncProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/AsyncProtocolHandler.java?rev=667815&view=auto
==============================================================================
--- 
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/AsyncProtocolHandler.java
 (added)
+++ 
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/AsyncProtocolHandler.java
 Sat Jun 14 08:30:48 2008
@@ -0,0 +1,180 @@
+/*  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.coyote.http11.async;
+
+import java.io.IOException;
+import java.nio.channels.Channel;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import org.apache.coyote.Adapter;
+import org.apache.coyote.ProtocolHandler;
+import org.apache.coyote.client.AsyncHttp;
+import org.apache.coyote.client.AsyncHttpPool;
+import org.apache.tomcat.util.net.SelectorCallback;
+import org.apache.tomcat.util.net.SelectorPool;
+import org.apache.tomcat.util.net.SelectorThread.SelectorData;
+
+/**
+ * New protocol handler using SelectorThread to abstract NIO and
+ * APR, with more flexibility on processing async requests ( coyote++)
+ * 
+ * The second goal is to keep it as simple as possible, and reuse 
+ * code with the async http client. 
+ * 
+ * 
+ * @author Costin Manolache
+ */
+public class AsyncProtocolHandler implements ProtocolHandler {
+    public static final String A_DAEMON = "daemon";
+    public static final String A_PORT = "port";
+    Adapter adapter;
+    Map<String, Object> attributes = new HashMap<String, Object>();
+    AsyncHttpPool serverPool = new AsyncHttpPool("Server", null) {
+        @Override
+        protected AsyncHttp create() {
+            return new AsyncHttpProcessor(AsyncProtocolHandler.this);
+        }
+    };
+    Executor tp = Executors.newCachedThreadPool();
+
+    public void destroy() throws Exception {
+    }
+
+    public Adapter getAdapter() {
+        return adapter;
+    }
+
+    public Object getAttribute(String name) {
+        return attributes.get(name);
+    }
+
+    public Iterator getAttributeNames() {
+        return attributes.keySet().iterator();
+    }
+
+    public void setAttribute(String name, Object value) {
+        attributes.put(name, value);
+    }
+
+    public void setAdapter(Adapter adapter) {
+        this.adapter = adapter;
+    }
+
+    public void init() throws Exception {
+    }
+
+    public void pause() throws Exception {
+    }
+
+    public void resume() throws Exception {
+    }
+
+    public void setPort(int port) {
+        setAttribute(A_PORT, Integer.toString(port));
+    }
+
+    public void setDaemon(boolean t) {
+        if (t) {
+            setAttribute("daemon", "1");
+        }
+    }
+    
+    public void start() throws Exception {
+        SelectorPool pool = SelectorPool.defaultPool();
+        pool.setDaemon(getAttribute(A_DAEMON) != null);
+        String portS = (String) getAttribute(A_PORT);
+        int port = (portS == null) ? 8080 : Integer.parseInt(portS);
+        if (port == -1) {
+            // TODO: find an unused port
+        }
+        if (port == 0) {
+            pool.getSelector().inetdAcceptor(new AcceptorSelectorCallback()); 
+        } else { 
+            pool.getSelector().acceptor(new AcceptorSelectorCallback(), 
+                    port, null, 100, 2000);
+        }
+    }
+
+    public SelectorCallback getKeepAliveCallback() {
+        return new KeepAliveCallback();
+    }
+    
+    /**
+     *  Callback for requests waiting in keep-alive.
+     *  Minimal state - no buffers or request objects.
+     *  
+     *  TODO: we may use a small buffer to get the head before
+     *  dispatching.
+     */
+    public class KeepAliveCallback extends SelectorCallback {
+        // TODO: keep headBuffer here, wait until the header is received.
+        // There is no reason to create the whole AsyncHttp before we have 
+        // the request 
+
+        /**
+         * Data available for read.
+         * If you consume all data ( readNB() returns 0 ), you can re-enable 
this
+         * callback. 
+         */
+        @Override
+        public void dataReceived(SelectorData selThread)
+                throws IOException {
+            AsyncHttp shttp = serverPool.get();
+            if (shttp == null) {
+                shttp = new AsyncHttpProcessor(AsyncProtocolHandler.this);
+                shttp.setClientPool(serverPool);
+            }
+            ((AsyncHttpProcessor) shttp).fromKeepAlive(selThread, this);
+        }
+
+
+        /** 
+         * Close was detected.  
+         */
+        @Override
+        public void channelClosed(SelectorData selThread, Throwable ex) {
+            if (ex != null ) {
+                System.err.println("Closed");
+                ex.printStackTrace();
+            }
+        }    
+
+    }  
+
+    /**
+     * When a connection is accepted: 
+     * - try to parse a HTTP request
+     * - when we have the head, dispatch the processing - first 
+     * to async handlers, then to a thread pool. This part is 
+     * shared with the 'keep-alive' handler 
+     * 
+     * @author Costin Manolache
+     */
+    public class AcceptorSelectorCallback extends SelectorCallback {
+        @Override
+        public SelectorCallback connectionAccepted(SelectorData selThread, 
+                                                   Channel sockC) {
+            AsyncHttpProcessor shttp = (AsyncHttpProcessor) serverPool.get();
+            shttp.setSelectorData(selThread);
+            return shttp.getCallback();
+        }
+    } 
+
+}

Propchange: 
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/AsyncProtocolHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/BlockingCoyoteBuffers.java
URL: 
http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/BlockingCoyoteBuffers.java?rev=667815&view=auto
==============================================================================
--- 
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/BlockingCoyoteBuffers.java
 (added)
+++ 
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/BlockingCoyoteBuffers.java
 Sat Jun 14 08:30:48 2008
@@ -0,0 +1,296 @@
+/*  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.coyote.http11.async;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.coyote.InputBuffer;
+import org.apache.coyote.OutputBuffer;
+import org.apache.coyote.Request;
+import org.apache.coyote.Response;
+import org.apache.coyote.client.AsyncHttp;
+import org.apache.coyote.client.AsyncHttpCallback;
+import org.apache.coyote.client.BlockingHttp;
+import org.apache.coyote.http11.InputFilter;
+import org.apache.coyote.http11.OutputFilter;
+import org.apache.coyote.http11.filters.GzipInputFilter;
+import org.apache.coyote.http11.filters.GzipOutputFilter;
+import org.apache.tomcat.util.buf.ByteChunk;
+
+/**
+ * Support for blocking coyote Input/Output Filter and Buffer. 
+ * 
+ * The client implements basic transfer encoding directly, non-blocking - this
+ * can be used to support compression filters ( content-encoding ) and custom
+ * filters. It is also needed to integrated with tomcat.
+ * 
+ * The main problem with this class is the data copy - between buffers.  
+ * 
+ * @author Costin Manolache
+ */
+class BlockingCoyoteBuffers {
+
+    AsyncHttp aprocessor;
+    BlockingHttp blockingHttp;
+    
+    // Tansfer-decoded, blocking input buffer 
+    InputBuffer rawReadBuf;
+    // Tansfer-decoded, blocking output buffer 
+    OutputBuffer rawWriteBuf;
+        
+    // Blocking, content filtered buffers
+    FilteredInputBuffer inputBuffer;
+    FilteredOutputBuffer outputBuffer;
+    
+    // this will have to be blocking for now
+    List<String> readEncodings = new ArrayList();
+    List<String> writeEncodings = new ArrayList();
+
+    public BlockingCoyoteBuffers(AsyncHttp aprocessor) {
+        this.aprocessor = aprocessor;
+        this.blockingHttp = aprocessor.getBlockingHttp();
+        rawReadBuf = new SelectorInputBuffer();
+        rawWriteBuf = new SelectorOutputBuffer();
+        inputBuffer = new FilteredInputBuffer(rawReadBuf);
+        outputBuffer = new FilteredOutputBuffer(rawWriteBuf);
+    
+    }
+
+    public void recycle() {
+        inputBuffer.recycle();
+        readEncodings.clear();
+    }
+
+    public List<String> getEncodings() {
+        return readEncodings;
+    }
+    
+
+    /**
+     * Initialize standard input and output filters.
+     */
+    protected void initializeFilters() {
+        // Create and add the chunked filters.
+        inputBuffer.addFilter(new GzipInputFilter());
+        outputBuffer.addFilter(new GzipOutputFilter());
+    }
+
+    // ----------- Implementation classes -------------- 
+
+    /** 
+     * InputBuffer interface, will use filters to decode. 
+     */
+    protected static class FilteredInputBuffer implements InputBuffer {
+
+        InputBuffer rawBuffer; // needed for filters
+
+        protected List<InputFilter> filterLibrary = new ArrayList();
+        protected List<InputFilter> activeFilters = new ArrayList();
+
+        public FilteredInputBuffer(InputBuffer rawBuffer) {
+            this.rawBuffer = rawBuffer;
+        }
+
+        public void recycle() {
+            activeFilters.clear();
+        }
+
+        public int doRead(ByteChunk chunk, Request request) 
+                throws IOException {
+            if (activeFilters.size() == 0) {
+                return rawBuffer.doRead(chunk, request);
+            } else {
+                return activeFilters.get(activeFilters.size() - 
1).doRead(chunk, request);
+            }
+        }
+
+        public void setSwallowInput(boolean b) {
+        }
+
+        public List<InputFilter> getFilters() {
+            return filterLibrary;
+        }
+
+        /**
+         * Add an input filter to the filter library.
+         */
+        public void addActiveFilter(InputFilter filter) {
+            if (activeFilters.size() == 0) {
+                filter.setBuffer(rawBuffer);
+            } else {
+                if (activeFilters.contains(filter)) {
+                    return;
+                }
+                filter.setBuffer(activeFilters.get(activeFilters.size() - 1));
+            }
+            activeFilters.add(filter);
+        }
+
+        public void addFilter(InputFilter filter) {
+            if (filterLibrary.contains(filter)) {
+                return;
+            }
+            filterLibrary.add(filter);
+        }
+
+        public boolean addActiveFilter(String name) {
+            for (InputFilter f : filterLibrary) {
+                if (f.getEncodingName().toString().equals(name)) {
+                    addActiveFilter(f);
+                    return true;
+                }
+            }
+            return false;
+        }
+
+    }
+
+    /** 
+     * InputBuffer interface, will use filters to decode. 
+     */
+    public class FilteredOutputBuffer implements OutputBuffer, 
+            AsyncHttpCallback.AsyncHttpHolder {
+
+        OutputBuffer rawBuffer; // needed for filters
+
+        protected List<OutputFilter> filterLibrary = new ArrayList();
+        protected List<OutputFilter> activeFilters = new ArrayList();
+
+        public FilteredOutputBuffer(OutputBuffer rawBuffer) {
+            this.rawBuffer = rawBuffer;
+        }
+
+        public void recycle() {
+            activeFilters.clear();
+        }
+
+        /** 
+         * Blocking write. 
+         * @param chunk contains data, will not be modified
+         */
+        public int doWrite(ByteChunk chunk, Response res) 
+        throws IOException {
+          if (!res.isCommitted()) {
+              res.sendHeaders(); // process headers, serialize head
+          } 
+
+            if (activeFilters.size() == 0) {
+                return rawBuffer.doWrite(chunk, res);
+            } else {
+                return activeFilters.get(activeFilters.size() - 
1).doWrite(chunk, res);
+            }
+        }
+
+        public List<OutputFilter> getFilters() {
+            return filterLibrary;
+        }
+
+//      public void addFilter(OutputFilter f) {
+//      filterLibrary.add(f);
+//      }
+
+        /**
+         * Add an Output filter to the filter library.
+         */
+        public void addActiveFilter(OutputFilter filter) {
+            if (activeFilters.size() == 0) {
+                filter.setBuffer(rawBuffer);
+            } else {
+                if (activeFilters.contains(filter)) {
+                    return;
+                }
+                filter.setBuffer(activeFilters.get(activeFilters.size() - 1));
+            }
+            activeFilters.add(filter);
+        }
+
+        public void addFilter(OutputFilter filter) {
+            if (filterLibrary.contains(filter)) {
+                return;
+            }
+            filterLibrary.add(filter);
+        }
+
+        public boolean addActiveFilter(String name) {
+            for (OutputFilter f : filterLibrary) {
+                if (f.getEncodingName().toString().equals(name)) {
+                    addActiveFilter(f);
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        public void flush() throws IOException {
+
+        }
+
+        public void sendAck() throws IOException {
+        }
+
+        public void endRequest() throws IOException {
+            if (activeFilters.size() > 0) {
+                activeFilters.get(activeFilters.size() - 1).end();
+            }
+        }
+
+        public void reset() {
+        }
+
+        public AsyncHttp getAsyncHttp() {
+            return aprocessor;
+        }
+
+    }
+
+    /** 
+     * BLOCKING InputBuffer for the raw data received.
+     * 
+     * Since tomcat buffer is blocking, this method will block
+     * as well until more data is received, if the input is empty.
+     *   
+     */
+    class SelectorInputBuffer implements InputBuffer {
+
+        // Blocking read method
+        public int doRead(ByteChunk chunk, Request request) 
+                throws IOException {
+            ByteChunk next = blockingHttp.readBlocking(0);
+            if (next == null) {
+                return -1;
+            }
+            chunk.setBytes(next.getBuffer(), next.getStart(), next.length());
+            return chunk.length();
+        }
+    }
+    
+
+    class SelectorOutputBuffer implements OutputBuffer {
+        
+        /** 
+         * Blocking write. 
+         * @param chunk contains data, will not be modified
+         */
+        public int doWrite(ByteChunk chunk, Response res) 
+                throws IOException {
+            return blockingHttp.doWrite(chunk);
+        }
+    }
+
+    
+}
\ No newline at end of file

Propchange: 
tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/coyote/http11/async/BlockingCoyoteBuffers.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to