This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new 962da1e  DISPATCH-2162: HTTP1/x adaptor - use larger I/O buffers
962da1e is described below

commit 962da1ede69f36ec5192a1e1c25e8f59b3c75949
Author: Kenneth Giusti <kgiu...@apache.org>
AuthorDate: Wed Jun 9 12:08:49 2021 -0400

    DISPATCH-2162: HTTP1/x adaptor - use larger I/O buffers
    
    This closes #1263
---
 src/adaptors/http1/http1_adaptor.c | 261 ++++++++++++++++---------------------
 src/adaptors/http1/http1_client.c  |  63 ++++-----
 src/adaptors/http1/http1_private.h |  47 +++----
 src/adaptors/http1/http1_server.c  |  55 ++++----
 4 files changed, 190 insertions(+), 236 deletions(-)

diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c
index 5874ec9..bd3c189 100644
--- a/src/adaptors/http1/http1_adaptor.c
+++ b/src/adaptors/http1/http1_adaptor.c
@@ -138,45 +138,31 @@ void qdr_http1_connection_free(qdr_http1_connection_t *hconn)
     }
 }
 
-
-void qdr_http1_out_data_fifo_cleanup(qdr_http1_out_data_fifo_t *out_data)
+static void _free_qdr_http1_out_data(qdr_http1_out_data_t *od)
 {
-    if (out_data) {
-        // expect: all buffers returned from proton!
-        // FIXME: not during router shutdown!
-        // assert(qdr_http1_out_data_buffers_outstanding(out_data) == 0);
-        qdr_http1_out_data_t *od = DEQ_HEAD(out_data->fifo);
-        while (od) {
-            DEQ_REMOVE_HEAD(out_data->fifo);
-            if (od->stream_data)
-                qd_message_stream_data_release(od->stream_data);
-            else
-                qd_buffer_list_free_buffers(&od->raw_buffers);
-            free_qdr_http1_out_data_t(od);
-            od = DEQ_HEAD(out_data->fifo);
-        }
+    if (od) {
+        qd_iterator_free(od->data_iter);
+        if (od->stream_data)
+            qd_message_stream_data_release(od->stream_data);
+        else
+            qd_buffer_list_free_buffers(&od->raw_buffers);
+        free_qdr_http1_out_data_t(od);
     }
 }
 
-
-// Return the number of buffers in the process of being written out by the proactor.
-// These buffers are "owned" by proton - they must not be freed until proton has
-// released them.
-//
-int qdr_http1_out_data_buffers_outstanding(const qdr_http1_out_data_fifo_t *out_data)
+void qdr_http1_out_data_cleanup(qdr_http1_out_data_list_t *out_data)
 {
-    int count = 0;
     if (out_data) {
-        qdr_http1_out_data_t *od = DEQ_HEAD(out_data->fifo);
+        // expect: all buffers returned from proton!
+        // FIXME: not during router shutdown!
+        // assert(qdr_http1_out_data_buffers_outstanding(out_data) == 0);
+        qdr_http1_out_data_t *od = DEQ_HEAD(*out_data);
         while (od) {
-            count += od->next_buffer - od->free_count;
-            if (od == out_data->write_ptr)
-                break;
-
-            od = DEQ_NEXT(od);
+            DEQ_REMOVE_HEAD(*out_data);
+            _free_qdr_http1_out_data(od);
+            od = DEQ_HEAD(*out_data);
         }
     }
-    return count;
 }
 
 
@@ -250,86 +236,51 @@ void qdr_http1_error_response(qdr_http1_request_base_t *hreq,
 //
 
 
-// Write pending data out the raw connection.  Preserve order by only writing
-// the head request data.
+// Write list of data out the raw connection, freeing entries when data is exhausted
 //
-uint64_t qdr_http1_write_out_data(qdr_http1_connection_t *hconn, qdr_http1_out_data_fifo_t *fifo)
+uint64_t qdr_http1_write_out_data(qdr_http1_connection_t *hconn, qdr_http1_out_data_list_t *fifo)
 {
-    pn_raw_buffer_t buffers[RAW_BUFFER_BATCH];
     size_t count = !hconn->raw_conn || pn_raw_connection_is_write_closed(hconn->raw_conn)
         ? 0
         : pn_raw_connection_write_buffers_capacity(hconn->raw_conn);
 
-    uint64_t total_octets = 0;
-    qdr_http1_out_data_t *od = fifo->write_ptr;
-    while (count > 0 && od) {
-        qd_buffer_t *wbuf   = 0;
-        int          od_len = MIN(count,
-                                  (od->buffer_count - od->next_buffer));
-        assert(od_len);  // error: no data @ head?
-
-        // send the out_data as a series of writes to proactor
-
-        while (od_len) {
-            size_t limit = MIN(RAW_BUFFER_BATCH, od_len);
-            int written = 0;
-
-            if (od->stream_data) {  // buffers stored in qd_message_t
-
-                written = qd_message_stream_data_buffers(od->stream_data, buffers, od->next_buffer, limit);
-                for (int i = 0; i < written; ++i) {
-                    // enforce this: we expect the context can be used by the adaptor!
-                    assert(buffers[i].context == 0);
-                    buffers[i].context = (uintptr_t)od;
-                    total_octets += buffers[i].size;
-                }
-
-            } else {   // list of buffers in od->raw_buffers
-                // advance to next buffer to send in od
-                if (!wbuf) {
-                    wbuf = DEQ_HEAD(od->raw_buffers);
-                    for (int i = 0; i < od->next_buffer; ++i)
-                        wbuf = DEQ_NEXT(wbuf);
-                }
-
-                pn_raw_buffer_t *rdisc = &buffers[0];
-                while (limit--) {
-                    rdisc->context  = (uintptr_t)od;
-                    rdisc->bytes    = (char*) qd_buffer_base(wbuf);
-                    rdisc->capacity = 0;
-                    rdisc->size     = qd_buffer_size(wbuf);
-                    rdisc->offset   = 0;
-
-                    total_octets += rdisc->size;
-                    ++rdisc;
-                    wbuf = DEQ_NEXT(wbuf);
-                    written += 1;
-                }
-            }
-
-            // keep me, you'll need it
-            if (HTTP1_DUMP_BUFFERS) {
-                for (size_t j = 0; j < written; ++j) {
-                    char *ptr = (char*) buffers[j].bytes;
-                    int len = (int) buffers[j].size;
-                    fprintf(stdout, "\n[C%"PRIu64"] Raw Write: Ptr=%p len=%d\n  value='%.*s'\n",
-                            hconn->conn_id, (void*)ptr, len, len, ptr);
-                    fflush(stdout);
-                }
-            }
-
-            written = pn_raw_connection_write_buffers(hconn->raw_conn, buffers, written);
-            count -= written;
-            od_len -= written;
-            od->next_buffer += written;
+    if (hconn->write_buf_busy || count == 0)
+        return 0;
+
+    const size_t max_octets = HTTP1_IO_BUF_SIZE;
+    size_t total_octets = 0;
+    while (!DEQ_IS_EMPTY(*fifo) && total_octets < max_octets) {
+        qdr_http1_out_data_t *od = DEQ_HEAD(*fifo);
+        uint32_t data_octets = qd_iterator_remaining(od->data_iter);
+
+        size_t len = MIN(data_octets, max_octets - total_octets);
+        int copied = qd_iterator_ncopy(od->data_iter, &hconn->write_buffer[total_octets], len);
+        assert(copied == len);
+        data_octets -= copied;
+        total_octets += copied;
+
+        qd_iterator_trim_view(od->data_iter, data_octets);
+        if (qd_iterator_remaining(od->data_iter) == 0) {
+            DEQ_REMOVE_HEAD(*fifo);
+            _free_qdr_http1_out_data(od);
         }
+    }
 
-        if (od->next_buffer == od->buffer_count) {
-            // all buffers in od have been passed to proton.
-            od = DEQ_NEXT(od);
-            fifo->write_ptr = od;
-            wbuf = 0;
+    if (total_octets) {
+        pn_raw_buffer_t pn_buff = {0};
+        pn_buff.bytes = (char*) hconn->write_buffer;
+        pn_buff.size  = total_octets;
+
+        // keep me, you'll need it
+        if (HTTP1_DUMP_BUFFERS) {
+            fprintf(stdout, "\n[C%"PRIu64"] Raw Write: Ptr=%p len=%"PRIu32"\n  value='%.*s'\n",
+                    hconn->conn_id, (void*)pn_buff.bytes, pn_buff.size,
+                    (int) pn_buff.size, pn_buff.bytes);
+            fflush(stdout);
         }
+
+        pn_raw_connection_write_buffers(hconn->raw_conn, &pn_buff, 1);
+        hconn->write_buf_busy = true;
     }
 
     hconn->out_http1_octets += total_octets;
@@ -340,20 +291,15 @@ uint64_t qdr_http1_write_out_data(qdr_http1_connection_t *hconn, qdr_http1_out_d
 // The HTTP encoder has a list of buffers to be written to the raw connection.
 // Queue it to the outgoing data fifo.
 //
-void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_fifo_t *fifo, qd_buffer_list_t *blist)
+void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_list_t *fifo, qd_buffer_list_t *blist, uintmax_t octets)
 {
-    int count = (int) DEQ_SIZE(*blist);
-    if (count) {
+    if (octets) {
         qdr_http1_out_data_t *od = new_qdr_http1_out_data_t();
         ZERO(od);
-        od->owning_fifo = fifo;
-        od->buffer_count = (int) DEQ_SIZE(*blist);
         od->raw_buffers = *blist;
+        od->data_iter = qd_iterator_buffer(DEQ_HEAD(od->raw_buffers), 0, (int)octets, ITER_VIEW_ALL);
         DEQ_INIT(*blist);
-
-        DEQ_INSERT_TAIL(fifo->fifo, od);
-        if (!fifo->write_ptr)
-            fifo->write_ptr = od;
+        DEQ_INSERT_TAIL(*fifo, od);
     }
 }
 
@@ -361,19 +307,14 @@ void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_fifo_t *fifo, qd_buffer_li
 // The HTTP encoder has a message body data to be written to the raw connection.
 // Queue it to the outgoing data fifo.
 //
-void qdr_http1_enqueue_stream_data(qdr_http1_out_data_fifo_t *fifo, qd_message_stream_data_t *stream_data)
+void qdr_http1_enqueue_stream_data(qdr_http1_out_data_list_t *fifo, qd_message_stream_data_t *stream_data)
 {
-    int count = qd_message_stream_data_buffer_count(stream_data);
-    if (count) {
+    if (qd_message_stream_data_payload_length(stream_data)) {
         qdr_http1_out_data_t *od = new_qdr_http1_out_data_t();
         ZERO(od);
-        od->owning_fifo = fifo;
         od->stream_data = stream_data;
-        od->buffer_count = count;
-
-        DEQ_INSERT_TAIL(fifo->fifo, od);
-        if (!fifo->write_ptr)
-            fifo->write_ptr = od;
+        od->data_iter = qd_message_stream_data_iterator(stream_data);
+        DEQ_INSERT_TAIL(*fifo, od);
     } else {
         // empty body-data
         qd_message_stream_data_release(stream_data);
@@ -385,39 +326,61 @@ void qdr_http1_enqueue_stream_data(qdr_http1_out_data_fifo_t *fifo, qd_message_s
 //
 void qdr_http1_free_written_buffers(qdr_http1_connection_t *hconn)
 {
-    pn_raw_buffer_t buffers[RAW_BUFFER_BATCH];
-    size_t count;
-    while ((count = pn_raw_connection_take_written_buffers(hconn->raw_conn, buffers, RAW_BUFFER_BATCH)) != 0) {
-        for (size_t i = 0; i < count; ++i) {
-
-            // keep me, you'll need it
-            if (HTTP1_DUMP_BUFFERS) {
-                char *ptr = (char*) buffers[i].bytes;
-                int len = (int) buffers[i].size;
-                fprintf(stdout, "\n[C%"PRIu64"] Raw Written: Ptr=%p len=%d c=%d o=%d\n  value='%.*s'\n",
-                        hconn->conn_id, (void*)ptr, len, buffers[i].capacity, buffers[i].offset, len, ptr);
-                fflush(stdout);
-            }
+    pn_raw_buffer_t pn_buff = {0};
+
+    if (pn_raw_connection_take_written_buffers(hconn->raw_conn, &pn_buff, 1) != 0) {
+        assert(hconn->write_buf_busy);  // expect write buffer in use
+
+        // keep me, you'll need it
+        if (HTTP1_DUMP_BUFFERS) {
+            char *ptr = (char*) pn_buff.bytes;
+            int len = (int) pn_buff.size;
+            fprintf(stdout, "\n[C%"PRIu64"] Raw Written: Ptr=%p len=%d c=%d o=%d\n  value='%.*s'\n",
+                    hconn->conn_id, (void*)ptr, len, pn_buff.capacity, pn_buff.offset, len, ptr);
+            fflush(stdout);
+        }
 
-            qdr_http1_out_data_t *od = (qdr_http1_out_data_t*) buffers[i].context;
-            assert(od);
-            // Note: according to proton devs the order in which write buffers
-            // are released are NOT guaranteed to be in the same order in which
-            // they were written!
-
-            od->free_count += 1;
-            if (od->free_count == od->buffer_count) {
-                // all buffers returned
-                qdr_http1_out_data_fifo_t *fifo = od->owning_fifo;
-                DEQ_REMOVE(fifo->fifo, od);
-                if (od->stream_data)
-                    qd_message_stream_data_release(od->stream_data);
-                else
-                    qd_buffer_list_free_buffers(&od->raw_buffers);
-                free_qdr_http1_out_data_t(od);
-            }
+        hconn->write_buf_busy = false;
+    }
+}
+
+
+//
+// Raw Connection Read Buffer Management
+//
+
+int qdr_http1_grant_read_buffers(qdr_http1_connection_t *hconn)
+{
+    if (!hconn->read_buf_busy && hconn->raw_conn
+        && pn_raw_connection_read_buffers_capacity(hconn->raw_conn) > 0) {
+
+        pn_raw_buffer_t pn_buf = {0};
+        pn_buf.bytes = (char*) hconn->read_buffer;
+        pn_buf.capacity = HTTP1_IO_BUF_SIZE;
+        pn_raw_connection_give_read_buffers(hconn->raw_conn, &pn_buf, 1);
+        hconn->read_buf_busy = true;
+        return 1;
+    }
+    return 0;
+}
+
+// take incoming data from raw connection
+uintmax_t qdr_http1_get_read_buffers(qdr_http1_connection_t *hconn,
+                                     qd_buffer_list_t *blist)
+{
+    pn_raw_buffer_t pn_buff;
+    DEQ_INIT(*blist);
+    uintmax_t octets = 0;
+
+    if (hconn->raw_conn && pn_raw_connection_take_read_buffers(hconn->raw_conn,
+                                                               &pn_buff, 1)) {
+        if (pn_buff.size) {
+            octets = pn_buff.size;
+            qd_buffer_list_append(blist, (uint8_t*) pn_buff.bytes, pn_buff.size);
         }
+        hconn->read_buf_busy = false;
     }
+    return octets;
 }
 
 
diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c
index 204f7a7..72a31da 100644
--- a/src/adaptors/http1/http1_client.c
+++ b/src/adaptors/http1/http1_client.c
@@ -50,7 +50,7 @@ typedef struct _client_response_msg_t {
     bool            encoded;          // true when full response encoded
 
     // HTTP encoded message data
-    qdr_http1_out_data_fifo_t out_data;
+    qdr_http1_out_data_list_t out_data;
 
 } _client_response_msg_t;
 ALLOC_DECLARE(_client_response_msg_t);
@@ -383,13 +383,13 @@ static int _handle_conn_read_event(qdr_http1_connection_t *hconn)
 {
     int error = 0;
     qd_buffer_list_t blist;
-    uintmax_t length;
-    qda_raw_conn_get_read_buffers(hconn->raw_conn, &blist, &length);
+    uintmax_t length = qdr_http1_get_read_buffers(hconn, &blist);
+
     if (length) {
+
         qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
                "[C%"PRIu64"][L%"PRIu64"] Read %"PRIuMAX" bytes from client (%zu buffers)",
                hconn->conn_id, hconn->in_link_id, length, DEQ_SIZE(blist));
-        hconn->in_http1_octets += length;
 
         if (HTTP1_DUMP_BUFFERS) {
             fprintf(stdout, "\nClient raw buffer READ %"PRIuMAX" total octets\n", length);
@@ -401,6 +401,7 @@ static int _handle_conn_read_event(qdr_http1_connection_t *hconn)
             fflush(stdout);
         }
 
+        hconn->in_http1_octets += length;
         error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length);
     }
     return error;
@@ -412,7 +413,7 @@ static void _handle_conn_need_read_buffers(qdr_http1_connection_t *hconn)
 {
     // @TODO(kgiusti): backpressure if no credit
     if (hconn->client.reply_to_addr || hconn->cfg.event_channel /* && hconn->in_link_credit > 0 */) {
-        int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
+        int granted = qdr_http1_grant_read_buffers(hconn);
         qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted",
                hconn->conn_id, granted);
     }
@@ -481,11 +482,26 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
         _write_pending_response((_client_request_t*) DEQ_HEAD(hconn->requests));
         break;
     }
+    case PN_RAW_CONNECTION_WRITTEN: {
+        qdr_http1_free_written_buffers(hconn);
+        break;
+    }
     case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
         qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need read buffers", hconn->conn_id);
         _handle_conn_need_read_buffers(hconn);
         break;
     }
+    case PN_RAW_CONNECTION_READ: {
+        if (!hconn->q2_blocked) {
+            int error = _handle_conn_read_event(hconn);
+            if (error)
+                qdr_http1_close_connection(hconn, "Incoming response message failed to parse");
+            else
+                // room for more incoming data
+                _handle_conn_need_read_buffers(hconn);
+        }
+        break;
+    }
     case PN_RAW_CONNECTION_WAKE: {
         int error = 0;
         qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Wake-up", hconn->conn_id);
@@ -495,7 +511,9 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
             qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] client link unblocked from Q2 limit", hconn->conn_id);
             hconn->q2_blocked = false;
             error = _handle_conn_read_event(hconn);  // restart receiving
-            _handle_conn_need_read_buffers(hconn);
+            if (!error)
+                // room for more incoming data
+                _handle_conn_need_read_buffers(hconn);
         }
 
         while (qdr_connection_process(hconn->qdr_conn)) {}
@@ -506,18 +524,6 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
         qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Processing done", hconn->conn_id);
         break;
     }
-    case PN_RAW_CONNECTION_READ: {
-        if (!hconn->q2_blocked) {
-            int error = _handle_conn_read_event(hconn);
-            if (error)
-                qdr_http1_close_connection(hconn, "Incoming response message failed to parse");
-        }
-        break;
-    }
-    case PN_RAW_CONNECTION_WRITTEN: {
-        qdr_http1_free_written_buffers(hconn);
-        break;
-    }
     default:
         break;
     }
@@ -556,7 +562,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
                        hconn->out_link_id, hreq->error_code, hreq->error_text);
                 _client_response_msg_t *rmsg = new__client_response_msg_t();
                 ZERO(rmsg);
-                DEQ_INIT(rmsg->out_data.fifo);
+                DEQ_INIT(rmsg->out_data);
                 DEQ_INSERT_TAIL(hreq->responses, rmsg);
                 qdr_http1_error_response(&hreq->base, hreq->error_code, hreq->error_text);
                 _write_pending_response(hreq);
@@ -566,7 +572,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
             _client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
             while (rmsg &&
                    rmsg->dispo &&
-                   DEQ_IS_EMPTY(rmsg->out_data.fifo) &&
+                   DEQ_IS_EMPTY(rmsg->out_data) &&
                    hconn->cfg.aggregation == QD_AGGREGATION_NONE) {
                 // response message fully received and forwarded to client
                 if (rmsg->dlv) {
@@ -660,7 +666,7 @@ static void _client_tx_buffers_cb(h1_codec_request_state_t *hrs, qd_buffer_list_
         rmsg = DEQ_HEAD(hreq->responses);
     }
     assert(rmsg);
-    qdr_http1_enqueue_buffer_list(&rmsg->out_data, blist);
+    qdr_http1_enqueue_buffer_list(&rmsg->out_data, blist, len);
 
     // if this happens to be the current outgoing response try writing to the
     // raw connection
@@ -1033,12 +1039,7 @@ void qdr_http1_client_core_link_flow(qdr_http1_adaptor_t    *adaptor,
     hconn->in_link_credit += credit;
     if (hconn->in_link_credit > 0) {
 
-        if (hconn->raw_conn) {
-            int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
-            qd_log(adaptor->log, QD_LOG_DEBUG,
-                   "[C%"PRIu64"] %d read buffers granted",
-                   hconn->conn_id, granted);
-        }
+        _handle_conn_need_read_buffers(hconn);
 
         // is the current request message blocked by lack of credit?
 
@@ -1327,7 +1328,7 @@ void qdr_http1_client_core_delivery_update(qdr_http1_adaptor_t      *adaptor,
                 // if nothing has been sent back so far
                 _client_response_msg_t *rmsg = new__client_response_msg_t();
                 ZERO(rmsg);
-                DEQ_INIT(rmsg->out_data.fifo);
+                DEQ_INIT(rmsg->out_data);
                 DEQ_INSERT_TAIL(hreq->responses, rmsg);
 
                 if (disp == PN_REJECTED) {
@@ -1600,7 +1601,7 @@ uint64_t qdr_http1_client_core_link_deliver(qdr_http1_adaptor_t    *adaptor,
             _client_response_msg_t *rmsg = new__client_response_msg_t();
             ZERO(rmsg);
             rmsg->dlv = delivery;
-            DEQ_INIT(rmsg->out_data.fifo);
+            DEQ_INIT(rmsg->out_data);
             qdr_delivery_set_context(delivery, hreq);
             qdr_delivery_incref(delivery, "HTTP1 client referencing response delivery");
             DEQ_INSERT_TAIL(hreq->responses, rmsg);
@@ -1678,7 +1679,7 @@ uint64_t qdr_http1_client_core_link_deliver(qdr_http1_adaptor_t    *adaptor,
 static void _client_response_msg_free(_client_request_t *req, _client_response_msg_t *rmsg)
 {
     DEQ_REMOVE(req->responses, rmsg);
-    qdr_http1_out_data_fifo_cleanup(&rmsg->out_data);
+    qdr_http1_out_data_cleanup(&rmsg->out_data);
 
     if (rmsg->dlv) {
         qdr_delivery_set_context(rmsg->dlv, 0);
@@ -1697,7 +1698,7 @@ static void _write_pending_response(_client_request_t *hreq)
     if (hreq && !hreq->cancelled) {
         assert(DEQ_PREV(&hreq->base) == 0);  // must preserve order
         _client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
-        if (rmsg && rmsg->out_data.write_ptr) {
+        if (rmsg && DEQ_HEAD(rmsg->out_data)) {
             uint64_t written = qdr_http1_write_out_data(hreq->base.hconn, &rmsg->out_data);
             hreq->base.out_http1_octets += written;
             qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] %"PRIu64" octets written",
diff --git a/src/adaptors/http1/http1_private.h b/src/adaptors/http1/http1_private.h
index f654fc5..19ffa0c 100644
--- a/src/adaptors/http1/http1_private.h
+++ b/src/adaptors/http1/http1_private.h
@@ -38,8 +38,9 @@
 // for debug: will dump I/O buffer content to stdout if true
 #define HTTP1_DUMP_BUFFERS false
 
+#define HTTP1_IO_BUF_SIZE 16384
+
 typedef struct qdr_http1_out_data_t      qdr_http1_out_data_t;
-typedef struct qdr_http1_out_data_fifo_t qdr_http1_out_data_fifo_t;
 typedef struct qdr_http1_request_base_t  qdr_http1_request_base_t;
 typedef struct qdr_http1_connection_t    qdr_http1_connection_t;
 
@@ -63,7 +64,7 @@ extern qdr_http1_adaptor_t *qdr_http1_adaptor;
 // This adaptor has to cope with two different data sources: the HTTP1 encoder
 // and the qd_message_stream_data_t list.  The HTTP1 encoder produces a simple
 // qd_buffer_list_t for outgoing header data whose ownership is given to the
-// adaptor: the adaptor is free to deque/free these buffers as needed.  The
+// adaptor: the adaptor is free to dequeue/free these buffers as needed.  The
 // qd_message_stream_data_t buffers are shared with the owning message and the
 // buffer list must not be modified by the adaptor.  The qdr_http1_out_data_t
 // is used to manage both types of data sources.
@@ -71,36 +72,19 @@ extern qdr_http1_adaptor_t *qdr_http1_adaptor;
 struct qdr_http1_out_data_t {
     DEQ_LINKS(qdr_http1_out_data_t);
 
-    qdr_http1_out_data_fifo_t *owning_fifo;
-
     // data is either in a raw buffer chain
     // or a message body data (not both!)
 
     qd_buffer_list_t raw_buffers;
     qd_message_stream_data_t *stream_data;
 
-    int buffer_count;  // # total buffers
-    int next_buffer;   // offset to next buffer to send
-    int free_count;    // # buffers returned from proton
+    // points to the data contained in the stream_data/raw_buffers
+    qd_iterator_t *data_iter;
 };
 ALLOC_DECLARE(qdr_http1_out_data_t);
 DEQ_DECLARE(qdr_http1_out_data_t, qdr_http1_out_data_list_t);
 
 
-//
-// A fifo of outgoing (raw connection) data, oldest at HEAD.
-//
-// write_ptr tracks the point in the fifo where the current out_data node that
-// is being written to the raw connection.  As the raw connection returns
-// written buffers (PN_RAW_CONNECTION_WRITTEN) the are removed from the HEAD
-// and freed.
-//
-struct qdr_http1_out_data_fifo_t {
-    qdr_http1_out_data_list_t fifo;
-    qdr_http1_out_data_t     *write_ptr;
-};
-
-
 // Per HTTP request/response(s) state.
 //
 // This base class is extended for client and server-specific state, see
@@ -188,6 +172,14 @@ struct qdr_http1_connection_t {
     // flags
     //
     bool trace;
+
+    //
+    //
+    bool read_buf_busy;
+    bool write_buf_busy;
+
+    uint8_t read_buffer[HTTP1_IO_BUF_SIZE];
+    uint8_t write_buffer[HTTP1_IO_BUF_SIZE];
 };
 ALLOC_DECLARE(qdr_http1_connection_t);
 
@@ -207,12 +199,13 @@ ALLOC_DECLARE(qdr_http1_connection_t);
 //void qdr_http1_write_buffer_list(qdr_http1_request_t *hreq, qd_buffer_list_t *blist);
 
 void qdr_http1_free_written_buffers(qdr_http1_connection_t *hconn);
-void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_fifo_t *fifo, qd_buffer_list_t *blist);
-void qdr_http1_enqueue_stream_data(qdr_http1_out_data_fifo_t *fifo, qd_message_stream_data_t *stream_data);
-uint64_t qdr_http1_write_out_data(qdr_http1_connection_t *hconn, qdr_http1_out_data_fifo_t *fifo);
-void qdr_http1_out_data_fifo_cleanup(qdr_http1_out_data_fifo_t *out_data);
-// return the number of buffers currently held by the proactor for writing
-int qdr_http1_out_data_buffers_outstanding(const qdr_http1_out_data_fifo_t *out_data);
+void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_list_t *fifo, qd_buffer_list_t *blist, uintmax_t octets);
+void qdr_http1_enqueue_stream_data(qdr_http1_out_data_list_t *fifo, qd_message_stream_data_t *stream_data);
+uint64_t qdr_http1_write_out_data(qdr_http1_connection_t *hconn, qdr_http1_out_data_list_t *fifo);
+void qdr_http1_out_data_cleanup(qdr_http1_out_data_list_t *out_data);
+int qdr_http1_grant_read_buffers(qdr_http1_connection_t *hconn);
+uintmax_t qdr_http1_get_read_buffers(qdr_http1_connection_t *hconn,
+                                     qd_buffer_list_t *blist);
 
 void qdr_http1_close_connection(qdr_http1_connection_t *hconn, const char *error);
 void qdr_http1_connection_free(qdr_http1_connection_t *hconn);
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
index bcdb3e5..bb6d252 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -74,7 +74,8 @@ typedef struct _server_request_t {
     bool            request_discard; // drop incoming request data
     bool            headers_encoded; // True when header encode done
 
-    qdr_http1_out_data_fifo_t out_data;  // encoded request written to raw conn
+    // fifo of encoded request data to be written out the raw connection:
+    qdr_http1_out_data_list_t out_data;
 
     _server_response_msg_list_t responses;  // response(s) to this request
 
@@ -476,24 +477,25 @@ static int _handle_conn_read_event(qdr_http1_connection_t *hconn)
 {
     int              error = 0;
     qd_buffer_list_t blist;
-    uintmax_t        length;
 
-    qda_raw_conn_get_read_buffers(hconn->raw_conn, &blist, &length);
-
-    if (HTTP1_DUMP_BUFFERS) {
-        fprintf(stdout, "\nServer raw buffer READ %"PRIuMAX" total octets\n", length);
-        qd_buffer_t *bb = DEQ_HEAD(blist);
-        while (bb) {
-            fprintf(stdout, "  buffer='%.*s'\n", (int)qd_buffer_size(bb), (char*)&bb[1]);
-            bb = DEQ_NEXT(bb);
-        }
-        fflush(stdout);
-    }
+    uintmax_t length = qdr_http1_get_read_buffers(hconn, &blist);
 
     if (length) {
+
         qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
                "[C%"PRIu64"][L%"PRIu64"] Read %"PRIuMAX" bytes from server (%zu buffers)",
                hconn->conn_id, hconn->in_link_id, length, DEQ_SIZE(blist));
+
+        if (HTTP1_DUMP_BUFFERS) {
+            fprintf(stdout, "\nServer raw buffer READ %"PRIuMAX" total octets\n", length);
+            qd_buffer_t *bb = DEQ_HEAD(blist);
+            while (bb) {
+                fprintf(stdout, "  buffer='%.*s'\n", (int)qd_buffer_size(bb), (char*)&bb[1]);
+                bb = DEQ_NEXT(bb);
+            }
+            fflush(stdout);
+        }
+
         hconn->in_http1_octets += length;
         error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length);
     }
@@ -506,7 +508,7 @@ static void _handle_conn_need_read_buffers(qdr_http1_connection_t *hconn)
 {
     // @TODO(kgiusti): backpressure if no credit
     // if (hconn->in_link_credit > 0 */)
-    int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
+    int granted = qdr_http1_grant_read_buffers(hconn);
     qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted",
            hconn->conn_id, granted);
 }
@@ -549,7 +551,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
         _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
         if (_is_request_in_progress(hreq)) {
             hreq->request_discard = true;
-            qdr_http1_out_data_fifo_cleanup(&hreq->out_data);
+            qdr_http1_out_data_cleanup(&hreq->out_data);
         }
         pn_raw_connection_close(hconn->raw_conn);
         break;
@@ -608,8 +610,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
         break;
     }
     case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
-        if (!hconn->q2_blocked)
-            _handle_conn_need_read_buffers(hconn);
+        _handle_conn_need_read_buffers(hconn);
         break;
     }
     case PN_RAW_CONNECTION_WAKE: {
@@ -621,7 +622,9 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
             qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] server link unblocked from Q2 limit", hconn->conn_id);
             hconn->q2_blocked = false;
             error = _handle_conn_read_event(hconn);  // restart receiving
-            _handle_conn_need_read_buffers(hconn);
+            if (!error)
+                // room for more incoming data
+                _handle_conn_need_read_buffers(hconn);
         }
 
         while (qdr_connection_process(hconn->qdr_conn)) {}
@@ -680,11 +683,6 @@ static bool _process_request(_server_request_t *hreq)
 
     if (hreq->cancelled) {
 
-        // have to wait until all buffers returned from proton
-        // before we can release the request
-        if (qdr_http1_out_data_buffers_outstanding(&hreq->out_data))
-            return false;
-
         // clean up the request message delivery
         if (hreq->request_dlv) {
 
@@ -772,7 +770,7 @@ static bool _process_request(_server_request_t *hreq)
             }
         }
 
-        if (hreq->request_acked && hreq->request_settled && DEQ_SIZE(hreq->out_data.fifo) == 0) {
+        if (hreq->request_acked && hreq->request_settled && DEQ_SIZE(hreq->out_data) == 0) {
             qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP request msg-id=%"PRIu64" completed!",
                    hconn->conn_id, hreq->base.msg_id);
             _server_request_free(hreq);
@@ -804,7 +802,7 @@ static void _server_tx_buffers_cb(h1_codec_request_state_t *hrs, qd_buffer_list_
         qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
                "[C%"PRIu64"][L%"PRIu64"] Sending %u octets to server",
                hconn->conn_id, hconn->out_link_id, len);
-        qdr_http1_enqueue_buffer_list(&hreq->out_data, blist);
+        qdr_http1_enqueue_buffer_list(&hreq->out_data, blist, len);
     }
 }
 
@@ -1150,8 +1148,7 @@ void qdr_http1_server_core_link_flow(qdr_http1_adaptor_t    *adaptor,
 
     if (hconn->in_link_credit > 0) {
 
-        if (hconn->raw_conn && !hconn->q2_blocked)
-            qda_raw_conn_grant_read_buffers(hconn->raw_conn);
+        _handle_conn_need_read_buffers(hconn);
 
         // check for pending responses that are blocked for credit
 
@@ -1293,7 +1290,7 @@ static _server_request_t *_create_request_context(qdr_http1_connection_t *hconn,
     hreq->base.response_addr = reply_to;
     hreq->base.site = group_id;
     hreq->base.start = qd_timer_now();
-    DEQ_INIT(hreq->out_data.fifo);
+    DEQ_INIT(hreq->out_data);
     DEQ_INIT(hreq->responses);
     DEQ_INSERT_TAIL(hconn->requests, &hreq->base);
 
@@ -1644,7 +1641,7 @@ static void _server_request_free(_server_request_t *hreq)
 {
     if (hreq) {
         qdr_http1_request_base_cleanup(&hreq->base);
-        qdr_http1_out_data_fifo_cleanup(&hreq->out_data);
+        qdr_http1_out_data_cleanup(&hreq->out_data);
         if (hreq->request_dlv) {
             qdr_delivery_set_context(hreq->request_dlv, 0);
             qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 server releasing request delivery");

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to