This is an automated email from the ASF dual-hosted git repository. kgiusti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new 962da1e DISPATCH-2162: HTTP1/x adaptor - use larger I/O buffers 962da1e is described below commit 962da1ede69f36ec5192a1e1c25e8f59b3c75949 Author: Kenneth Giusti <kgiu...@apache.org> AuthorDate: Wed Jun 9 12:08:49 2021 -0400 DISPATCH-2162: HTTP1/x adaptor - use larger I/O buffers This closes #1263 --- src/adaptors/http1/http1_adaptor.c | 261 ++++++++++++++++--------------------- src/adaptors/http1/http1_client.c | 63 ++++----- src/adaptors/http1/http1_private.h | 47 +++---- src/adaptors/http1/http1_server.c | 55 ++++---- 4 files changed, 190 insertions(+), 236 deletions(-) diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c index 5874ec9..bd3c189 100644 --- a/src/adaptors/http1/http1_adaptor.c +++ b/src/adaptors/http1/http1_adaptor.c @@ -138,45 +138,31 @@ void qdr_http1_connection_free(qdr_http1_connection_t *hconn) } } - -void qdr_http1_out_data_fifo_cleanup(qdr_http1_out_data_fifo_t *out_data) +static void _free_qdr_http1_out_data(qdr_http1_out_data_t *od) { - if (out_data) { - // expect: all buffers returned from proton! - // FIXME: not during router shutdown! - // assert(qdr_http1_out_data_buffers_outstanding(out_data) == 0); - qdr_http1_out_data_t *od = DEQ_HEAD(out_data->fifo); - while (od) { - DEQ_REMOVE_HEAD(out_data->fifo); - if (od->stream_data) - qd_message_stream_data_release(od->stream_data); - else - qd_buffer_list_free_buffers(&od->raw_buffers); - free_qdr_http1_out_data_t(od); - od = DEQ_HEAD(out_data->fifo); - } + if (od) { + qd_iterator_free(od->data_iter); + if (od->stream_data) + qd_message_stream_data_release(od->stream_data); + else + qd_buffer_list_free_buffers(&od->raw_buffers); + free_qdr_http1_out_data_t(od); } } - -// Return the number of buffers in the process of being written out by the proactor. -// These buffers are "owned" by proton - they must not be freed until proton has -// released them. -// -int qdr_http1_out_data_buffers_outstanding(const qdr_http1_out_data_fifo_t *out_data) +void qdr_http1_out_data_cleanup(qdr_http1_out_data_list_t *out_data) { - int count = 0; if (out_data) { - qdr_http1_out_data_t *od = DEQ_HEAD(out_data->fifo); + // expect: all buffers returned from proton! + // FIXME: not during router shutdown! + // assert(qdr_http1_out_data_buffers_outstanding(out_data) == 0); + qdr_http1_out_data_t *od = DEQ_HEAD(*out_data); while (od) { - count += od->next_buffer - od->free_count; - if (od == out_data->write_ptr) - break; - - od = DEQ_NEXT(od); + DEQ_REMOVE_HEAD(*out_data); + _free_qdr_http1_out_data(od); + od = DEQ_HEAD(*out_data); } } - return count; } @@ -250,86 +236,51 @@ void qdr_http1_error_response(qdr_http1_request_base_t *hreq, // -// Write pending data out the raw connection. Preserve order by only writing -// the head request data. +// Write list of data out the raw connection, freeing entries when data is exhausted // -uint64_t qdr_http1_write_out_data(qdr_http1_connection_t *hconn, qdr_http1_out_data_fifo_t *fifo) +uint64_t qdr_http1_write_out_data(qdr_http1_connection_t *hconn, qdr_http1_out_data_list_t *fifo) { - pn_raw_buffer_t buffers[RAW_BUFFER_BATCH]; size_t count = !hconn->raw_conn || pn_raw_connection_is_write_closed(hconn->raw_conn) ? 0 : pn_raw_connection_write_buffers_capacity(hconn->raw_conn); - uint64_t total_octets = 0; - qdr_http1_out_data_t *od = fifo->write_ptr; - while (count > 0 && od) { - qd_buffer_t *wbuf = 0; - int od_len = MIN(count, - (od->buffer_count - od->next_buffer)); - assert(od_len); // error: no data @ head? - - // send the out_data as a series of writes to proactor - - while (od_len) { - size_t limit = MIN(RAW_BUFFER_BATCH, od_len); - int written = 0; - - if (od->stream_data) { // buffers stored in qd_message_t - - written = qd_message_stream_data_buffers(od->stream_data, buffers, od->next_buffer, limit); - for (int i = 0; i < written; ++i) { - // enforce this: we expect the context can be used by the adaptor! - assert(buffers[i].context == 0); - buffers[i].context = (uintptr_t)od; - total_octets += buffers[i].size; - } - - } else { // list of buffers in od->raw_buffers - // advance to next buffer to send in od - if (!wbuf) { - wbuf = DEQ_HEAD(od->raw_buffers); - for (int i = 0; i < od->next_buffer; ++i) - wbuf = DEQ_NEXT(wbuf); - } - - pn_raw_buffer_t *rdisc = &buffers[0]; - while (limit--) { - rdisc->context = (uintptr_t)od; - rdisc->bytes = (char*) qd_buffer_base(wbuf); - rdisc->capacity = 0; - rdisc->size = qd_buffer_size(wbuf); - rdisc->offset = 0; - - total_octets += rdisc->size; - ++rdisc; - wbuf = DEQ_NEXT(wbuf); - written += 1; - } - } - - // keep me, you'll need it - if (HTTP1_DUMP_BUFFERS) { - for (size_t j = 0; j < written; ++j) { - char *ptr = (char*) buffers[j].bytes; - int len = (int) buffers[j].size; - fprintf(stdout, "\n[C%"PRIu64"] Raw Write: Ptr=%p len=%d\n value='%.*s'\n", - hconn->conn_id, (void*)ptr, len, len, ptr); - fflush(stdout); - } - } - - written = pn_raw_connection_write_buffers(hconn->raw_conn, buffers, written); - count -= written; - od_len -= written; - od->next_buffer += written; + if (hconn->write_buf_busy || count == 0) + return 0; + + const size_t max_octets = HTTP1_IO_BUF_SIZE; + size_t total_octets = 0; + while (!DEQ_IS_EMPTY(*fifo) && total_octets < max_octets) { + qdr_http1_out_data_t *od = DEQ_HEAD(*fifo); + uint32_t data_octets = qd_iterator_remaining(od->data_iter); + + size_t len = MIN(data_octets, max_octets - total_octets); + int copied = qd_iterator_ncopy(od->data_iter, &hconn->write_buffer[total_octets], len); + assert(copied == len); + data_octets -= copied; + total_octets += copied; + + qd_iterator_trim_view(od->data_iter, data_octets); + if (qd_iterator_remaining(od->data_iter) == 0) { + DEQ_REMOVE_HEAD(*fifo); + _free_qdr_http1_out_data(od); } + } - if (od->next_buffer == od->buffer_count) { - // all buffers in od have been passed to proton. - od = DEQ_NEXT(od); - fifo->write_ptr = od; - wbuf = 0; + if (total_octets) { + pn_raw_buffer_t pn_buff = {0}; + pn_buff.bytes = (char*) hconn->write_buffer; + pn_buff.size = total_octets; + + // keep me, you'll need it + if (HTTP1_DUMP_BUFFERS) { + fprintf(stdout, "\n[C%"PRIu64"] Raw Write: Ptr=%p len=%"PRIu32"\n value='%.*s'\n", + hconn->conn_id, (void*)pn_buff.bytes, pn_buff.size, + (int) pn_buff.size, pn_buff.bytes); + fflush(stdout); } + + pn_raw_connection_write_buffers(hconn->raw_conn, &pn_buff, 1); + hconn->write_buf_busy = true; } hconn->out_http1_octets += total_octets; @@ -340,20 +291,15 @@ uint64_t qdr_http1_write_out_data(qdr_http1_connection_t *hconn, qdr_http1_out_d // The HTTP encoder has a list of buffers to be written to the raw connection. // Queue it to the outgoing data fifo. // -void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_fifo_t *fifo, qd_buffer_list_t *blist) +void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_list_t *fifo, qd_buffer_list_t *blist, uintmax_t octets) { - int count = (int) DEQ_SIZE(*blist); - if (count) { + if (octets) { qdr_http1_out_data_t *od = new_qdr_http1_out_data_t(); ZERO(od); - od->owning_fifo = fifo; - od->buffer_count = (int) DEQ_SIZE(*blist); od->raw_buffers = *blist; + od->data_iter = qd_iterator_buffer(DEQ_HEAD(od->raw_buffers), 0, (int)octets, ITER_VIEW_ALL); DEQ_INIT(*blist); - - DEQ_INSERT_TAIL(fifo->fifo, od); - if (!fifo->write_ptr) - fifo->write_ptr = od; + DEQ_INSERT_TAIL(*fifo, od); } } @@ -361,19 +307,14 @@ void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_fifo_t *fifo, qd_buffer_li // The HTTP encoder has a message body data to be written to the raw connection. // Queue it to the outgoing data fifo. // -void qdr_http1_enqueue_stream_data(qdr_http1_out_data_fifo_t *fifo, qd_message_stream_data_t *stream_data) +void qdr_http1_enqueue_stream_data(qdr_http1_out_data_list_t *fifo, qd_message_stream_data_t *stream_data) { - int count = qd_message_stream_data_buffer_count(stream_data); - if (count) { + if (qd_message_stream_data_payload_length(stream_data)) { qdr_http1_out_data_t *od = new_qdr_http1_out_data_t(); ZERO(od); - od->owning_fifo = fifo; od->stream_data = stream_data; - od->buffer_count = count; - - DEQ_INSERT_TAIL(fifo->fifo, od); - if (!fifo->write_ptr) - fifo->write_ptr = od; + od->data_iter = qd_message_stream_data_iterator(stream_data); + DEQ_INSERT_TAIL(*fifo, od); } else { // empty body-data qd_message_stream_data_release(stream_data); @@ -385,39 +326,61 @@ void qdr_http1_enqueue_stream_data(qdr_http1_out_data_fifo_t *fifo, qd_message_s // void qdr_http1_free_written_buffers(qdr_http1_connection_t *hconn) { - pn_raw_buffer_t buffers[RAW_BUFFER_BATCH]; - size_t count; - while ((count = pn_raw_connection_take_written_buffers(hconn->raw_conn, buffers, RAW_BUFFER_BATCH)) != 0) { - for (size_t i = 0; i < count; ++i) { - - // keep me, you'll need it - if (HTTP1_DUMP_BUFFERS) { - char *ptr = (char*) buffers[i].bytes; - int len = (int) buffers[i].size; - fprintf(stdout, "\n[C%"PRIu64"] Raw Written: Ptr=%p len=%d c=%d o=%d\n value='%.*s'\n", - hconn->conn_id, (void*)ptr, len, buffers[i].capacity, buffers[i].offset, len, ptr); - fflush(stdout); - } + pn_raw_buffer_t pn_buff = {0}; + + if (pn_raw_connection_take_written_buffers(hconn->raw_conn, &pn_buff, 1) != 0) { + assert(hconn->write_buf_busy); // expect write buffer in use + + // keep me, you'll need it + if (HTTP1_DUMP_BUFFERS) { + char *ptr = (char*) pn_buff.bytes; + int len = (int) pn_buff.size; + fprintf(stdout, "\n[C%"PRIu64"] Raw Written: Ptr=%p len=%d c=%d o=%d\n value='%.*s'\n", + hconn->conn_id, (void*)ptr, len, pn_buff.capacity, pn_buff.offset, len, ptr); + fflush(stdout); + } - qdr_http1_out_data_t *od = (qdr_http1_out_data_t*) buffers[i].context; - assert(od); - // Note: according to proton devs the order in which write buffers - // are released are NOT guaranteed to be in the same order in which - // they were written! - - od->free_count += 1; - if (od->free_count == od->buffer_count) { - // all buffers returned - qdr_http1_out_data_fifo_t *fifo = od->owning_fifo; - DEQ_REMOVE(fifo->fifo, od); - if (od->stream_data) - qd_message_stream_data_release(od->stream_data); - else - qd_buffer_list_free_buffers(&od->raw_buffers); - free_qdr_http1_out_data_t(od); - } + hconn->write_buf_busy = false; + } +} + + +// +// Raw Connection Read Buffer Management +// + +int qdr_http1_grant_read_buffers(qdr_http1_connection_t *hconn) +{ + if (!hconn->read_buf_busy && hconn->raw_conn + && pn_raw_connection_read_buffers_capacity(hconn->raw_conn) > 0) { + + pn_raw_buffer_t pn_buf = {0}; + pn_buf.bytes = (char*) hconn->read_buffer; + pn_buf.capacity = HTTP1_IO_BUF_SIZE; + pn_raw_connection_give_read_buffers(hconn->raw_conn, &pn_buf, 1); + hconn->read_buf_busy = true; + return 1; + } + return 0; +} + +// take incoming data from raw connection +uintmax_t qdr_http1_get_read_buffers(qdr_http1_connection_t *hconn, + qd_buffer_list_t *blist) +{ + pn_raw_buffer_t pn_buff; + DEQ_INIT(*blist); + uintmax_t octets = 0; + + if (hconn->raw_conn && pn_raw_connection_take_read_buffers(hconn->raw_conn, + &pn_buff, 1)) { + if (pn_buff.size) { + octets = pn_buff.size; + qd_buffer_list_append(blist, (uint8_t*) pn_buff.bytes, pn_buff.size); } + hconn->read_buf_busy = false; } + return octets; } diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c index 204f7a7..72a31da 100644 --- a/src/adaptors/http1/http1_client.c +++ b/src/adaptors/http1/http1_client.c @@ -50,7 +50,7 @@ typedef struct _client_response_msg_t { bool encoded; // true when full response encoded // HTTP encoded message data - qdr_http1_out_data_fifo_t out_data; + qdr_http1_out_data_list_t out_data; } _client_response_msg_t; ALLOC_DECLARE(_client_response_msg_t); @@ -383,13 +383,13 @@ static int _handle_conn_read_event(qdr_http1_connection_t *hconn) { int error = 0; qd_buffer_list_t blist; - uintmax_t length; - qda_raw_conn_get_read_buffers(hconn->raw_conn, &blist, &length); + uintmax_t length = qdr_http1_get_read_buffers(hconn, &blist); + if (length) { + qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Read %"PRIuMAX" bytes from client (%zu buffers)", hconn->conn_id, hconn->in_link_id, length, DEQ_SIZE(blist)); - hconn->in_http1_octets += length; if (HTTP1_DUMP_BUFFERS) { fprintf(stdout, "\nClient raw buffer READ %"PRIuMAX" total octets\n", length); @@ -401,6 +401,7 @@ static int _handle_conn_read_event(qdr_http1_connection_t *hconn) fflush(stdout); } + hconn->in_http1_octets += length; error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length); } return error; @@ -412,7 +413,7 @@ static void _handle_conn_need_read_buffers(qdr_http1_connection_t *hconn) { // @TODO(kgiusti): backpressure if no credit if (hconn->client.reply_to_addr || hconn->cfg.event_channel /* && hconn->in_link_credit > 0 */) { - int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn); + int granted = qdr_http1_grant_read_buffers(hconn); qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted", hconn->conn_id, granted); } @@ -481,11 +482,26 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi _write_pending_response((_client_request_t*) DEQ_HEAD(hconn->requests)); break; } + case PN_RAW_CONNECTION_WRITTEN: { + qdr_http1_free_written_buffers(hconn); + break; + } case PN_RAW_CONNECTION_NEED_READ_BUFFERS: { qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need read buffers", hconn->conn_id); _handle_conn_need_read_buffers(hconn); break; } + case PN_RAW_CONNECTION_READ: { + if (!hconn->q2_blocked) { + int error = _handle_conn_read_event(hconn); + if (error) + qdr_http1_close_connection(hconn, "Incoming response message failed to parse"); + else + // room for more incoming data + _handle_conn_need_read_buffers(hconn); + } + break; + } case PN_RAW_CONNECTION_WAKE: { int error = 0; qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Wake-up", hconn->conn_id); @@ -495,7 +511,9 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] client link unblocked from Q2 limit", hconn->conn_id); hconn->q2_blocked = false; error = _handle_conn_read_event(hconn); // restart receiving - _handle_conn_need_read_buffers(hconn); + if (!error) + // room for more incoming data + _handle_conn_need_read_buffers(hconn); } while (qdr_connection_process(hconn->qdr_conn)) {} @@ -506,18 +524,6 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Processing done", hconn->conn_id); break; } - case PN_RAW_CONNECTION_READ: { - if (!hconn->q2_blocked) { - int error = _handle_conn_read_event(hconn); - if (error) - qdr_http1_close_connection(hconn, "Incoming response message failed to parse"); - } - break; - } - case PN_RAW_CONNECTION_WRITTEN: { - qdr_http1_free_written_buffers(hconn); - break; - } default: break; } @@ -556,7 +562,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi hconn->out_link_id, hreq->error_code, hreq->error_text); _client_response_msg_t *rmsg = new__client_response_msg_t(); ZERO(rmsg); - DEQ_INIT(rmsg->out_data.fifo); + DEQ_INIT(rmsg->out_data); DEQ_INSERT_TAIL(hreq->responses, rmsg); qdr_http1_error_response(&hreq->base, hreq->error_code, hreq->error_text); _write_pending_response(hreq); @@ -566,7 +572,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi _client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses); while (rmsg && rmsg->dispo && - DEQ_IS_EMPTY(rmsg->out_data.fifo) && + DEQ_IS_EMPTY(rmsg->out_data) && hconn->cfg.aggregation == QD_AGGREGATION_NONE) { // response message fully received and forwarded to client if (rmsg->dlv) { @@ -660,7 +666,7 @@ static void _client_tx_buffers_cb(h1_codec_request_state_t *hrs, qd_buffer_list_ rmsg = DEQ_HEAD(hreq->responses); } assert(rmsg); - qdr_http1_enqueue_buffer_list(&rmsg->out_data, blist); + qdr_http1_enqueue_buffer_list(&rmsg->out_data, blist, len); // if this happens to be the current outgoing response try writing to the // raw connection @@ -1033,12 +1039,7 @@ void qdr_http1_client_core_link_flow(qdr_http1_adaptor_t *adaptor, hconn->in_link_credit += credit; if (hconn->in_link_credit > 0) { - if (hconn->raw_conn) { - int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn); - qd_log(adaptor->log, QD_LOG_DEBUG, - "[C%"PRIu64"] %d read buffers granted", - hconn->conn_id, granted); - } + _handle_conn_need_read_buffers(hconn); // is the current request message blocked by lack of credit? @@ -1327,7 +1328,7 @@ void qdr_http1_client_core_delivery_update(qdr_http1_adaptor_t *adaptor, // if nothing has been sent back so far _client_response_msg_t *rmsg = new__client_response_msg_t(); ZERO(rmsg); - DEQ_INIT(rmsg->out_data.fifo); + DEQ_INIT(rmsg->out_data); DEQ_INSERT_TAIL(hreq->responses, rmsg); if (disp == PN_REJECTED) { @@ -1600,7 +1601,7 @@ uint64_t qdr_http1_client_core_link_deliver(qdr_http1_adaptor_t *adaptor, _client_response_msg_t *rmsg = new__client_response_msg_t(); ZERO(rmsg); rmsg->dlv = delivery; - DEQ_INIT(rmsg->out_data.fifo); + DEQ_INIT(rmsg->out_data); qdr_delivery_set_context(delivery, hreq); qdr_delivery_incref(delivery, "HTTP1 client referencing response delivery"); DEQ_INSERT_TAIL(hreq->responses, rmsg); @@ -1678,7 +1679,7 @@ uint64_t qdr_http1_client_core_link_deliver(qdr_http1_adaptor_t *adaptor, static void _client_response_msg_free(_client_request_t *req, _client_response_msg_t *rmsg) { DEQ_REMOVE(req->responses, rmsg); - qdr_http1_out_data_fifo_cleanup(&rmsg->out_data); + qdr_http1_out_data_cleanup(&rmsg->out_data); if (rmsg->dlv) { qdr_delivery_set_context(rmsg->dlv, 0); @@ -1697,7 +1698,7 @@ static void _write_pending_response(_client_request_t *hreq) if (hreq && !hreq->cancelled) { assert(DEQ_PREV(&hreq->base) == 0); // must preserve order _client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses); - if (rmsg && rmsg->out_data.write_ptr) { + if (rmsg && DEQ_HEAD(rmsg->out_data)) { uint64_t written = qdr_http1_write_out_data(hreq->base.hconn, &rmsg->out_data); hreq->base.out_http1_octets += written; qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] %"PRIu64" octets written", diff --git a/src/adaptors/http1/http1_private.h b/src/adaptors/http1/http1_private.h index f654fc5..19ffa0c 100644 --- a/src/adaptors/http1/http1_private.h +++ b/src/adaptors/http1/http1_private.h @@ -38,8 +38,9 @@ // for debug: will dump I/O buffer content to stdout if true #define HTTP1_DUMP_BUFFERS false +#define HTTP1_IO_BUF_SIZE 16384 + typedef struct qdr_http1_out_data_t qdr_http1_out_data_t; -typedef struct qdr_http1_out_data_fifo_t qdr_http1_out_data_fifo_t; typedef struct qdr_http1_request_base_t qdr_http1_request_base_t; typedef struct qdr_http1_connection_t qdr_http1_connection_t; @@ -63,7 +64,7 @@ extern qdr_http1_adaptor_t *qdr_http1_adaptor; // This adaptor has to cope with two different data sources: the HTTP1 encoder // and the qd_message_stream_data_t list. The HTTP1 encoder produces a simple // qd_buffer_list_t for outgoing header data whose ownership is given to the -// adaptor: the adaptor is free to deque/free these buffers as needed. The +// adaptor: the adaptor is free to dequeue/free these buffers as needed. The // qd_message_stream_data_t buffers are shared with the owning message and the // buffer list must not be modified by the adaptor. The qdr_http1_out_data_t // is used to manage both types of data sources. @@ -71,36 +72,19 @@ extern qdr_http1_adaptor_t *qdr_http1_adaptor; struct qdr_http1_out_data_t { DEQ_LINKS(qdr_http1_out_data_t); - qdr_http1_out_data_fifo_t *owning_fifo; - // data is either in a raw buffer chain // or a message body data (not both!) qd_buffer_list_t raw_buffers; qd_message_stream_data_t *stream_data; - int buffer_count; // # total buffers - int next_buffer; // offset to next buffer to send - int free_count; // # buffers returned from proton + // points to the data contained in the stream_data/raw_buffers + qd_iterator_t *data_iter; }; ALLOC_DECLARE(qdr_http1_out_data_t); DEQ_DECLARE(qdr_http1_out_data_t, qdr_http1_out_data_list_t); -// -// A fifo of outgoing (raw connection) data, oldest at HEAD. -// -// write_ptr tracks the point in the fifo where the current out_data node that -// is being written to the raw connection. As the raw connection returns -// written buffers (PN_RAW_CONNECTION_WRITTEN) the are removed from the HEAD -// and freed. -// -struct qdr_http1_out_data_fifo_t { - qdr_http1_out_data_list_t fifo; - qdr_http1_out_data_t *write_ptr; -}; - - // Per HTTP request/response(s) state. // // This base class is extended for client and server-specific state, see @@ -188,6 +172,14 @@ struct qdr_http1_connection_t { // flags // bool trace; + + // + // + bool read_buf_busy; + bool write_buf_busy; + + uint8_t read_buffer[HTTP1_IO_BUF_SIZE]; + uint8_t write_buffer[HTTP1_IO_BUF_SIZE]; }; ALLOC_DECLARE(qdr_http1_connection_t); @@ -207,12 +199,13 @@ ALLOC_DECLARE(qdr_http1_connection_t); //void qdr_http1_write_buffer_list(qdr_http1_request_t *hreq, qd_buffer_list_t *blist); void qdr_http1_free_written_buffers(qdr_http1_connection_t *hconn); -void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_fifo_t *fifo, qd_buffer_list_t *blist); -void qdr_http1_enqueue_stream_data(qdr_http1_out_data_fifo_t *fifo, qd_message_stream_data_t *stream_data); -uint64_t qdr_http1_write_out_data(qdr_http1_connection_t *hconn, qdr_http1_out_data_fifo_t *fifo); -void qdr_http1_out_data_fifo_cleanup(qdr_http1_out_data_fifo_t *out_data); -// return the number of buffers currently held by the proactor for writing -int qdr_http1_out_data_buffers_outstanding(const qdr_http1_out_data_fifo_t *out_data); +void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_list_t *fifo, qd_buffer_list_t *blist, uintmax_t octets); +void qdr_http1_enqueue_stream_data(qdr_http1_out_data_list_t *fifo, qd_message_stream_data_t *stream_data); +uint64_t qdr_http1_write_out_data(qdr_http1_connection_t *hconn, qdr_http1_out_data_list_t *fifo); +void qdr_http1_out_data_cleanup(qdr_http1_out_data_list_t *out_data); +int qdr_http1_grant_read_buffers(qdr_http1_connection_t *hconn); +uintmax_t qdr_http1_get_read_buffers(qdr_http1_connection_t *hconn, + qd_buffer_list_t *blist); void qdr_http1_close_connection(qdr_http1_connection_t *hconn, const char *error); void qdr_http1_connection_free(qdr_http1_connection_t *hconn); diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c index bcdb3e5..bb6d252 100644 --- a/src/adaptors/http1/http1_server.c +++ b/src/adaptors/http1/http1_server.c @@ -74,7 +74,8 @@ typedef struct _server_request_t { bool request_discard; // drop incoming request data bool headers_encoded; // True when header encode done - qdr_http1_out_data_fifo_t out_data; // encoded request written to raw conn + // fifo of encoded request data to be written out the raw connection: + qdr_http1_out_data_list_t out_data; _server_response_msg_list_t responses; // response(s) to this request @@ -476,24 +477,25 @@ static int _handle_conn_read_event(qdr_http1_connection_t *hconn) { int error = 0; qd_buffer_list_t blist; - uintmax_t length; - qda_raw_conn_get_read_buffers(hconn->raw_conn, &blist, &length); - - if (HTTP1_DUMP_BUFFERS) { - fprintf(stdout, "\nServer raw buffer READ %"PRIuMAX" total octets\n", length); - qd_buffer_t *bb = DEQ_HEAD(blist); - while (bb) { - fprintf(stdout, " buffer='%.*s'\n", (int)qd_buffer_size(bb), (char*)&bb[1]); - bb = DEQ_NEXT(bb); - } - fflush(stdout); - } + uintmax_t length = qdr_http1_get_read_buffers(hconn, &blist); if (length) { + qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"][L%"PRIu64"] Read %"PRIuMAX" bytes from server (%zu buffers)", hconn->conn_id, hconn->in_link_id, length, DEQ_SIZE(blist)); + + if (HTTP1_DUMP_BUFFERS) { + fprintf(stdout, "\nServer raw buffer READ %"PRIuMAX" total octets\n", length); + qd_buffer_t *bb = DEQ_HEAD(blist); + while (bb) { + fprintf(stdout, " buffer='%.*s'\n", (int)qd_buffer_size(bb), (char*)&bb[1]); + bb = DEQ_NEXT(bb); + } + fflush(stdout); + } + hconn->in_http1_octets += length; error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length); } @@ -506,7 +508,7 @@ static void _handle_conn_need_read_buffers(qdr_http1_connection_t *hconn) { // @TODO(kgiusti): backpressure if no credit // if (hconn->in_link_credit > 0 */) - int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn); + int granted = qdr_http1_grant_read_buffers(hconn); qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted", hconn->conn_id, granted); } @@ -549,7 +551,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests); if (_is_request_in_progress(hreq)) { hreq->request_discard = true; - qdr_http1_out_data_fifo_cleanup(&hreq->out_data); + qdr_http1_out_data_cleanup(&hreq->out_data); } pn_raw_connection_close(hconn->raw_conn); break; @@ -608,8 +610,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi break; } case PN_RAW_CONNECTION_NEED_READ_BUFFERS: { - if (!hconn->q2_blocked) - _handle_conn_need_read_buffers(hconn); + _handle_conn_need_read_buffers(hconn); break; } case PN_RAW_CONNECTION_WAKE: { @@ -621,7 +622,9 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] server link unblocked from Q2 limit", hconn->conn_id); hconn->q2_blocked = false; error = _handle_conn_read_event(hconn); // restart receiving - _handle_conn_need_read_buffers(hconn); + if (!error) + // room for more incoming data + _handle_conn_need_read_buffers(hconn); } while (qdr_connection_process(hconn->qdr_conn)) {} @@ -680,11 +683,6 @@ static bool _process_request(_server_request_t *hreq) if (hreq->cancelled) { - // have to wait until all buffers returned from proton - // before we can release the request - if (qdr_http1_out_data_buffers_outstanding(&hreq->out_data)) - return false; - // clean up the request message delivery if (hreq->request_dlv) { @@ -772,7 +770,7 @@ static bool _process_request(_server_request_t *hreq) } } - if (hreq->request_acked && hreq->request_settled && DEQ_SIZE(hreq->out_data.fifo) == 0) { + if (hreq->request_acked && hreq->request_settled && DEQ_SIZE(hreq->out_data) == 0) { qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP request msg-id=%"PRIu64" completed!", hconn->conn_id, hreq->base.msg_id); _server_request_free(hreq); @@ -804,7 +802,7 @@ static void _server_tx_buffers_cb(h1_codec_request_state_t *hrs, qd_buffer_list_ qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"][L%"PRIu64"] Sending %u octets to server", hconn->conn_id, hconn->out_link_id, len); - qdr_http1_enqueue_buffer_list(&hreq->out_data, blist); + qdr_http1_enqueue_buffer_list(&hreq->out_data, blist, len); } } @@ -1150,8 +1148,7 @@ void qdr_http1_server_core_link_flow(qdr_http1_adaptor_t *adaptor, if (hconn->in_link_credit > 0) { - if (hconn->raw_conn && !hconn->q2_blocked) - qda_raw_conn_grant_read_buffers(hconn->raw_conn); + _handle_conn_need_read_buffers(hconn); // check for pending responses that are blocked for credit @@ -1293,7 +1290,7 @@ static _server_request_t *_create_request_context(qdr_http1_connection_t *hconn, hreq->base.response_addr = reply_to; hreq->base.site = group_id; hreq->base.start = qd_timer_now(); - DEQ_INIT(hreq->out_data.fifo); + DEQ_INIT(hreq->out_data); DEQ_INIT(hreq->responses); DEQ_INSERT_TAIL(hconn->requests, &hreq->base); @@ -1644,7 +1641,7 @@ static void _server_request_free(_server_request_t *hreq) { if (hreq) { qdr_http1_request_base_cleanup(&hreq->base); - qdr_http1_out_data_fifo_cleanup(&hreq->out_data); + qdr_http1_out_data_cleanup(&hreq->out_data); if (hreq->request_dlv) { qdr_delivery_set_context(hreq->request_dlv, 0); qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 server releasing request delivery"); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org