Author: rhuijben
Date: Mon Nov 16 10:47:39 2015
New Revision: 1714546

URL: http://svn.apache.org/viewvc?rev=1714546&view=rev
Log:
Following up on r1714449, extend the incoming client support to basically
support incoming http/1 requests. This makes the test added in the previous
revision work.

* deprecated.c
  New file, providing backwards compatibility for the severyly limited
  serf_incoming_create() function.

* incoming.c
  (client_connected): Setup protocol detection.
  (destroy_request): New function.
  (response_finished): New function.
  (serf_incoming_response_create): New public function.
  (perform_peek_protocol): New function.
  (read_from_client): Add simple http/1 style implementation.

  (socket_writev,
   no_more_writes,
   serf__client_flush): New functions. Based on their outgoing variants.
  (write_to_client): Implement writing via the previous 3 new functions.
  (serf__process_listener): Set socket to not blocking.
  (serf_incoming_create2): Tweak arguments. Initialize some values. Store
    client in context to allow pollset updating.

  (ic_setup_baton_t,
   dummy_closed,
   serf_incoming_create): Move to deprecated.c.
  (serf__incoming_update_pollset): Tweak comment.

* serf.h
  (serf_incoming_request_handler_t,
   serf_incoming_response_setup_t,
   serf_incoming_request_setup_t): New typedefs.
  (serf_incoming_request_cb_t): Deprecate.
  (serf_incoming_create): Tweak comment. Deprecate.
  (serf_incoming_create2): Update argument type.
  (serf_incoming_response_create): New function.

* serf_private.h
  (serf_incoming_request_t): Define struct that had no definition before.
  (serf_incoming_t): Extend.

* test/test_server.c
  (client_request_handler,
   client_generate_response): New functions.
  (client_request_acceptor): Tweak arguments. Setup new handlers.
  (run_client_server_loop): Remove obsolete comment.
  (test_listen_http): Expect requests to actually work.

Added:
    serf/trunk/deprecated.c   (with props)
Modified:
    serf/trunk/incoming.c
    serf/trunk/serf.h
    serf/trunk/serf_private.h
    serf/trunk/test/test_server.c

