This is an automated email from the ASF dual-hosted git repository. gsim pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 7c5d17fb2af2575840c2200f7751f13a28bdce87 Author: Gordon Sim <g...@redhat.com> AuthorDate: Fri Apr 30 15:31:23 2021 +0100 DISPATCH-2142: a change to tcp adaptor buffering scheme --- include/qpid/dispatch/message.h | 10 ++ src/adaptors/tcp_adaptor.c | 272 +++++++++++++++++++--------------------- src/message.c | 14 +++ 3 files changed, 156 insertions(+), 140 deletions(-) diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index 5a66db3..3a6968c 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -383,6 +383,16 @@ size_t qd_message_stream_data_payload_length(const qd_message_stream_data_t *str void qd_message_stream_data_release(qd_message_stream_data_t *stream_data); +/** + * qd_message_stream_data_release_up_to + * + * Release this stream data and all the previous ones also. + * + * @param stream_data Pointer to a body data object returned by qd_message_next_stream_data + */ +void qd_message_stream_data_release_up_to(qd_message_stream_data_t *stream_data); + + typedef enum { QD_MESSAGE_STREAM_DATA_BODY_OK, // A valid body data object has been returned QD_MESSAGE_STREAM_DATA_FOOTER_OK, // A valid footer has been returned diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 1175e5f..1f8a87c 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -46,12 +46,12 @@ // arrives: // const uint32_t TCP_MAX_CAPACITY = 121635 * 6 * 2; +const size_t TCP_BUFFER_SIZE = 16384*2; ALLOC_DEFINE(qd_tcp_listener_t); ALLOC_DEFINE(qd_tcp_connector_t); -#define READ_BUFFERS 4 -#define WRITE_BUFFERS 4 +#define WRITE_BUFFERS 12 typedef struct qdr_tcp_connection_t qdr_tcp_connection_t; @@ -91,10 +91,16 @@ struct qdr_tcp_connection_t { uint64_t last_in_time; uint64_t last_out_time; - qd_message_stream_data_t *outgoing_stream_data; // current segment + qd_message_stream_data_t *previous_stream_data; // previous segment (received in full) + qd_message_stream_data_t *outgoing_stream_data; // current segment size_t outgoing_body_bytes; // bytes received from current segment int outgoing_body_offset; // buffer offset into current segment + pn_raw_buffer_t read_buffer; + bool read_pending; + pn_raw_buffer_t write_buffer; + bool write_pending; + pn_raw_buffer_t outgoing_buffs[WRITE_BUFFERS]; int outgoing_buff_count; // number of buffers with data int outgoing_buff_idx; // first buffer with data @@ -127,6 +133,24 @@ static void handle_disconnected(qdr_tcp_connection_t* conn); static void free_qdr_tcp_connection(qdr_tcp_connection_t* conn); static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc); +static void allocate_tcp_buffer(pn_raw_buffer_t *buffer) +{ + buffer->bytes = malloc(TCP_BUFFER_SIZE); + ZERO(buffer->bytes); + buffer->capacity = TCP_BUFFER_SIZE; + buffer->size = 0; + buffer->offset = 0; +} + +static void allocate_tcp_write_buffer(pn_raw_buffer_t *buffer) +{ + buffer->bytes = malloc(TCP_BUFFER_SIZE); + ZERO(buffer->bytes); + buffer->capacity = TCP_BUFFER_SIZE; + buffer->size = 0; + buffer->offset = 0; +} + static inline uint64_t qdr_tcp_conn_linkid(const qdr_tcp_connection_t *conn) { assert(conn); @@ -163,28 +187,14 @@ static void on_activate(void *context) static void grant_read_buffers(qdr_tcp_connection_t *conn) { - if (conn->raw_closed_read) + if (conn->raw_closed_read || conn->read_pending) return; - pn_raw_buffer_t raw_buffers[READ_BUFFERS]; - // Give proactor more read buffers for the socket - size_t desired = pn_raw_connection_read_buffers_capacity(conn->pn_raw_conn); + conn->read_pending = true; qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, - "[C%"PRIu64"][L%"PRIu64"] Granting %zu to pn_raw_connection_give_read_buffers()", - conn->conn_id, conn->incoming_id, desired); - while (desired) { - size_t i; - for (i = 0; i < desired && i < READ_BUFFERS; ++i) { - qd_buffer_t *buf = qd_buffer(); - raw_buffers[i].bytes = (char*) qd_buffer_base(buf); - raw_buffers[i].capacity = qd_buffer_capacity(buf); - raw_buffers[i].size = 0; - raw_buffers[i].offset = 0; - raw_buffers[i].context = (uintptr_t) buf; - } - desired -= i; - pn_raw_connection_give_read_buffers(conn->pn_raw_conn, raw_buffers, i); - } + "[C%"PRIu64"][L%"PRIu64"] Calling pn_raw_connection_give_read_buffers() capacity=%i offset=%i", + conn->conn_id, conn->incoming_id, conn->read_buffer.capacity, conn->read_buffer.offset); + pn_raw_connection_give_read_buffers(conn->pn_raw_conn, &conn->read_buffer, 1); } @@ -216,59 +226,39 @@ void qdr_tcp_q2_unblocked_handler(const qd_alloc_safe_ptr_t context) sys_mutex_unlock(tc->activation_lock); } - // Extract buffers and their bytes from raw connection. -// * Proton decides how many buffers are to be taken. -// * Buffers with no data are freed. -// * Buffers with data are appended to caller's buffers list. // * Add received byte count to connection stats // * Return the count of bytes in the buffers list static int handle_incoming_raw_read(qdr_tcp_connection_t *conn, qd_buffer_list_t *buffers) { - pn_raw_buffer_t raw_buffers[READ_BUFFERS]; - - size_t n; - int count = 0; - int free_count = 0; - const bool was_open = conn->bytes_unacked < TCP_MAX_CAPACITY; - - while ((conn->raw_closed_write || count + conn->bytes_unacked < TCP_MAX_CAPACITY) - && (n = pn_raw_connection_take_read_buffers(conn->pn_raw_conn, raw_buffers, READ_BUFFERS)) ) { - - for (size_t i = 0; i < n && raw_buffers[i].bytes; ++i) { - qd_buffer_t *buf = (qd_buffer_t*) raw_buffers[i].context; - qd_buffer_insert(buf, raw_buffers[i].size); - count += raw_buffers[i].size; - - assert(raw_buffers[i].size == qd_buffer_size(buf)); - if (raw_buffers[i].size > 0) { - DEQ_INSERT_TAIL(*buffers, buf); - } else { - qd_buffer_free(buf); - free_count++; - } - } + pn_raw_buffer_t raw_buffer; + if ( conn->bytes_unacked >= TCP_MAX_CAPACITY || !pn_raw_connection_take_read_buffers(conn->pn_raw_conn, &raw_buffer, 1)) { + return 0; } + int result = raw_buffer.size; + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"] pn_raw_connection_take_read_buffers() took buffer with %zu bytes", + conn->conn_id, result); - if (count > 0) { + if (buffers) { + qd_buffer_list_append(buffers, (uint8_t*) (raw_buffer.bytes + raw_buffer.offset), raw_buffer.size); + } + //reset buffer for further reads + conn->read_buffer.size = 0; + conn->read_buffer.offset = 0; + conn->read_pending = false; + if (result > 0) { // account for any incoming bytes just read conn->last_in_time = tcp_adaptor->core->uptime_ticks; - conn->bytes_in += count; - conn->bytes_unacked += count; + conn->bytes_in += result; + conn->bytes_unacked += result; if (conn->bytes_unacked >= TCP_MAX_CAPACITY) { - if (was_open) { - qd_log(tcp_adaptor->log_source, QD_LOG_TRACE, - "[C%"PRIu64"] TCP RX window CLOSED: bytes in=%"PRIu64" unacked=%"PRIu64, - conn->conn_id, conn->bytes_in, conn->bytes_unacked); - } + qd_log(tcp_adaptor->log_source, QD_LOG_TRACE, + "[C%"PRIu64"] TCP RX window CLOSED: bytes in=%"PRIu64" unacked=%"PRIu64, + conn->conn_id, conn->bytes_in, conn->bytes_unacked); } } - - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, - "[C%"PRIu64"] pn_raw_connection_take_read_buffers() took %zu, freed %i", - conn->conn_id, DEQ_SIZE(*buffers), free_count); - - return count; + return result; } @@ -292,10 +282,7 @@ static int handle_incoming(qdr_tcp_connection_t *conn, const char *msg) "[C%"PRIu64"][L%"PRIu64"] handle_incoming %s for %s connection. drain read buffers", conn->conn_id, conn->incoming_id, msg, qdr_tcp_connection_role_name(conn)); - qd_buffer_list_t buffers; - DEQ_INIT(buffers); - handle_incoming_raw_read(conn, &buffers); - qd_buffer_list_free_buffers(&buffers); + handle_incoming_raw_read(conn, 0); return 0; } @@ -422,24 +409,13 @@ static int handle_incoming(qdr_tcp_connection_t *conn, const char *msg) static void flush_outgoing_buffs(qdr_tcp_connection_t *conn) { - // Flush buffers staged for writing to raw conn - // and release any references to stream data objects. - if (conn->outgoing_buff_count > 0) { - for (size_t i = conn->outgoing_buff_idx; - i < conn->outgoing_buff_idx + conn->outgoing_buff_count; - ++i) { - if (conn->outgoing_buffs[i].context) { - qd_message_stream_data_release( - (qd_message_stream_data_t*)conn->outgoing_buffs[i].context); - } - } - } - conn->outgoing_buff_count = 0; - - // Flush in-progress stream data object + // Free any remaining stream data objects if (conn->outgoing_stream_data) { - qd_message_stream_data_release(conn->outgoing_stream_data); + qd_message_stream_data_release_up_to(conn->outgoing_stream_data); conn->outgoing_stream_data = 0; + } else if (conn->previous_stream_data) { + qd_message_stream_data_release_up_to(conn->previous_stream_data); + conn->previous_stream_data = 0; } } @@ -500,6 +476,8 @@ static void handle_disconnected(qdr_tcp_connection_t* conn) qdr_connection_closed(conn->qdr_conn); conn->qdr_conn = 0; } + free(conn->write_buffer.bytes); + free(conn->read_buffer.bytes); //need to free on core thread to avoid deleting while in use by management agent qdr_action_t *action = qdr_action(qdr_del_tcp_connection_CT, "delete_tcp_connection"); @@ -554,13 +532,9 @@ static int read_message_body(qdr_tcp_connection_t *conn, qd_message_t *msg, pn_r assert(conn->outgoing_body_bytes <= conn->outgoing_stream_data->payload.length); if (conn->outgoing_body_bytes == conn->outgoing_stream_data->payload.length) { - // This buffer set consumes the remainder of the stream_data segment. - // Attach the stream_data struct to the last buffer so that the struct - // can be freed after the buffer has been transmitted by raw connection out. - buffers[used-1].context = (uintptr_t) conn->outgoing_stream_data; - // Erase the stream_data struct from the connection so that // a new one gets created on the next pass. + conn->previous_stream_data = conn->outgoing_stream_data; conn->outgoing_stream_data = 0; } else { // Returned buffer set did not consume the entire stream_data segment. @@ -576,35 +550,36 @@ static int read_message_body(qdr_tcp_connection_t *conn, qd_message_t *msg, pn_r } -static bool write_outgoing_buffs(qdr_tcp_connection_t *conn) +static bool copy_outgoing_buffs(qdr_tcp_connection_t *conn) { // Send the outgoing buffs to pn_raw_conn. // Return true if all the buffers went out. bool result; if (conn->outgoing_buff_count == 0) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] No outgoing buffers to copy at present", conn->conn_id); result = true; + } else if (conn->write_pending) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Can't write, previous write still pending", conn->conn_id); + result = false; } else { - size_t used = pn_raw_connection_write_buffers(conn->pn_raw_conn, - &conn->outgoing_buffs[conn->outgoing_buff_idx], - conn->outgoing_buff_count); - result = used == conn->outgoing_buff_count; - - int bytes_written = 0; - for (size_t i = 0; i < used; i++) { - if (conn->outgoing_buffs[conn->outgoing_buff_idx + i].bytes) { - bytes_written += conn->outgoing_buffs[conn->outgoing_buff_idx + i].size; - } else { - qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, - "[C%"PRIu64"] empty buffer can't be written (%"PRIu64" of %"PRIu64")", - conn->conn_id, i+1, used); - } + //copy small buffers into large one + size_t used = conn->outgoing_buff_idx; + while (used < conn->outgoing_buff_count && ((conn->write_buffer.size + conn->outgoing_buffs[used].size) < conn->write_buffer.capacity)) { + memcpy(conn->write_buffer.bytes + conn->write_buffer.size, conn->outgoing_buffs[used].bytes, conn->outgoing_buffs[used].size); + conn->write_buffer.size += conn->outgoing_buffs[used].size; + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"] Copying buffer %i of %i with %i bytes (total=%i)", conn->conn_id, used+1, conn->outgoing_buff_count - conn->outgoing_buff_idx, conn->outgoing_buffs[used].size, conn->write_buffer.size); + used++; } - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, - "[C%"PRIu64"] pn_raw_connection_write_buffers wrote %i bytes", conn->conn_id, bytes_written); + conn->write_buffer.context = (uintptr_t) conn->previous_stream_data; + conn->previous_stream_data = 0; + + result = used == conn->outgoing_buff_count; - conn->outgoing_buff_count -= used; conn->outgoing_buff_idx += used; + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"] Copied %i buffers, %i remain", conn->conn_id, used, conn->outgoing_buff_count - conn->outgoing_buff_idx); } return result; } @@ -623,7 +598,7 @@ static void handle_outgoing(qdr_tcp_connection_t *conn) if (conn->outgoing_buff_count > 0) { // flush outgoing buffs that hold body data waiting to go out - read_more_body = write_outgoing_buffs(conn); + read_more_body = copy_outgoing_buffs(conn); } while (read_more_body) { ZERO(conn->outgoing_buffs); @@ -632,13 +607,25 @@ static void handle_outgoing(qdr_tcp_connection_t *conn) if (conn->outgoing_buff_count > 0) { // Send the data just returned - read_more_body = write_outgoing_buffs(conn); + read_more_body = copy_outgoing_buffs(conn); } else { // The incoming stream has no new data to send break; } } + if (conn->write_buffer.size && !conn->write_pending) { + if (pn_raw_connection_write_buffers(conn->pn_raw_conn, &conn->write_buffer, 1)) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"] pn_raw_connection_write_buffers wrote %i bytes", conn->conn_id, conn->write_buffer.size); + + conn->write_pending = true; + } else { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"] pn_raw_connection_write_buffers could not write %i bytes", conn->conn_id, conn->write_buffer.size); + } + } + if (conn->read_eos_seen) { qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] handle_outgoing calling pn_raw_connection_write_close(). rcv_complete:%s, send_complete:%s", @@ -678,6 +665,8 @@ static char *get_address_string(pn_raw_connection_t *socket) static void qdr_tcp_connection_ingress_accept(qdr_tcp_connection_t* tc) { + allocate_tcp_write_buffer(&tc->write_buffer); + allocate_tcp_buffer(&tc->read_buffer); tc->remote_address = get_address_string(tc->pn_raw_conn); tc->global_id = get_global_id(tc->config.site_id, tc->remote_address); qdr_connection_info_t *info = qdr_connection_info(false, // is_encrypted, @@ -856,40 +845,41 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void break; } case PN_RAW_CONNECTION_WRITTEN: { - pn_raw_buffer_t buffs[WRITE_BUFFERS]; - size_t n; - size_t written = 0; - while ( (n = pn_raw_connection_take_written_buffers(conn->pn_raw_conn, buffs, WRITE_BUFFERS)) ) { - for (size_t i = 0; i < n; ++i) { - written += buffs[i].size; - if (buffs[i].context) { - qd_message_stream_data_release((qd_message_stream_data_t*) buffs[i].context); - } + pn_raw_buffer_t buff; + if ( pn_raw_connection_take_written_buffers(conn->pn_raw_conn, &buff, 1) ) { + size_t written = buff.size; + if (buff.context) { + qd_message_stream_data_release_up_to((qd_message_stream_data_t*) buff.context); + } + conn->write_pending = false; + conn->write_buffer.size = 0; + conn->write_buffer.offset = 0; + conn->write_buffer.context = 0; + conn->last_out_time = tcp_adaptor->core->uptime_ticks; + conn->bytes_out += written; + + if (written > 0) { + // Tell the upstream to open its receive window. Note: this update + // is sent to the upstream (ingress) TCP adaptor. Since this update + // is internal to the router network (never sent to the client) we + // do not need to use the section_number (no section numbers in a + // TCP stream!) and use section_offset only. + // + qd_delivery_state_t *dstate = qd_delivery_state(); + dstate->section_number = 0; + dstate->section_offset = conn->bytes_out; + qdr_delivery_remote_state_updated(tcp_adaptor->core, conn->outstream, + PN_RECEIVED, + false, // settled + dstate, + false); } - } - conn->last_out_time = tcp_adaptor->core->uptime_ticks; - conn->bytes_out += written; - - if (written > 0) { - // Tell the upstream to open its receive window. Note: this update - // is sent to the upstream (ingress) TCP adaptor. Since this update - // is internal to the router network (never sent to the client) we - // do not need to use the section_number (no section numbers in a - // TCP stream!) and use section_offset only. - // - qd_delivery_state_t *dstate = qd_delivery_state(); - dstate->section_number = 0; - dstate->section_offset = conn->bytes_out; - qdr_delivery_remote_state_updated(tcp_adaptor->core, conn->outstream, - PN_RECEIVED, - false, // settled - dstate, - false); - } - qd_log(log, QD_LOG_DEBUG, - "[C%"PRIu64"] PN_RAW_CONNECTION_WRITTEN %s pn_raw_connection_take_written_buffers wrote %zu bytes. Total written %"PRIu64" bytes", - conn->conn_id, qdr_tcp_connection_role_name(conn), written, conn->bytes_out); + qd_log(log, QD_LOG_DEBUG, + "[C%"PRIu64"] PN_RAW_CONNECTION_WRITTEN %s pn_raw_connection_take_written_buffers wrote %zu bytes. Total written %"PRIu64" bytes", + conn->conn_id, qdr_tcp_connection_role_name(conn), written, conn->bytes_out); + handle_outgoing(conn); + } while (qdr_connection_process(conn->qdr_conn)) {} break; } @@ -1024,6 +1014,8 @@ static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *confi if (tc->egress_dispatcher) qdr_tcp_open_server_side_connection(tc); else { + allocate_tcp_write_buffer(&tc->write_buffer); + allocate_tcp_buffer(&tc->read_buffer); qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] call pn_proactor_raw_connect(). Egress connecting to: %s", tc->conn_id, tc->config.host_port); diff --git a/src/message.c b/src/message.c index 4198599..f345da3 100644 --- a/src/message.c +++ b/src/message.c @@ -2567,6 +2567,20 @@ int qd_message_stream_data_buffers(qd_message_stream_data_t *stream_data, pn_raw return idx; } +void qd_message_stream_data_release_up_to(qd_message_stream_data_t *stream_data) +{ + if (!stream_data) + return; + + qd_message_pvt_t *msg = stream_data->owning_message; + qd_message_stream_data_t *next = DEQ_HEAD(msg->stream_data_list); + qd_message_stream_data_t *current = NULL; + while (next && current != stream_data) { + current = next; + next = DEQ_NEXT(next); + qd_message_stream_data_release(current); + } +} /** * qd_message_stream_data_release --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org