Author: rhuijben
Date: Mon Nov 23 17:03:04 2015
New Revision: 1715887

URL: http://svn.apache.org/viewvc?rev=1715887&view=rev
Log:
* incoming.c
  On the pump-investigate branch: switch back to the old writing code from
  before the pump was introduced.

Modified:
    serf/branches/pump-investigate/incoming.c

Modified: serf/branches/pump-investigate/incoming.c
URL: http://svn.apache.org/viewvc/serf/branches/pump-investigate/incoming.c?rev=1715887&r1=1715886&r2=1715887&view=diff
==============================================================================
--- serf/branches/pump-investigate/incoming.c (original)
+++ serf/branches/pump-investigate/incoming.c Mon Nov 23 17:03:04 2015
@@ -350,10 +350,144 @@ static apr_status_t read_from_client(ser
     return status;
 }
 
+static apr_status_t socket_writev(serf_incoming_t *client)
+{
+    apr_size_t written;
+    apr_status_t status;
+
+    status = apr_socket_sendv(client->skt, client->vec,
+                              client->vec_len, &written);
+    if (status && !APR_STATUS_IS_EAGAIN(status))
+        serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config,
+                  "socket_sendv error %d\n", status);
+
+    /* did we write everything? */
+    if (written) {
+        apr_size_t len = 0;
+        int i;
+
+        serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config,
+                  "--- socket_sendv: %d bytes. --\n", written);
+
+        for (i = 0; i < client->vec_len; i++) {
+            len += client->vec[i].iov_len;
+            if (written < len) {
+                serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, client->config,
+                                 "%.*s", client->vec[i].iov_len - (len - written),
+                                 client->vec[i].iov_base);
+                if (i) {
+                    memmove(client->vec, &client->vec[i],
+                            sizeof(struct iovec) * (client->vec_len - i));
+                    client->vec_len -= i;
+                }
+                client->vec[0].iov_base = (char *)client->vec[0].iov_base + (client->vec[0].iov_len - (len - written));
+                client->vec[0].iov_len = len - written;
+                break;
+            } else {
+                serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, client->config,
+                                 "%.*s",
+                                 client->vec[i].iov_len, client->vec[i].iov_base);
+            }
+        }
+        if (len == written) {
+            client->vec_len = 0;
+        }
+        serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, client->config, "\n");
+
+        /* Log progress information */
+        serf__context_progress_delta(client->ctx, 0, written);
+    }
+
+    return status;
+}
+
+static apr_status_t no_more_writes(serf_incoming_t *client)
+{
+  /* Note that we should hold new requests until we open our new socket. */
+  serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config,
+            "stop writing on client 0x%x\n", client);
+
+  /* Clear our iovec. */
+  client->vec_len = 0;
+
+  /* Update the pollset to know we don't want to write on this socket any
+  * more.
+  */
+  serf_io__set_pollset_dirty(&client->io);
+  return APR_SUCCESS;
+}
+
 apr_status_t serf__incoming_client_flush(serf_incoming_t *client,
                                          bool pump)
 {
-    return serf_pump__write(&client->pump, pump);
+    apr_status_t status = APR_SUCCESS;
+    apr_status_t read_status = APR_SUCCESS;
+    serf_bucket_t *ostreamh = client->pump.ostream_head;
+
+    client->pump.hit_eof = FALSE;
+
+    while (status == APR_SUCCESS) {
+
+        /* First try to write out what is already stored in the
+           connection vecs. */
+        while (client->vec_len && !status) {
+            status = socket_writev(client);
+
+            /* If the write would have blocked, then we're done.
+             * Don't try to write anything else to the socket.
+             */
+            if (APR_STATUS_IS_EPIPE(status)
+                || APR_STATUS_IS_ECONNRESET(status)
+                || APR_STATUS_IS_ECONNABORTED(status))
+              return no_more_writes(client);
+        }
+
+        if (status || !pump)
+            return status;
+        else if (read_status || client->vec_len || client->pump.hit_eof)
+            return read_status;
+
+        /* ### optimize at some point by using read_for_sendfile */
+        /* TODO: now that read_iovec will effectively try to return as much
+           data as available, we probably don't want to read ALL_AVAIL, but
+           a lower number, like the size of one or a few TCP packets, the
+           available TCP buffer size ... */
+        client->pump.hit_eof = 0;
+        read_status = serf_bucket_read_iovec(ostreamh,
+                                             SERF_READ_ALL_AVAIL,
+                                             IOV_MAX,
+                                             client->vec,
+                                             &client->vec_len);
+
+        if (read_status == SERF_ERROR_WAIT_CONN) {
+            /* The bucket told us that it can't provide more data until
+            more data is read from the socket. This normally happens
+            during an SSL handshake.
+
+            We should avoid looking for writability for a while so
+            that (hopefully) something will appear in the bucket so
+            we can actually write something. Otherwise, we could
+            end up in a CPU spin: socket wants something, but we
+            don't have anything (and keep returning EAGAIN) */
+            client->pump.stop_writing = true;
+            serf_io__set_pollset_dirty(&client->io);
+
+            read_status = APR_EAGAIN;
+        }
+        else if (APR_STATUS_IS_EAGAIN(read_status)) {
+
+            /* We read some stuff, but did we read everything? */
+            if (client->pump.hit_eof)
+                read_status = APR_SUCCESS;
+        }
+        else if (SERF_BUCKET_READ_ERROR(read_status)) {
+
+            /* Something bad happened. Propagate any errors. */
+            return read_status;
+        }
+    }
+
+    return status;
 }
 
 static apr_status_t write_to_client(serf_incoming_t *client)
@@ -387,7 +521,7 @@ void serf_incoming_set_framing_type(
 
     if (client->skt) {
         serf_io__set_pollset_dirty(&client->io);
-        client->pump.stop_writing = false;
+        client->pump.stop_writing = 0;
 
         /* Close down existing protocol */
         if (client->protocol_baton && client->perform_teardown) {
@@ -714,7 +848,6 @@ apr_status_t serf__incoming_update_polls
         }
         client->ctx->incomings->nelts--;
         apr_pool_destroy(client->pool);
-#endif
 
         if (cid >= ctx->incomings->nelts) {
             /* We skipped updating the pollset on this item as we moved it.
@@ -722,6 +855,7 @@ apr_status_t serf__incoming_update_polls
 
             return serf__incoming_update_pollset(GET_INCOMING(ctx, cid));
         }
+#endif
 
         return APR_SUCCESS;
     }
@@ -752,8 +886,38 @@ apr_status_t serf__incoming_update_polls
            But it also has the nice side effect of removing references
            from the aggregate to requests that are done.
          */
+        if (client->vec_len) {
+            /* We still have vecs in the connection, whose lifetime is
+               managed by buckets inside client->ostream_head.
 
-        data_waiting = serf_pump__data_pending(&client->pump);
+               Don't touch ostream as that might destroy the vecs */
+
+            data_waiting = true;
+        }
+        else {
+            serf_bucket_t *ostream;
+
+            ostream = client->pump.ostream_head;
+
+            if (ostream) {
+                const char *dummy_data;
+                apr_size_t len;
+
+                status = serf_bucket_peek(ostream, &dummy_data, &len);
+
+                if (SERF_BUCKET_READ_ERROR(status) || len > 0) {
+                    /* DATA or error waiting */
+                    data_waiting = TRUE; /* Error waiting */
+                }
+                else if (! status || APR_STATUS_IS_EOF(status)) {
+                    data_waiting = FALSE;
+                }
+                else
+                    data_waiting = FALSE; /* EAGAIN / EOF / WAIT_CONN */
+            }
+            else
+                data_waiting = FALSE;
+        }
 
         if (data_waiting) {
             desc.reqevents |= APR_POLLOUT;

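For anyone reading the restored socket_writev(), the subtle part is the
bookkeeping that advances the iovec array after a partial send. The
standalone sketch below is not part of the commit; the helper name
consume_written() and the sample data are invented for illustration. It
reproduces the same adjustment against plain POSIX struct iovec so it can
be compiled and stepped through in isolation.

[[[
#include <stdio.h>
#include <string.h>
#include <sys/uio.h>   /* struct iovec (POSIX) */

/* Drop the first WRITTEN bytes from VEC/VEC_LEN, shifting any partially
   written entry to the front and adjusting its base/len, the same way the
   restored socket_writev() does after a short send.  Returns the new
   vector length (0 when everything was written). */
static int consume_written(struct iovec *vec, int vec_len, size_t written)
{
    size_t len = 0;
    int i;

    for (i = 0; i < vec_len; i++) {
        len += vec[i].iov_len;
        if (written < len) {
            size_t remaining = len - written;  /* unwritten tail of vec[i] */

            if (i) {
                memmove(vec, &vec[i], sizeof(struct iovec) * (vec_len - i));
                vec_len -= i;
            }
            vec[0].iov_base = (char *)vec[0].iov_base
                              + (vec[0].iov_len - remaining);
            vec[0].iov_len = remaining;
            return vec_len;
        }
    }
    return 0;  /* written == total length: the vector is fully consumed */
}

int main(void)
{
    char a[] = "GET / HTTP/1.1\r\n";
    char b[] = "Host: example.org\r\n\r\n";
    struct iovec vec[2] = {
        { a, sizeof(a) - 1 },
        { b, sizeof(b) - 1 },
    };
    int vec_len = 2;

    /* Pretend the socket accepted only 20 of the 37 queued bytes. */
    vec_len = consume_written(vec, vec_len, 20);

    printf("%d iovec(s) left, next chunk starts with: %.*s",
           vec_len, (int)vec[0].iov_len, (char *)vec[0].iov_base);
    return 0;
}
]]]

The invariant this maintains is that vec[0] always starts at the first
unwritten byte, which is what lets serf__incoming_client_flush() keep
retrying the same vector on EAGAIN without touching the output bucket;
compare the "Don't touch ostream as that might destroy the vecs" comment
in serf__incoming_update_pollset above.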
