Author: rhuijben
Date: Mon Nov 23 16:41:12 2015
New Revision: 1715871

URL: http://svn.apache.org/viewvc?rev=1715871&view=rev
Log:
Backport recent pump fixes to the pump-investigate branch to allow testing
them on OS X.

* buckets/ssl_buckets.c
* incoming.c
* pump.c
* serf_private.h
* test/serf_httpd.c
* test/test_server.c
  Merge some recent work.

Modified:
    serf/branches/pump-investigate/buckets/ssl_buckets.c
    serf/branches/pump-investigate/incoming.c
    serf/branches/pump-investigate/pump.c
    serf/branches/pump-investigate/serf_private.h
    serf/branches/pump-investigate/test/test_server.c

Modified: serf/branches/pump-investigate/buckets/ssl_buckets.c
URL: http://svn.apache.org/viewvc/serf/branches/pump-investigate/buckets/ssl_buckets.c?rev=1715871&r1=1715870&r2=1715871&view=diff
==============================================================================
--- serf/branches/pump-investigate/buckets/ssl_buckets.c (original)
+++ serf/branches/pump-investigate/buckets/ssl_buckets.c Mon Nov 23 16:41:12 2015
@@ -177,7 +177,7 @@ struct serf_ssl_context_t {
 
     const char *selected_protocol; /* Cached protocol value once available */
     /* Protocol callback */
-    serf_ssl_protocol_result_cb_t protocol_callback; 
+    serf_ssl_protocol_result_cb_t protocol_callback;
     void *protocol_userdata;
 
     serf_config_t *config;
@@ -255,7 +255,7 @@ apps_ssl_info_callback(const SSL *s, int
         if (ret > 0) {
             /* ret > 0: Just a state change; not an error */
             serf__log(level, LOGCOMP_SSL, __FILE__, ctx->config,
-                      "%s: %s\n",
+                      "%s: %s (%d)\n",
                       str, SSL_state_string_long(s),
                       ctx->crypt_status);
         }
@@ -282,7 +282,7 @@ apps_ssl_info_callback(const SSL *s, int
 #endif
 
 
-/* Listens for the SSL renegotiate ciphers alert and report it back to the 
+/* Listens for the SSL renegotiate ciphers alert and report it back to the
    serf context. */
 static void
 detect_renegotiate(const SSL *s, int where, int ret)
@@ -647,7 +647,7 @@ get_subject_alt_names(apr_array_header_t
         }
         sk_GENERAL_NAME_pop_free(names, GENERAL_NAME_free);
     }
-    
+
     return APR_SUCCESS;
 }
 
@@ -698,7 +698,7 @@ validate_server_certificate(int cert_val
         int err = X509_STORE_CTX_get_error(store_ctx);
 
         switch(err) {
-            case X509_V_ERR_CERT_NOT_YET_VALID: 
+            case X509_V_ERR_CERT_NOT_YET_VALID:
                     failures |= SERF_SSL_CERT_NOTYETVALID;
                     break;
             case X509_V_ERR_CERT_HAS_EXPIRED:
@@ -840,7 +840,7 @@ validate_server_certificate(int cert_val
     {
         ctx->pending_err = SERF_ERROR_SSL_CERT_FAILED;
     }
-        
+
     return cert_valid;
 }
 
@@ -1005,7 +1005,7 @@ static apr_status_t ssl_decrypt(void *ba
         serf__log(LOGLVL_DEBUG, LOGCOMP_SSLMSG, __FILE__, ctx->config,
                     "---\n%.*s\n-(%d)-\n", *len, buf, *len);
     }
- 
+
 
     if (!ctx->handshake_finished
         && !SERF_BUCKET_READ_ERROR(status)) {
@@ -1166,7 +1166,7 @@ static apr_status_t ssl_encrypt(void *ba
                     serf__log(LOGLVL_DEBUG, LOGCOMP_SSL, __FILE__, ctx->config,
                               "---\n%.*s\n-(%d)-\n",
                               interim_len, vecs_data, interim_len);
-                    
+
                 }
             }
         }
@@ -1352,10 +1352,10 @@ static void init_ssl_libraries(void)
            thread has completed */
         while (val != INIT_DONE) {
             apr_sleep(APR_USEC_PER_SEC / 1000);
-      
+
             val = apr_atomic_cas32(&have_init_ssl,
                                    INIT_UNINITIALIZED,
-                                   INIT_UNINITIALIZED);            
+                                   INIT_UNINITIALIZED);
         }
     }
 }
