Author: rhuijben
Date: Wed Nov 18 17:43:23 2015
New Revision: 1715026

URL: http://svn.apache.org/viewvc?rev=1715026&view=rev
Log:
Use the pump to write data for outgoing connections, by wrapping the pump
writer with a helper that switches streams in a few nasty places.

* outgoing.c
  (no_more_writes): Remove function.
  (serf__connection_flush) Reimplement as wrapper around
     serf_pump__write.

* pump.c
  (serf_pump__write): Complete variable rename and fix using the wrong 'pump'
    variable (line 296).

* serf_private.h
  (serf__connection_flush): Use bool argument.

Modified:
    serf/trunk/outgoing.c
    serf/trunk/pump.c
    serf/trunk/serf_private.h

Modified: serf/trunk/outgoing.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/outgoing.c?rev=1715026&r1=1715025&r2=1715026&view=diff
==============================================================================
--- serf/trunk/outgoing.c (original)
+++ serf/trunk/outgoing.c Wed Nov 18 17:43:23 2015
@@ -485,23 +485,6 @@ apr_status_t serf__open_connections(serf
     return APR_SUCCESS;
 }
 
-static apr_status_t no_more_writes(serf_connection_t *conn)
-{
-    /* Note that we should hold new requests until we open our new socket. */
-    conn->state = SERF_CONN_CLOSING;
-    serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config,
-              "stop writing on conn 0x%x\n", conn);
-
-    /* Clear our iovec. */
-    conn->pump.vec_len = 0;
-
-    /* Update the pollset to know we don't want to write on this socket any
-     * more.
-     */
-    serf_io__set_pollset_dirty(&conn->io);
-    return APR_SUCCESS;
-}
-
 /* Read the 'Connection' header from the response. Return SERF_ERROR_CLOSING if
  * the header contains value 'close' indicating the server is closing the
  * connection right after this response.
@@ -650,137 +633,33 @@ static apr_status_t reset_connection(ser
     return APR_SUCCESS;
 }
 
-static apr_status_t socket_writev(serf_pump_t *conn)
-{
-    apr_size_t written;
-    apr_status_t status;
-
-    status = apr_socket_sendv(conn->skt, conn->vec,
-                              conn->vec_len, &written);
-    if (status && !APR_STATUS_IS_EAGAIN(status))
-        serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config,
-                  "socket_sendv error %d\n", status);
-
-    /* did we write everything? */
-    if (written) {
-        apr_size_t len = 0;
-        int i;
-
-        serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config,
-                  "--- socket_sendv: %d bytes. --\n", written);
-
-        for (i = 0; i < conn->vec_len; i++) {
-            len += conn->vec[i].iov_len;
-            if (written < len) {
-                serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, conn->config,
-                                 "%.*s", conn->vec[i].iov_len - (len - 
written),
-                                 conn->vec[i].iov_base);
-                if (i) {
-                    memmove(conn->vec, &conn->vec[i],
-                            sizeof(struct iovec) * (conn->vec_len - i));
-                    conn->vec_len -= i;
-                }
-                conn->vec[0].iov_base = (char *)conn->vec[0].iov_base + 
(conn->vec[0].iov_len - (len - written));
-                conn->vec[0].iov_len = len - written;
-                break;
-            } else {
-                serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, conn->config,
-                                 "%.*s",
-                                 conn->vec[i].iov_len, conn->vec[i].iov_base);
-            }
-        }
-        if (len == written) {
-            conn->vec_len = 0;
-        }
-        serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, conn->config, "\n");
-
-        /* Log progress information */
-        serf__context_progress_delta(conn->io->ctx, 0, written);
-    }
-
-    return status;
-}
 
 apr_status_t serf__connection_flush(serf_connection_t *conn,
-                                    int pump)
+                                    bool fetch_new)
 {
-    apr_status_t status = APR_SUCCESS;
-    apr_status_t read_status = APR_SUCCESS;
-    serf_bucket_t *ostreamh = NULL;
-
-    conn->pump.hit_eof = FALSE;
-
-    while (status == APR_SUCCESS) {
-
-        /* First try to write out what is already stored in the
-           connection vecs. */
-        while (conn->pump.vec_len && !status) {
-            status = socket_writev(&conn->pump);
-
-            /* If the write would have blocked, then we're done.
-             * Don't try to write anything else to the socket.
-             */
-            if (APR_STATUS_IS_EPIPE(status)
-                || APR_STATUS_IS_ECONNRESET(status)
-                || APR_STATUS_IS_ECONNABORTED(status))
-              return no_more_writes(conn);
-        }
+    apr_status_t status;
+    serf_bucket_t *tmp_bkt = NULL;
 
-        if (status || !pump)
-            return status;
-        else if (read_status || conn->pump.vec_len || conn->pump.hit_eof)
-            return read_status;
+    if (fetch_new) {
+        serf_bucket_t *ostreamh, *ostreamt;
 
-        /* Ok, with the vecs written, we can now refill the per connection
-           output vecs */
-        if (!ostreamh) {
-            serf_bucket_t *ostreamt;
-
-            status = prepare_conn_streams(conn, &ostreamt, &ostreamh);
-            if (status)
-                return status;
-        }
+        status = prepare_conn_streams(conn, &ostreamt, &ostreamh);
+        if (status)
+            return status;
 
-        /* ### optimize at some point by using read_for_sendfile */
-        /* TODO: now that read_iovec will effectively try to return as much
-           data as available, we probably don't want to read ALL_AVAIL, but
-           a lower number, like the size of one or a few TCP packets, the
-           available TCP buffer size ... */
-        conn->pump.hit_eof = 0;
-        read_status = serf_bucket_read_iovec(ostreamh,
-                                             SERF_READ_ALL_AVAIL,
-                                             IOV_MAX,
-                                             conn->pump.vec,
-                                             &conn->pump.vec_len);
-
-        if (read_status == SERF_ERROR_WAIT_CONN) {
-            /* The bucket told us that it can't provide more data until
-            more data is read from the socket. This normally happens
-            during a SSL handshake.
-
-            We should avoid looking for writability for a while so
-            that (hopefully) something will appear in the bucket so
-            we can actually write something. otherwise, we could
-            end up in a CPU spin: socket wants something, but we
-            don't have anything (and keep returning EAGAIN) */
-            conn->pump.stop_writing = 1;
-            serf_io__set_pollset_dirty(&conn->io);
+        tmp_bkt = conn->pump.ostream_head;
+        conn->pump.ostream_head = ostreamh;
+    }
 
-            read_status = APR_EAGAIN;
-        }
-        else if (APR_STATUS_IS_EAGAIN(read_status)) {
+    status = serf_pump__write(&conn->pump, fetch_new);
 
-            /* We read some stuff, but did we read everything ? */
-            if (conn->pump.hit_eof)
-                read_status = APR_SUCCESS;
-        }
-        else if (SERF_BUCKET_READ_ERROR(read_status)) {
-
-            /* Something bad happened. Propagate any errors. */
-            return read_status;
-        }
+    if (fetch_new) {
+        conn->pump.ostream_head = tmp_bkt;
     }
 
+    if (conn->pump.done_writing)
+        conn->state = SERF_CONN_CLOSING;
+
     return status;
 }
 

Modified: serf/trunk/pump.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/pump.c?rev=1715026&r1=1715025&r2=1715026&view=diff
==============================================================================
--- serf/trunk/pump.c (original)
+++ serf/trunk/pump.c Wed Nov 18 17:43:23 2015
@@ -274,16 +274,15 @@ apr_status_t serf_pump__write(serf_pump_
 {
     apr_status_t status = APR_SUCCESS;
     apr_status_t read_status = APR_SUCCESS;
-    serf_pump_t *const conn = pump;
 
-    conn->hit_eof = FALSE;
+    pump->hit_eof = FALSE;
 
     while (status == APR_SUCCESS) {
 
         /* First try to write out what is already stored in the
            connection vecs. */
-        while (conn->vec_len && !status) {
-            status = socket_writev(conn);
+        while (pump->vec_len && !status) {
+            status = socket_writev(pump);
 
             /* If the write would have blocked, then we're done.
              * Don't try to write anything else to the socket.
@@ -291,12 +290,12 @@ apr_status_t serf_pump__write(serf_pump_
             if (APR_STATUS_IS_EPIPE(status)
                 || APR_STATUS_IS_ECONNRESET(status)
                 || APR_STATUS_IS_ECONNABORTED(status))
-              return no_more_writes(conn);
+              return no_more_writes(pump);
         }
 
-        if (status || !pump)
+        if (status || !fetch_new)
             return status;
-        else if (read_status || conn->vec_len || conn->hit_eof)
+        else if (read_status || pump->vec_len || pump->hit_eof)
             return read_status;
 
         /* ### optimize at some point by using read_for_sendfile */
@@ -304,12 +303,12 @@ apr_status_t serf_pump__write(serf_pump_
            data as available, we probably don't want to read ALL_AVAIL, but
            a lower number, like the size of one or a few TCP packets, the
            available TCP buffer size ... */
-        conn->hit_eof = 0;
+        pump->hit_eof = false;
         read_status = serf_bucket_read_iovec(pump->ostream_head,
                                              SERF_READ_ALL_AVAIL,
                                              IOV_MAX,
-                                             conn->vec,
-                                             &conn->vec_len);
+                                             pump->vec,
+                                             &pump->vec_len);
 
         if (read_status == SERF_ERROR_WAIT_CONN) {
             /* The bucket told us that it can't provide more data until
@@ -321,15 +320,15 @@ apr_status_t serf_pump__write(serf_pump_
             we can actually write something. otherwise, we could
             end up in a CPU spin: socket wants something, but we
             don't have anything (and keep returning EAGAIN) */
-            conn->stop_writing = true;
-            serf_io__set_pollset_dirty(conn->io);
+            pump->stop_writing = true;
+            serf_io__set_pollset_dirty(pump->io);
 
             read_status = APR_EAGAIN;
         }
         else if (APR_STATUS_IS_EAGAIN(read_status)) {
 
             /* We read some stuff, but did we read everything ? */
-            if (conn->hit_eof)
+            if (pump->hit_eof)
                 read_status = APR_SUCCESS;
         }
         else if (SERF_BUCKET_READ_ERROR(read_status)) {
@@ -341,4 +340,3 @@ apr_status_t serf_pump__write(serf_pump_
 
     return status;
 }
-

Modified: serf/trunk/serf_private.h
URL: 
http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1715026&r1=1715025&r2=1715026&view=diff
==============================================================================
--- serf/trunk/serf_private.h (original)
+++ serf/trunk/serf_private.h Wed Nov 18 17:43:23 2015
@@ -682,7 +682,7 @@ serf_request_t *serf__ssltunnel_request_
                                                void *setup_baton);
 void serf__connection_set_pipelining(serf_connection_t *conn, int enabled);
 apr_status_t serf__connection_flush(serf_connection_t *conn,
-                                    int pump);
+                                    bool fetch_new);
 
 apr_status_t serf__provide_credentials(serf_context_t *ctx,
                                        char **username,


Reply via email to