Added: serf/trunk/deprecated.c
URL: http://svn.apache.org/viewvc/serf/trunk/deprecated.c?rev=1714546&view=auto
==============================================================================
--- serf/trunk/deprecated.c (added)
+++ serf/trunk/deprecated.c Mon Nov 16 10:47:39 2015
@@ -0,0 +1,146 @@
+/* ====================================================================
+ *    Licensed to the Apache Software Foundation (ASF) under one
+ *    or more contributor license agreements.  See the NOTICE file
+ *    distributed with this work for additional information
+ *    regarding copyright ownership.  The ASF licenses this file
+ *    to you under the Apache License, Version 2.0 (the
+ *    "License"); you may not use this file except in compliance
+ *    with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing,
+ *    software distributed under the License is distributed on an
+ *    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *    KIND, either express or implied.  See the License for the
+ *    specific language governing permissions and limitations
+ *    under the License.
+ * ====================================================================
+ */
+
+#include <apr_pools.h>
+
+#include "serf.h"
+#include "serf_bucket_util.h"
+#include "serf_private.h"
+
+
+/* From incoming.c */
+typedef struct ic_setup_baton_t
+{
+    serf_incoming_t *incoming;
+    serf_incoming_request_cb_t request;
+    serf_context_t *ctx;
+    void *request_baton;
+} ic_setup_baton_t;
+
+static apr_status_t dummy_setup(apr_socket_t *skt,
+                                serf_bucket_t **read_bkt,
+                                serf_bucket_t **write_bkt,
+                                void *setup_baton,
+                                apr_pool_t *pool)
+{
+    ic_setup_baton_t *isb = setup_baton;
+
+    *read_bkt = serf_bucket_socket_create(skt, isb->incoming->allocator);
+
+    return APR_SUCCESS;
+}
+
+static apr_status_t dummy_closed(serf_incoming_t *incoming,
+                                 void *closed_baton,
+                                 apr_status_t why,
+                                 apr_pool_t *pool)
+{
+    return APR_SUCCESS;
+}
+
+static apr_status_t drain_handler(serf_incoming_request_t *req,
+                                  serf_bucket_t *request,
+                                  void *baton,
+                                  apr_pool_t *pool)
+{
+    apr_status_t status;
+
+    do {
+        const char *data;
+        apr_size_t len;
+
+        status = serf_bucket_read(request, SERF_READ_ALL_AVAIL, &data, &len);
+    } while (status == APR_SUCCESS);
+
+    return status;
+}
+
+static apr_status_t response_setup(serf_bucket_t **resp_bkt,
+                                   serf_incoming_request_t *req,
+                                   void *setup_baton,
+                                   serf_bucket_alloc_t *allocator,
+                                   apr_pool_t *pool)
+{
+#define CRLF "\r\n"
+    *resp_bkt = SERF_BUCKET_SIMPLE_STRING("HTTP/1.1 200 Discarded" CRLF
+                                          "Content-Length: 25" CRLF
+                                          "Content-Type: text/plain" CRLF
+                                          CRLF
+                                          "Successfully Discarded." CRLF,
+                                          allocator);
+    return APR_SUCCESS;
+}
+
+static apr_status_t wrap_request(serf_bucket_t **req_bkt,
+                                 serf_bucket_t *stream,
+                                 serf_incoming_request_t *req,
+                                 void *request_baton,
+                                 serf_incoming_request_handler_t *handler,
+                                 void **handler_baton,
+                                 serf_incoming_response_setup_t *setup,
+                                 void **setup_baton,
+                                 apr_pool_t *pool)
+{
+    ic_setup_baton_t *isb = request_baton;
+    apr_status_t status;
+
+    status = isb->request(isb->ctx, req, isb->request_baton, pool);
+
+    if (!status) {
+        *req_bkt = serf_bucket_incoming_request_create(stream,
+                                                       stream->allocator);
+
+        *handler = drain_handler;
+        *handler_baton = isb;
+
+        *setup = response_setup;
+        *setup_baton = isb;
+    }
+
+    return status;
+}
+
+apr_status_t serf_incoming_create(
+    serf_incoming_t **client,
+    serf_context_t *ctx,
+    apr_socket_t *insock,
+    void *request_baton,
+    serf_incoming_request_cb_t request,
+    apr_pool_t *pool)
+{
+    ic_setup_baton_t *isb;
+    apr_status_t status;
+
+    isb = apr_pcalloc(pool, sizeof(*isb));
+
+    isb->ctx = ctx;
+    isb->request = request;
+    isb->request_baton = request_baton;
+
+    status = serf_incoming_create2(client, ctx, insock,
+                                   dummy_setup, isb,
+                                   dummy_closed, isb,
+                                   wrap_request, isb, pool);
+
+    if (!status)
+        isb->incoming = *client;
+
+    return status;
+}