@@ -2196,7 +2196,7 @@ const char *serf_ssl_cert_export(
 
     encoded_cert = apr_palloc(pool, apr_base64_encode_len(len));
     apr_base64_encode(encoded_cert, binary_cert, len);
-    
+
     return encoded_cert;
 }
 

Modified: serf/branches/pump-investigate/incoming.c
URL: http://svn.apache.org/viewvc/serf/branches/pump-investigate/incoming.c?rev=1715871&r1=1715870&r2=1715871&view=diff
==============================================================================
--- serf/branches/pump-investigate/incoming.c (original)
+++ serf/branches/pump-investigate/incoming.c Mon Nov 23 16:41:12 2015
@@ -32,12 +32,13 @@ static apr_status_t client_connected(ser
 {
     /* serf_context_t *ctx = client->ctx; */
     apr_status_t status;
+    serf_bucket_t *stream;
     serf_bucket_t *ostream;
 
     serf_pump__store_ipaddresses_in_config(&client->pump);
 
     serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config,
-              "socket for client 0x%x connected\n", client);
+              "socket for client 0x%p connected\n", client);
 
     /* ### Connection does auth setup here */
 
@@ -46,17 +47,17 @@ static apr_status_t client_connected(ser
     ostream = client->pump.ostream_tail;
 
     status = client->setup(client->skt,
-                           &client->pump.stream,
+                           &stream,
                            &ostream,
                            client->setup_baton, client->pool);
 
     if (status) {
-        serf_pump__complete_setup(&client->pump, NULL);
+        serf_pump__complete_setup(&client->pump, NULL, NULL);
         /* ### Cleanup! (serf__connection_pre_cleanup) */
         return status;
     }
 
-    serf_pump__complete_setup(&client->pump, ostream);
+    serf_pump__complete_setup(&client->pump, stream, ostream);
 
     if (client->framing_type == SERF_CONNECTION_FRAMING_TYPE_NONE) {
         client->proto_peek_bkt = serf_bucket_aggregate_create(

Modified: serf/branches/pump-investigate/pump.c
URL: http://svn.apache.org/viewvc/serf/branches/pump-investigate/pump.c?rev=1715871&r1=1715870&r2=1715871&view=diff
==============================================================================
--- serf/branches/pump-investigate/pump.c (original)
+++ serf/branches/pump-investigate/pump.c Mon Nov 23 16:41:12 2015
@@ -29,7 +29,7 @@
 
 #include "serf_private.h"
 
-apr_status_t pump_cleanup(void *baton)
+static apr_status_t pump_cleanup(void *baton)
 {
     serf_pump_t *pump = baton;
 
@@ -42,6 +42,11 @@ apr_status_t pump_cleanup(void *baton)
         pump->ostream_tail = NULL;
     }
 
+    pump->pool = NULL; /* Don't run again */
+    pump->allocator = NULL;
+    pump->skt = NULL;
+    pump->vec_len = 0;
+
     return APR_SUCCESS;
 }
 
@@ -58,11 +63,34 @@ void serf_pump__init(serf_pump_t *pump,
     pump->allocator = allocator;
     pump->config = config;
     pump->skt = skt;
+    pump->pool = pool;
 
     apr_pool_cleanup_register(pool, pump, pump_cleanup,
                               apr_pool_cleanup_null);
 }
 
+void serf_pump__done(serf_pump_t *pump)
+{
+    if (pump->pool) {
+        apr_pool_cleanup_run(pump->pool, pump, pump_cleanup);
+    }
+
+    pump->io = NULL;
+    pump->allocator = NULL;
+    pump->config = NULL;
+
+    /* pump->stream is managed by the current reader! */
+
+    pump->ostream_head = NULL;
+    pump->ostream_tail = NULL;
+
+    pump->done_writing = false;
+    pump->stop_writing = false;
+    pump->hit_eof = false;
+
+    pump->pool = NULL;
+}
+
 /* Safely check if there is still data pending on the connection, carefull
    to not accidentally make it invalid. */
 bool serf_pump__data_pending(serf_pump_t *pump)
@@ -78,9 +106,6 @@ bool serf_pump__data_pending(serf_pump_t
         status = serf_bucket_peek(pump->ostream_head, &data, &len);
         if (!SERF_BUCKET_READ_ERROR(status)) {
             if (len > 0) {
-                serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, pump->config,
-                          "Extra data to be written after sending complete "
-                          "requests.\n");
                 return true;
             }
         }
@@ -118,8 +143,10 @@ void serf_pump__prepare_setup(serf_pump_
 }
 
 void serf_pump__complete_setup(serf_pump_t *pump,
+                               serf_bucket_t *stream,
                                serf_bucket_t *ostream)
 {
+    pump->stream = stream;
     if (ostream)
         serf_bucket_aggregate_append(pump->ostream_head, ostream);
     else
@@ -133,7 +160,10 @@ void serf_pump__complete_setup(serf_pump
     /* Share the configuration with the ssl_decrypt and socket buckets. The
      response buckets wrapping the ssl_decrypt/socket buckets won't get the
      config automatically because they are upstream. */
-    serf_bucket_set_config(pump->stream, pump->config);
+    if (stream != NULL) {
+        pump->stream = stream;
+        serf_bucket_set_config(pump->stream, pump->config);
+    }
 
     /* We typically have one of two scenarios, based on whether the
        application decided to encrypt this connection:
@@ -162,13 +192,13 @@ void serf_pump__store_ipaddresses_in_con
         char buf[48];
         if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), sa))
             serf_config_set_stringf(pump->config, SERF_CONFIG_CONN_LOCALIP,
-                                    "%s:%d", buf, sa->port);
+                                    "%s:%d", buf, (int)sa->port);
     }
     if (apr_socket_addr_get(&sa, APR_REMOTE, pump->skt) == APR_SUCCESS) {
         char buf[48];
         if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), sa))
             serf_config_set_stringf(pump->config, SERF_CONFIG_CONN_REMOTEIP,
-                                    "%s:%d", buf, sa->port);
+                                    "%s:%d", buf, (int)sa->port);
     }
 }
 
@@ -177,7 +207,7 @@ static apr_status_t no_more_writes(serf_
     /* Note that we should hold new requests until we open our new socket. */
     pump->done_writing = true;
     serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, pump->config,
-              "stop writing on 0x%x\n", pump->io->u.conn);
+              "stop writing on 0x%p\n", pump->io->u.v);
 
     /* Clear our iovec. */
     pump->vec_len = 0;
@@ -199,7 +229,7 @@ static apr_status_t socket_writev(serf_p
                               pump->vec_len, &written);
     if (status && !APR_STATUS_IS_EAGAIN(status))
         serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, pump->config,
-                  "socket_sendv error %d\n", status);
+                  "socket_sendv error %d on 0x%p\n", status, pump->io->u.v);
 
     /* did we write everything? */
     if (written) {
@@ -207,26 +237,31 @@ static apr_status_t socket_writev(serf_p
         int i;
 
         serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config,
-                  "--- socket_sendv: %d bytes. --\n", written);
+                  "--- socket_sendv: %d bytes on 0x%p. --\n",
+                  (int)written, pump->io->u.v);
 
         for (i = 0; i < conn->vec_len; i++) {
             len += conn->vec[i].iov_len;
             if (written < len) {
                 serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, conn->config,
-                                 "%.*s", conn->vec[i].iov_len - (len - written),
-                                 conn->vec[i].iov_base);
+                                 "%.*s",
+                                 (int)(conn->vec[i].iov_len - (len - written)),
+                                 (const char *)conn->vec[i].iov_base);
                 if (i) {
                     memmove(conn->vec, &conn->vec[i],
                             sizeof(struct iovec) * (conn->vec_len - i));
                     conn->vec_len -= i;
                 }
-                conn->vec[0].iov_base = (char *)conn->vec[0].iov_base + (conn->vec[0].iov_len - (len - written));
+                conn->vec[0].iov_base = (char *)conn->vec[0].iov_base
+                                        + conn->vec[0].iov_len
+                                        - (len - written);
                 conn->vec[0].iov_len = len - written;
                 break;
             } else {
                 serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, conn->config,
                                  "%.*s",
-                                 conn->vec[i].iov_len, conn->vec[i].iov_base);
+                                 (int)conn->vec[i].iov_len,
+                                 (const char*)conn->vec[i].iov_base);
             }
         }
         if (len == written) {
@@ -246,16 +281,15 @@ apr_status_t serf_pump__write(serf_pump_
 {
     apr_status_t status = APR_SUCCESS;
     apr_status_t read_status = APR_SUCCESS;
-    serf_pump_t *const conn = pump;
 
-    conn->hit_eof = FALSE;
+    pump->hit_eof = FALSE;
 
     while (status == APR_SUCCESS) {
 
         /* First try to write out what is already stored in the
            connection vecs. */
-        while (conn->vec_len && !status) {
-            status = socket_writev(conn);
+        while (pump->vec_len && !status) {
+            status = socket_writev(pump);
 
             /* If the write would have blocked, then we're done.
              * Don't try to write anything else to the socket.
@@ -263,12 +297,22 @@ apr_status_t serf_pump__write(serf_pump_
             if (APR_STATUS_IS_EPIPE(status)
                 || APR_STATUS_IS_ECONNRESET(status)
                 || APR_STATUS_IS_ECONNABORTED(status))
-              return no_more_writes(conn);
+              return no_more_writes(pump);
         }
 
-        if (status || !pump)
+        if (status || !fetch_new) {
+
+            /* If we couldn't write everything that we tried,
+               make sure that we will receive a write event next time */
+            if (APR_STATUS_IS_EAGAIN(status)
+                && !pump->io->dirty_conn
+                && !(pump->io->reqevents & APR_POLLOUT))
+            {
+                serf_io__set_pollset_dirty(pump->io);
+            }
             return status;
-        else if (read_status || conn->vec_len || conn->hit_eof)
+        }
+        else if (read_status || pump->vec_len || pump->hit_eof)
             return read_status;
 
         /* ### optimize at some point by using read_for_sendfile */
@@ -276,12 +320,12 @@ apr_status_t serf_pump__write(serf_pump_
            data as available, we probably don't want to read ALL_AVAIL, but
            a lower number, like the size of one or a few TCP packets, the
            available TCP buffer size ... */
-        conn->hit_eof = 0;
+        pump->hit_eof = false;
         read_status = serf_bucket_read_iovec(pump->ostream_head,
                                              SERF_READ_ALL_AVAIL,
                                              IOV_MAX,
-                                             conn->vec,
-                                             &conn->vec_len);
+                                             pump->vec,
+                                             &pump->vec_len);
 
         if (read_status == SERF_ERROR_WAIT_CONN) {
             /* The bucket told us that it can't provide more data until
@@ -293,15 +337,20 @@ apr_status_t serf_pump__write(serf_pump_
             we can actually write something. otherwise, we could
             end up in a CPU spin: socket wants something, but we
             don't have anything (and keep returning EAGAIN) */
-            conn->stop_writing = true;
-            serf_io__set_pollset_dirty(conn->io);
+
+            serf__log(LOGLVL_INFO, LOGCOMP_CONN, __FILE__, pump->config,
+                      "Output stream requested temporary write delay "
+                      "on 0x%p\n", pump->io->u.v);
+
+            pump->stop_writing = true;
+            serf_io__set_pollset_dirty(pump->io);
 
             read_status = APR_EAGAIN;
         }
         else if (APR_STATUS_IS_EAGAIN(read_status)) {
 
             /* We read some stuff, but did we read everything ? */
-            if (conn->hit_eof)
+            if (pump->hit_eof)
                 read_status = APR_SUCCESS;
         }
         else if (SERF_BUCKET_READ_ERROR(read_status)) {
@@ -314,3 +363,37 @@ apr_status_t serf_pump__write(serf_pump_
     return status;
 }
 
+apr_status_t serf_pump__add_output(serf_pump_t *pump,
+                                   serf_bucket_t *bucket,
+                                   bool flush)
+{
+    apr_status_t status;
+
+    if (!flush
+        && !pump->io->dirty_conn
+        && !pump->stop_writing
+        && !(pump->io->reqevents & APR_POLLOUT)
+        && !serf_pump__data_pending(pump))
+    {
+        /* If not writing now,
+           * and not already dirty
+           * and nothing pending yet
+           Then mark the pollset dirty to trigger a write */
+
+        serf_io__set_pollset_dirty(pump->io);
+    }
+
+    serf_bucket_aggregate_append(pump->ostream_tail, bucket);
+
+    if (!flush)
+        return APR_SUCCESS;
+
+    /* Flush final output buffer (after ssl, etc.) */
+    status = serf_pump__write(pump, TRUE);
+
+    if (SERF_BUCKET_READ_ERROR(status))
+        return status;
+    else
+        return APR_SUCCESS;
+}
+

Modified: serf/branches/pump-investigate/serf_private.h
URL: http://svn.apache.org/viewvc/serf/branches/pump-investigate/serf_private.h?rev=1715871&r1=1715870&r2=1715871&view=diff
==============================================================================
--- serf/branches/pump-investigate/serf_private.h (original)
+++ serf/branches/pump-investigate/serf_private.h Mon Nov 23 16:41:12 2015
@@ -136,6 +136,7 @@ typedef struct serf_io_baton_t {
         serf_incoming_t *client;
         serf_connection_t *conn;
         serf_listener_t *listener;
+        const void *const v;
     } u;
 
     /* are we a dirty connection that needs its poll status updated? */
@@ -147,14 +148,18 @@ typedef struct serf_io_baton_t {
 
 } serf_io_baton_t;
 
-typedef struct serf_pump_io_t
+typedef struct serf_pump_t
 {
     serf_io_baton_t *io;
 
     serf_bucket_alloc_t *allocator;
     serf_config_t *config;
 
+    /* The incoming stream. Stored here for easy access by users,
+       but not managed as part of the pump */
     serf_bucket_t *stream;
+
+    /* The outgoing stream */
     serf_bucket_t *ostream_head;
     serf_bucket_t *ostream_tail;
 
@@ -171,6 +176,8 @@ typedef struct serf_pump_io_t
 
     /* Set to true when ostream_tail was read to EOF */
     bool hit_eof;
+
+    apr_pool_t *pool;
 } serf_pump_t;
 
 
@@ -769,15 +776,23 @@ void serf_pump__init(serf_pump_t *pump,
                      serf_bucket_alloc_t *allocator,
                      apr_pool_t *pool);
 
+void serf_pump__done(serf_pump_t *pump);
+
 bool serf_pump__data_pending(serf_pump_t *pump);
 void serf_pump__store_ipaddresses_in_config(serf_pump_t *pump);
 
 apr_status_t serf_pump__write(serf_pump_t *pump,
                               bool fetch_new);
 
+apr_status_t serf_pump__add_output(serf_pump_t *pump,
+                                   serf_bucket_t *bucket,
+                                   bool flush);
+
 /* These must always be called as a pair to avoid a memory leak */
 void serf_pump__prepare_setup(serf_pump_t *pump);
-void serf_pump__complete_setup(serf_pump_t *pump, serf_bucket_t *ostream);
+void serf_pump__complete_setup(serf_pump_t *pump,
+                               serf_bucket_t *stream,
+                               serf_bucket_t *ostream);
 
 
 /** Logging functions. **/

Modified: serf/branches/pump-investigate/test/test_server.c
URL: http://svn.apache.org/viewvc/serf/branches/pump-investigate/test/test_server.c?rev=1715871&r1=1715870&r2=1715871&view=diff
==============================================================================
--- serf/branches/pump-investigate/test/test_server.c (original)
+++ serf/branches/pump-investigate/test/test_server.c Mon Nov 23 16:41:12 2015
@@ -247,8 +247,8 @@ CuSuite *test_server(void)
 
     CuSuiteSetSetupTeardownCallbacks(suite, test_setup, test_teardown);
 
-    SUITE_ADD_TEST(suite, test_listen_http);
-    SUITE_ADD_TEST(suite, test_listen_http2);
+    /*SUITE_ADD_TEST(suite, test_listen_http);
+    SUITE_ADD_TEST(suite, test_listen_http2);*/
 
     return suite;
 }


Reply via email to