Propchange: serf/trunk/deprecated.c
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: serf/trunk/incoming.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/incoming.c?rev=1714546&r1=1714545&r2=1714546&view=diff
==============================================================================
--- serf/trunk/incoming.c (original)
+++ serf/trunk/incoming.c Mon Nov 16 10:47:39 2015
@@ -86,17 +86,396 @@ static apr_status_t client_connected(ser
     serf_bucket_aggregate_append(client->ostream_head,
                                  ostream);
 
+    client->proto_peek_bkt = serf_bucket_aggregate_create(client->allocator);
+
+    serf_bucket_aggregate_append(
+                client->proto_peek_bkt,
+                serf_bucket_barrier_create(client->stream,
+                                           client->allocator));
+
+    return status;
+}
+
+/* Destroy an incoming request and its resources */
+static apr_status_t destroy_request(serf_incoming_request_t *request)
+{
+    serf_incoming_t *incoming = request->incoming;
+    apr_pool_destroy(request->pool);
+
+    serf_bucket_mem_free(incoming->allocator, request);
+
+    return APR_SUCCESS;
+}
+
+/* Called when the response is completely written and the write bucket
+   is destroyed. Most likely the request is now 100% done */
+static apr_status_t response_finished(void *baton,
+                                      apr_uint64_t bytes_written)
+{
+    serf_incoming_request_t *request = baton;
+    apr_status_t status = APR_SUCCESS;
+
+    request->response_finished = true;
+
+    if (request->request_read && request->response_finished) {
+        status = destroy_request(request);
+    }
+    return status;
+}
+
+apr_status_t serf_incoming_response_create(serf_incoming_request_t *request)
+{
+    apr_status_t status;
+    serf_bucket_alloc_t *alloc;
+    serf_bucket_t *bucket;
+
+    if (request->response_written)
+        return APR_SUCCESS;
+
+    alloc = request->incoming->allocator;
+
+    status = request->response_setup(&bucket, request,
+                                     request->response_setup_baton,
+                                     alloc, request->pool);
+
+    if (status)
+        return status;
+
+    request->response_written = true;
+
+    /* ### Needs work for other protocols */
+    serf_bucket_aggregate_append(request->incoming->ostream_tail,
+                                 serf__bucket_event_create(bucket,
+                                                           request,
+                                                           NULL,
+                                                           NULL,
+                                                           response_finished,
+                                                           alloc));
+
+    /* Want write event */
+    request->incoming->dirty_conn = true;
+    request->incoming->ctx->dirty_pollset = true;
+
+    return APR_SUCCESS;
+}
+
+apr_status_t perform_peek_protocol(serf_incoming_t *client)
+{
+    const char h2prefix[] = "PRI * HTTP/2.0\r\n";
+    const apr_size_t h2prefixlen = 16;
+    const char *data;
+    apr_size_t len;
+
+    struct peek_data_t
+    {
+        char buffer[sizeof(h2prefix)];
+        int read;
+    } *peek_data = client->protocol_baton;
+
+    apr_status_t status;
+
+    if (!peek_data) {
+
+        status = serf_bucket_peek(client->stream, &data, &len);
+
+        if (len > h2prefixlen)
+          len = h2prefixlen;
+
+        if (len && memcmp(data, h2prefix, len) != 0) {
+            /* This is not HTTP/2 */
+
+            /* Easy out */
+            serf_bucket_destroy(client->proto_peek_bkt);
+            client->proto_peek_bkt = NULL;
+
+            return APR_SUCCESS;
+        }
+        else if (len == h2prefixlen) {
+            /* We have HTTP/2 */
+            client->framing = SERF_CONNECTION_FRAMING_TYPE_HTTP2;
+
+            serf_bucket_destroy(client->proto_peek_bkt);
+            client->proto_peek_bkt = NULL;
+
+            return APR_SUCCESS;
+        }
+
+        peek_data = serf_bucket_mem_calloc(client->allocator,
+                                          sizeof(*peek_data));
+        client->protocol_baton = peek_data;
+    }
+
+    do {
+        status = serf_bucket_read(client->stream,
+                                  h2prefixlen - peek_data->read,
+                                  &data, &len);
+
+        if (SERF_BUCKET_READ_ERROR(status))
+            return status;
+
+        memcpy(peek_data->buffer + peek_data->read, data, len);
+        peek_data->read += len;
+
+        if (len && memcmp(data, h2prefix, len)) {
+            /* This is not HTTP/2 */
+
+            /* Put data ahead of other data and do the usual thing */
+            serf_bucket_aggregate_prepend(client->proto_peek_bkt,
+                                          serf_bucket_simple_own_create(
+                                                peek_data->buffer,
+                                                peek_data->read,
+                                                client->allocator));
+
+            return APR_SUCCESS;
+        }
+        else if (len == h2prefixlen) {
+            /* We have HTTP/2 */
+            client->framing = SERF_CONNECTION_FRAMING_TYPE_HTTP2;
+
+            /* Put data ahead of other data and do the usual thing */
+            serf_bucket_aggregate_prepend(client->proto_peek_bkt,
+                                          serf_bucket_simple_own_create(
+                                            peek_data->buffer,
+                                            peek_data->read,
+                                            client->allocator));
+
+            return APR_SUCCESS;
+        }
+    } while (status == APR_SUCCESS);
+
     return status;
 }
 
 static apr_status_t read_from_client(serf_incoming_t *client)
 {
-    return APR_ENOTIMPL;
+    apr_status_t status;
+    serf_incoming_request_t *rq;
+
+    if (client->proto_peek_bkt)
+    {
+        status = perform_peek_protocol(client);
+        if (status)
+            return status;
+    }
+
+    do {
+        rq = client->current_request;
+        if (!rq) {
+
+            serf_bucket_t *read_bkt;
+            rq = serf_bucket_mem_calloc(client->allocator, sizeof(*rq));
+
+            apr_pool_create(&rq->pool, client->pool);
+            rq->incoming = client;
+
+            if (client->proto_peek_bkt) {
+                read_bkt = client->proto_peek_bkt;
+                client->proto_peek_bkt = NULL;
+            }
+            else
+                read_bkt = serf_bucket_barrier_create(client->stream,
+                                                      client->allocator);
+
+            status = client->req_setup(&rq->req_bkt, read_bkt, rq,
+                                       client->req_setup_baton,
+                                       &rq->handler, &rq->handler_baton,
+                                       &rq->response_setup,
+                                       &rq->response_setup_baton,
+                                       rq->pool);
+
+            if (status) {
+                apr_pool_destroy(rq->pool);
+                serf_bucket_mem_free(client->allocator, rq);
+                return status;
+            }
+        }
+
+        /* Run handler once or multiple times until status? */
+        status = rq->handler(rq, rq->req_bkt, rq->handler_baton, rq->pool);
+        if (SERF_BUCKET_READ_ERROR(status))
+            return status;
+
+        if (APR_STATUS_IS_EOF(status)) {
+            /* Write response if this hasn't been done yet */
+            status = serf_incoming_response_create(rq);
+
+            if (SERF_BUCKET_READ_ERROR(status))
+                return status;
+
+            rq->request_read = true;
+            client->current_request = NULL;
+
+            if (rq->request_read && rq->response_finished) {
+                status = destroy_request(rq);
+            }
+
+            /* ### TODO: Check if connection is also at EOF? */
+            status = APR_SUCCESS;
+        }
+    }
+    while (status == APR_SUCCESS);
+
+    return status;
+}
+
+static apr_status_t socket_writev(serf_incoming_t *client)
+{
+    apr_size_t written;
+    apr_status_t status;
+
+    status = apr_socket_sendv(client->skt, client->vec,
+                              client->vec_len, &written);
+    if (status && !APR_STATUS_IS_EAGAIN(status))
+        serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config,
+                  "socket_sendv error %d\n", status);
+
+    /* did we write everything? */
+    if (written) {
+        apr_size_t len = 0;
+        int i;
+
+        serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config,
+                  "--- socket_sendv: %d bytes. --\n", written);
+
+        for (i = 0; i < client->vec_len; i++) {
+            len += client->vec[i].iov_len;
+            if (written < len) {
+                serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, client->config,
+                                 "%.*s", client->vec[i].iov_len - (len - 
written),
+                                 client->vec[i].iov_base);
+                if (i) {
+                    memmove(client->vec, &client->vec[i],
+                            sizeof(struct iovec) * (client->vec_len - i));
+                    client->vec_len -= i;
+                }
+                client->vec[0].iov_base = (char *)client->vec[0].iov_base + 
(client->vec[0].iov_len - (len - written));
+                client->vec[0].iov_len = len - written;
+                break;
+            } else {
+                serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, client->config,
+                                 "%.*s",
+                                 client->vec[i].iov_len, 
client->vec[i].iov_base);
+            }
+        }
+        if (len == written) {
+            client->vec_len = 0;
+        }
+        serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, client->config, "\n");
+
+        /* Log progress information */
+        serf__context_progress_delta(client->ctx, 0, written);
+    }
+
+    return status;
+}
+
+static apr_status_t no_more_writes(serf_incoming_t *client)
+{
+  /* Note that we should hold new requests until we open our new socket. */
+  serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config,
+            "stop writing on client 0x%x\n", client);
+
+  /* Clear our iovec. */
+  client->vec_len = 0;
+
+  /* Update the pollset to know we don't want to write on this socket any
+  * more.
+  */
+  client->dirty_conn = true;
+  client->ctx->dirty_pollset = true;
+  return APR_SUCCESS;
+}
+
+static apr_status_t serf__client_flush(serf_incoming_t *client,
+                                       bool pump)
+{
+    apr_status_t status = APR_SUCCESS;
+    apr_status_t read_status = APR_SUCCESS;
+    serf_bucket_t *ostreamh = client->ostream_head;
+
+    client->hit_eof = FALSE;
+
+    while (status == APR_SUCCESS) {
+
+        /* First try to write out what is already stored in the
+           connection vecs. */
+        while (client->vec_len && !status) {
+            status = socket_writev(client);
+
+            /* If the write would have blocked, then we're done.
+             * Don't try to write anything else to the socket.
+             */
+            if (APR_STATUS_IS_EPIPE(status)
+                || APR_STATUS_IS_ECONNRESET(status)
+                || APR_STATUS_IS_ECONNABORTED(status))
+              return no_more_writes(client);
+        }
+
+        if (status || !pump)
+            return status;
+        else if (read_status || client->vec_len || client->hit_eof)
+            return read_status;
+
+        /* ### optimize at some point by using read_for_sendfile */
+        /* TODO: now that read_iovec will effectively try to return as much
+           data as available, we probably don't want to read ALL_AVAIL, but
+           a lower number, like the size of one or a few TCP packets, the
+           available TCP buffer size ... */
+        client->hit_eof = 0;
+        read_status = serf_bucket_read_iovec(ostreamh,
+                                             SERF_READ_ALL_AVAIL,
+                                             IOV_MAX,
+                                             client->vec,
+                                             &client->vec_len);
+
+        if (read_status == SERF_ERROR_WAIT_CONN) {
+            /* The bucket told us that it can't provide more data until
+            more data is read from the socket. This normally happens
+            during a SSL handshake.
+
+            We should avoid looking for writability for a while so
+            that (hopefully) something will appear in the bucket so
+            we can actually write something. otherwise, we could
+            end up in a CPU spin: socket wants something, but we
+            don't have anything (and keep returning EAGAIN) */
+            client->stop_writing = true;
+            client->dirty_conn = true;
+            client->ctx->dirty_pollset = true;
+
+            read_status = APR_EAGAIN;
+        }
+        else if (APR_STATUS_IS_EAGAIN(read_status)) {
+
+            /* We read some stuff, but did we read everything ? */
+            if (client->hit_eof)
+                read_status = APR_SUCCESS;
+        }
+        else if (SERF_BUCKET_READ_ERROR(read_status)) {
+
+            /* Something bad happened. Propagate any errors. */
+            return read_status;
+        }
+    }
+
+    return status;
 }
 
 static apr_status_t write_to_client(serf_incoming_t *client)
 {
-    return APR_ENOTIMPL;
+    apr_status_t status;
+
+    status = serf__client_flush(client, true);
+
+    if (APR_STATUS_IS_EAGAIN(status))
+        return APR_SUCCESS;
+    else if (status)
+        return status;
+
+    /* Probably nothing to write. Connection will check new requests */
+    client->dirty_conn = 1;
+    client->ctx->dirty_pollset = 1;
+
+    return APR_SUCCESS;
 }
 
 apr_status_t serf__process_client(serf_incoming_t *client, apr_int16_t events)
@@ -153,6 +532,15 @@ apr_status_t serf__process_listener(serf
         return status;
     }
 
+    /* Set the socket to be non-blocking */
+    if ((status = apr_socket_timeout_set(client->skt, 0)) != APR_SUCCESS)
+        return status;
+
+    /* Disable Nagle's algorithm */
+    if ((status = apr_socket_opt_set(client->skt,
+                                     APR_TCP_NODELAY, 1)) != APR_SUCCESS)
+        return status;
+
     status = l->accept_func(l->ctx, l, l->accept_baton, in, p);
 
     if (status) {
@@ -180,8 +568,8 @@ apr_status_t serf_incoming_create2(
     void *setup_baton,
     serf_incoming_closed_t closed,
     void *closed_baton,
-    serf_incoming_request_cb_t request,
-    void *request_baton,
+    serf_incoming_request_setup_t req_setup,
+    void *req_setup_baton,
     apr_pool_t *pool)
 {
     apr_status_t rv;
@@ -197,12 +585,13 @@ apr_status_t serf_incoming_create2(
     ic->allocator = serf_bucket_allocator_create(ic_pool, NULL, NULL);
     ic->baton.type = SERF_IO_CLIENT;
     ic->baton.u.client = ic;
-    ic->request_baton =  request_baton;
-    ic->request = request;
+    ic->req_setup = req_setup;
+    ic->req_setup_baton = req_setup_baton;
     ic->skt = insock;
 
     ic->dirty_conn = false;
     ic->wait_for_connect = true;
+    ic->vec_len = 0;
 
     ic->setup = setup;
     ic->setup_baton = setup_baton;
@@ -215,6 +604,9 @@ apr_status_t serf_incoming_create2(
     ic->ostream_tail = NULL;
     ic->ssltunnel_ostream = NULL;
 
+    ic->protocol_baton = NULL;
+    ic->current_request = NULL;
+
     ic->desc.desc_type = APR_POLL_SOCKET;
     ic->desc.desc.s = ic->skt;
     ic->desc.reqevents = APR_POLLIN | APR_POLLERR | APR_POLLHUP;
@@ -242,59 +634,9 @@ apr_status_t serf_incoming_create2(
         /* Let caller handle the socket */
     }
 
-    return rv;
-}
-
-typedef struct ic_setup_baton_t
-{
-  serf_incoming_t *incoming;
-} ic_setup_baton_t;
-
-static apr_status_t dummy_setup(apr_socket_t *skt,
-                                serf_bucket_t **read_bkt,
-                                serf_bucket_t **write_bkt,
-                                void *setup_baton,
-                                apr_pool_t *pool)
-{
-  ic_setup_baton_t *isb = setup_baton;
-
-  *read_bkt = serf_bucket_socket_create(skt, isb->incoming->allocator);
+    *(serf_incoming_t **)apr_array_push(ctx->incomings) = *client;
 
-  return APR_SUCCESS;
-}
-
-static apr_status_t dummy_closed(serf_incoming_t *incoming,
-                                 void *closed_baton,
-                                 apr_status_t why,
-                                 apr_pool_t *pool)
-{
-  return APR_SUCCESS;
-}
-
-apr_status_t serf_incoming_create(
-    serf_incoming_t **client,
-    serf_context_t *ctx,
-    apr_socket_t *insock,
-    void *request_baton,
-    serf_incoming_request_cb_t request,
-    apr_pool_t *pool)
-{
-  ic_setup_baton_t *isb;
-  apr_status_t status;
-
-  /* Allocate baton to hand over created listener
-     (to get access to its allocator) */
-  isb = apr_pcalloc(pool, sizeof(*isb));
-
-  status = serf_incoming_create2(client, ctx, insock,
-                                 dummy_setup, isb,
-                                 dummy_closed, isb,
-                                 request, request_baton, pool);
-
-  if (!status)
-    isb->incoming = *client;
-
-  return status;
+    return rv;
 }
 
 apr_status_t serf_listener_create(
@@ -400,7 +742,7 @@ apr_status_t serf__incoming_update_polls
          */
         if (incoming->vec_len) {
             /* We still have vecs in the connection, which lifetime is
-               managed by buckets inside conn->ostream_head.
+               managed by buckets inside client->ostream_head.
 
                Don't touch ostream as that might destroy the vecs */
 

Modified: serf/trunk/serf.h
URL: 
http://svn.apache.org/viewvc/serf/trunk/serf.h?rev=1714546&r1=1714545&r2=1714546&view=diff
==============================================================================
--- serf/trunk/serf.h (original)
+++ serf/trunk/serf.h Mon Nov 16 10:47:39 2015
@@ -515,13 +515,39 @@ apr_status_t serf_listener_create(
     serf_accept_client_t accept_func,
     apr_pool_t *pool);
 
-typedef apr_status_t (*serf_incoming_request_cb_t)(
-    serf_context_t *ctx,
+typedef apr_status_t (*serf_incoming_request_handler_t)(
+    serf_incoming_request_t *req,
+    serf_bucket_t *request,
+    void *handler_baton,
+    apr_pool_t *pool);
+
+typedef apr_status_t (*serf_incoming_response_setup_t)(
+    serf_bucket_t **resp_bkt,
+    serf_incoming_request_t *req,
+    void *setup_baton,
+    serf_bucket_alloc_t *allocator,
+    apr_pool_t *pool);
+
+typedef apr_status_t (*serf_incoming_request_setup_t)(
+    serf_bucket_t **req_bkt,
+    serf_bucket_t *stream,
     serf_incoming_request_t *req,
     void *request_baton,
+    serf_incoming_request_handler_t *handler,
+    void **handler_baton,
+    serf_incoming_response_setup_t *response_setup,
+    void *response_setup_baton,
     apr_pool_t *pool);
 
-/* ### Arguments in bad order. Doesn't support SSL */
+/* ### Deprecated: can't do anything with request */
+typedef apr_status_t(*serf_incoming_request_cb_t)(
+  serf_context_t *ctx,
+  serf_incoming_request_t *req,
+  void *request_baton,
+  apr_pool_t *pool);
+
+/* ### Deprecated: Misses ssl support and actual
+       request handling. */
 apr_status_t serf_incoming_create(
     serf_incoming_t **client,
     serf_context_t *ctx,
@@ -538,11 +564,15 @@ apr_status_t serf_incoming_create2(
     void *setup_baton,
     serf_incoming_closed_t closed,
     void *closed_baton,
-    serf_incoming_request_cb_t request,
-    void *request_baton,
+    serf_incoming_request_setup_t req_setup,
+    void *req_setup_baton,
     apr_pool_t *pool);
 
-
+/* Allows creating a response before the request is completely
+   read. Will call the response create function if it hasn't
+   been called yet. */
+apr_status_t serf_incoming_response_create(
+    serf_incoming_request_t *request);
 
 /**
  * Reset the connection, but re-open the socket again.

Modified: serf/trunk/serf_private.h
URL: 
http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1714546&r1=1714545&r2=1714546&view=diff
==============================================================================
--- serf/trunk/serf_private.h (original)
+++ serf/trunk/serf_private.h Mon Nov 16 10:47:39 2015
@@ -184,6 +184,25 @@ struct serf_request_t {
     struct serf_request_t *next;
 };
 
+struct serf_incoming_request_t
+{
+    serf_incoming_t *incoming;
+    apr_pool_t *pool;
+
+    serf_bucket_t *req_bkt;
+
+    serf_incoming_request_handler_t handler;
+    void *handler_baton;
+
+    serf_incoming_response_setup_t response_setup;
+    void *response_setup_baton;
+
+    bool request_read;
+    bool response_written;
+    bool response_finished;
+    serf_bucket_t *response_bkt;
+};
+
 typedef struct serf_pollset_t {
     /* the set of connections to poll */
     apr_pollset_t *pollset;
@@ -329,8 +348,8 @@ struct serf_listener_t {
 struct serf_incoming_t {
     serf_context_t *ctx;
     serf_io_baton_t baton;
-    void *request_baton;
-    serf_incoming_request_cb_t request;
+    serf_incoming_request_setup_t req_setup;
+    void *req_setup_baton;
 
     apr_socket_t *skt; /* Lives in parent of POOL */
     apr_pool_t *pool; 
@@ -349,9 +368,14 @@ struct serf_incoming_t {
     serf_incoming_closed_t closed;
     void *closed_baton;
 
+    serf_connection_framing_type_t framing;
+
     bool dirty_conn;
     bool wait_for_connect;
     bool hit_eof;
+    bool stop_writing;
+
+    void *protocol_baton;
 
     /* A bucket wrapped around our socket (for reading responses). */
     serf_bucket_t *stream;
@@ -365,6 +389,10 @@ struct serf_incoming_t {
     serf_bucket_t *ssltunnel_ostream;
 
     serf_config_t *config;
+
+    serf_bucket_t *proto_peek_bkt;
+
+    serf_incoming_request_t *current_request; /* For HTTP/1 */
 };
 
 /* States for the different stages in the lifecyle of a connection. */

Modified: serf/trunk/test/test_server.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/test/test_server.c?rev=1714546&r1=1714545&r2=1714546&view=diff
==============================================================================
--- serf/trunk/test/test_server.c (original)
+++ serf/trunk/test/test_server.c Mon Nov 16 10:47:39 2015
@@ -49,12 +49,63 @@ static apr_status_t client_closed(serf_i
   return APR_ENOTIMPL;
 }
 
-static apr_status_t client_request_acceptor(serf_context_t *ctx,
+static apr_status_t client_request_handler(serf_incoming_request_t *req,
+                                           serf_bucket_t *request,
+                                           void *handler_baton,
+                                           apr_pool_t *pool)
+{
+    const char *data;
+    apr_size_t len;
+    apr_status_t status;
+
+    do
+    {
+        status = serf_bucket_read(request, SERF_READ_ALL_AVAIL, &data, &len);
+    }
+    while (status == APR_SUCCESS);
+
+    return status;
+}
+
+static apr_status_t client_generate_response(serf_bucket_t **resp_bkt,
+                                             serf_incoming_request_t *req,
+                                             void *setup_baton,
+                                             serf_bucket_alloc_t *allocator,
+                                             apr_pool_t *pool)
+{
+    serf_bucket_t *tmp;
+#define CRLF "\r\n"
+
+    tmp = SERF_BUCKET_SIMPLE_STRING("HTTP/1.1 200 OK" CRLF
+                                    "Content-Length: 4" CRLF
+                                    CRLF
+                                    "OK" CRLF,
+                                    allocator);
+
+    *resp_bkt = tmp;
+    return APR_SUCCESS;
+}
+
+static apr_status_t client_request_acceptor(serf_bucket_t **req_bkt,
+                                            serf_bucket_t *stream,
                                             serf_incoming_request_t *req,
                                             void *request_baton,
+                                            serf_incoming_request_handler_t 
*handler,
+                                            void **handler_baton,
+                                            serf_incoming_response_setup_t 
*response,
+                                            void **response_baton,
                                             apr_pool_t *pool)
 {
-  return APR_ENOTIMPL;
+    test_baton_t *tb = request_baton;
+    *req_bkt = serf_bucket_incoming_request_create(stream, stream->allocator);
+
+    *handler = client_request_handler;
+    *handler_baton = tb;
+
+    *response = client_generate_response;
+    *response_baton = tb;
+
+    return APR_SUCCESS;
 }
 
 static apr_status_t client_acceptor(serf_context_t *ctx,
@@ -114,11 +165,6 @@ run_client_server_loop(test_baton_t *tb,
   {
     apr_pool_clear(iter_pool);
 
-
-    /* Even if the mock server returned an error, it may have written
-    something to the client. So process that data first, handle the error
-    later. */
-
     /* run client event loop */
     status = serf_context_run(tb->context, 0, iter_pool);
     if (!APR_STATUS_IS_TIMEUP(status) &&
@@ -154,7 +200,7 @@ void test_listen_http(CuTest *tc)
 
   status = run_client_server_loop(tb, num_requests,
                                   handler_ctx, tb->pool);
-  CuAssertIntEquals(tc, APR_ENOTIMPL, status);
+  CuAssertIntEquals(tc, APR_SUCCESS, status);
 }
 
 


Reply via email to