This is an automated email from the ASF dual-hosted git repository.

gsim pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 7c5d17fb2af2575840c2200f7751f13a28bdce87
Author: Gordon Sim <g...@redhat.com>
AuthorDate: Fri Apr 30 15:31:23 2021 +0100

    DISPATCH-2142: a change to tcp adaptor buffering scheme
---
 include/qpid/dispatch/message.h |  10 ++
 src/adaptors/tcp_adaptor.c      | 272 +++++++++++++++++++---------------------
 src/message.c                   |  14 +++
 3 files changed, 156 insertions(+), 140 deletions(-)

diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 5a66db3..3a6968c 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -383,6 +383,16 @@ size_t qd_message_stream_data_payload_length(const 
qd_message_stream_data_t *str
 void qd_message_stream_data_release(qd_message_stream_data_t *stream_data);
 
 
+/**
+ * qd_message_stream_data_release_up_to
+ *
+ * Release this stream data object and every one that precedes it in the owning message's list.
+ *
+ * @param stream_data Pointer to a body data object returned by qd_message_next_stream_data
+ */
+void qd_message_stream_data_release_up_to(qd_message_stream_data_t 
*stream_data);
+
+
 typedef enum {
     QD_MESSAGE_STREAM_DATA_BODY_OK,      // A valid body data object has been 
returned
     QD_MESSAGE_STREAM_DATA_FOOTER_OK,    // A valid footer has been returned
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index 1175e5f..1f8a87c 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -46,12 +46,12 @@
 //   arrives:
 //
 const uint32_t TCP_MAX_CAPACITY = 121635 * 6 * 2;
+const size_t TCP_BUFFER_SIZE = 16384*2;
 
 ALLOC_DEFINE(qd_tcp_listener_t);
 ALLOC_DEFINE(qd_tcp_connector_t);
 
-#define READ_BUFFERS 4
-#define WRITE_BUFFERS 4
+#define WRITE_BUFFERS 12
 
 typedef struct qdr_tcp_connection_t qdr_tcp_connection_t;
 
@@ -91,10 +91,16 @@ struct qdr_tcp_connection_t {
     uint64_t              last_in_time;
     uint64_t              last_out_time;
 
-    qd_message_stream_data_t *outgoing_stream_data;   // current segment
+    qd_message_stream_data_t *previous_stream_data; // previous segment 
(received in full)
+    qd_message_stream_data_t *outgoing_stream_data; // current segment
     size_t                  outgoing_body_bytes;  // bytes received from 
current segment
     int                     outgoing_body_offset; // buffer offset into 
current segment
 
+    pn_raw_buffer_t         read_buffer;
+    bool                    read_pending;
+    pn_raw_buffer_t         write_buffer;
+    bool                    write_pending;
+
     pn_raw_buffer_t         outgoing_buffs[WRITE_BUFFERS];
     int                     outgoing_buff_count;  // number of buffers with 
data
     int                     outgoing_buff_idx;    // first buffer with data
@@ -127,6 +133,24 @@ static void handle_disconnected(qdr_tcp_connection_t* 
conn);
 static void free_qdr_tcp_connection(qdr_tcp_connection_t* conn);
 static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc);
 
+static void allocate_tcp_buffer(pn_raw_buffer_t *buffer)
+{
+    // Back the raw buffer with TCP_BUFFER_SIZE bytes; calloc zero-fills the whole region.
+    buffer->bytes = calloc(1, TCP_BUFFER_SIZE);
+    buffer->capacity = TCP_BUFFER_SIZE;
+    buffer->size = 0;
+    buffer->offset = 0;
+}
+
+static void allocate_tcp_write_buffer(pn_raw_buffer_t *buffer)
+{
+    // Back the raw buffer with TCP_BUFFER_SIZE bytes; calloc zero-fills the whole region.
+    buffer->bytes = calloc(1, TCP_BUFFER_SIZE);
+    buffer->capacity = TCP_BUFFER_SIZE;
+    buffer->size = 0;
+    buffer->offset = 0;
+}
+
 static inline uint64_t qdr_tcp_conn_linkid(const qdr_tcp_connection_t *conn)
 {
     assert(conn);
@@ -163,28 +187,14 @@ static void on_activate(void *context)
 
 static void grant_read_buffers(qdr_tcp_connection_t *conn)
 {
-    if (conn->raw_closed_read)
+    if (conn->raw_closed_read || conn->read_pending)
         return;
 
-    pn_raw_buffer_t raw_buffers[READ_BUFFERS];
-    // Give proactor more read buffers for the socket
-    size_t desired = 
pn_raw_connection_read_buffers_capacity(conn->pn_raw_conn);
+    conn->read_pending = true;
     qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
-        "[C%"PRIu64"][L%"PRIu64"] Granting %zu to 
pn_raw_connection_give_read_buffers()",
-        conn->conn_id, conn->incoming_id, desired);
-    while (desired) {
-        size_t i;
-        for (i = 0; i < desired && i < READ_BUFFERS; ++i) {
-            qd_buffer_t *buf = qd_buffer();
-            raw_buffers[i].bytes = (char*) qd_buffer_base(buf);
-            raw_buffers[i].capacity = qd_buffer_capacity(buf);
-            raw_buffers[i].size = 0;
-            raw_buffers[i].offset = 0;
-            raw_buffers[i].context = (uintptr_t) buf;
-        }
-        desired -= i;
-        pn_raw_connection_give_read_buffers(conn->pn_raw_conn, raw_buffers, i);
-    }
+        "[C%"PRIu64"][L%"PRIu64"] Calling 
pn_raw_connection_give_read_buffers() capacity=%i offset=%i",
+           conn->conn_id, conn->incoming_id, conn->read_buffer.capacity, 
conn->read_buffer.offset);
+    pn_raw_connection_give_read_buffers(conn->pn_raw_conn, &conn->read_buffer, 
1);
 }
 
 
@@ -216,59 +226,39 @@ void qdr_tcp_q2_unblocked_handler(const 
qd_alloc_safe_ptr_t context)
     sys_mutex_unlock(tc->activation_lock);
 }
 
-
 // Extract buffers and their bytes from raw connection.
-// * Proton decides how many buffers are to be taken.
-// * Buffers with no data are freed.
-// * Buffers with data are appended to caller's buffers list.
 // * Add received byte count to connection stats
 // * Return the count of bytes in the buffers list
 static int handle_incoming_raw_read(qdr_tcp_connection_t *conn, qd_buffer_list_t *buffers)
 {
-    pn_raw_buffer_t raw_buffers[READ_BUFFERS];
-
-    size_t n;
-    int count = 0;
-    int free_count = 0;
-    const bool was_open = conn->bytes_unacked < TCP_MAX_CAPACITY;
-
-    while ((conn->raw_closed_write || count + conn->bytes_unacked < TCP_MAX_CAPACITY)
-           && (n = pn_raw_connection_take_read_buffers(conn->pn_raw_conn, raw_buffers, READ_BUFFERS)) ) {
-
-        for (size_t i = 0; i < n && raw_buffers[i].bytes; ++i) {
-            qd_buffer_t *buf = (qd_buffer_t*) raw_buffers[i].context;
-            qd_buffer_insert(buf, raw_buffers[i].size);
-            count += raw_buffers[i].size;
-
-            assert(raw_buffers[i].size == qd_buffer_size(buf));
-            if (raw_buffers[i].size > 0) {
-                DEQ_INSERT_TAIL(*buffers, buf);
-            } else {
-                qd_buffer_free(buf);
-                free_count++;
-            }
-        }
+    pn_raw_buffer_t raw_buffer;
+    if ( conn->bytes_unacked >= TCP_MAX_CAPACITY || !pn_raw_connection_take_read_buffers(conn->pn_raw_conn, &raw_buffer, 1)) {
+        return 0;
     }
+    int result = raw_buffer.size;
+    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+        "[C%"PRIu64"] pn_raw_connection_take_read_buffers() took buffer with %i bytes",
+        conn->conn_id, result);
 
-    if (count > 0) {
+    if (buffers) {
+        qd_buffer_list_append(buffers, (uint8_t*) (raw_buffer.bytes + raw_buffer.offset), raw_buffer.size);
+    }
+    // the read buffer has been taken back: reset it and clear read_pending so grant_read_buffers() can give it again
+    conn->read_buffer.size = 0;
+    conn->read_buffer.offset = 0;
+    conn->read_pending = false;
+    if (result > 0) {
         // account for any incoming bytes just read
         conn->last_in_time = tcp_adaptor->core->uptime_ticks;
-        conn->bytes_in      += count;
-        conn->bytes_unacked += count;
+        conn->bytes_in      += result;
+        conn->bytes_unacked += result;
         if (conn->bytes_unacked >= TCP_MAX_CAPACITY) {
-            if (was_open) {
-                qd_log(tcp_adaptor->log_source, QD_LOG_TRACE,
-                       "[C%"PRIu64"] TCP RX window CLOSED: bytes in=%"PRIu64" unacked=%"PRIu64,
-                       conn->conn_id, conn->bytes_in, conn->bytes_unacked);
-            }
+            qd_log(tcp_adaptor->log_source, QD_LOG_TRACE,
+                   "[C%"PRIu64"] TCP RX window CLOSED: bytes in=%"PRIu64" unacked=%"PRIu64,
+                   conn->conn_id, conn->bytes_in, conn->bytes_unacked);
         }
     }
-
-    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
-        "[C%"PRIu64"] pn_raw_connection_take_read_buffers() took %zu, freed %i",
-        conn->conn_id, DEQ_SIZE(*buffers), free_count);
-
-    return count;
+    return result;
 }
 
 
@@ -292,10 +282,7 @@ static int handle_incoming(qdr_tcp_connection_t *conn, 
const char *msg)
             "[C%"PRIu64"][L%"PRIu64"] handle_incoming %s for %s connection. 
drain read buffers",
             conn->conn_id, conn->incoming_id, msg,
             qdr_tcp_connection_role_name(conn));
-        qd_buffer_list_t buffers;
-        DEQ_INIT(buffers);
-        handle_incoming_raw_read(conn, &buffers);
-        qd_buffer_list_free_buffers(&buffers);
+        handle_incoming_raw_read(conn, 0);
         return 0;
     }
 
@@ -422,24 +409,13 @@ static int handle_incoming(qdr_tcp_connection_t *conn, 
const char *msg)
 
 static void flush_outgoing_buffs(qdr_tcp_connection_t *conn)
 {
-    // Flush buffers staged for writing to raw conn
-    // and release any references to stream data objects.
-    if (conn->outgoing_buff_count > 0) {
-        for (size_t i = conn->outgoing_buff_idx;
-            i < conn->outgoing_buff_idx + conn->outgoing_buff_count;
-            ++i) {
-            if (conn->outgoing_buffs[i].context) {
-                qd_message_stream_data_release(
-                    
(qd_message_stream_data_t*)conn->outgoing_buffs[i].context);
-            }
-        }
-    }
-    conn->outgoing_buff_count = 0;
-
-    // Flush in-progress stream data object
+    // Free any remaining stream data objects
     if (conn->outgoing_stream_data) {
-        qd_message_stream_data_release(conn->outgoing_stream_data);
+        qd_message_stream_data_release_up_to(conn->outgoing_stream_data);
         conn->outgoing_stream_data = 0;
+    } else if (conn->previous_stream_data) {
+        qd_message_stream_data_release_up_to(conn->previous_stream_data);
+        conn->previous_stream_data = 0;
     }
 }
 
@@ -500,6 +476,8 @@ static void handle_disconnected(qdr_tcp_connection_t* conn)
         qdr_connection_closed(conn->qdr_conn);
         conn->qdr_conn = 0;
     }
+    free(conn->write_buffer.bytes);
+    free(conn->read_buffer.bytes);
 
     //need to free on core thread to avoid deleting while in use by management 
agent
     qdr_action_t *action = qdr_action(qdr_del_tcp_connection_CT, 
"delete_tcp_connection");
@@ -554,13 +532,9 @@ static int read_message_body(qdr_tcp_connection_t *conn, 
qd_message_t *msg, pn_r
         assert(conn->outgoing_body_bytes <= 
conn->outgoing_stream_data->payload.length);
 
         if (conn->outgoing_body_bytes == 
conn->outgoing_stream_data->payload.length) {
-            // This buffer set consumes the remainder of the stream_data 
segment.
-            // Attach the stream_data struct to the last buffer so that the 
struct
-            // can be freed after the buffer has been transmitted by raw 
connection out.
-            buffers[used-1].context = (uintptr_t) conn->outgoing_stream_data;
-
             // Erase the stream_data struct from the connection so that
             // a new one gets created on the next pass.
+            conn->previous_stream_data = conn->outgoing_stream_data;
             conn->outgoing_stream_data = 0;
         } else {
             // Returned buffer set did not consume the entire stream_data 
segment.
@@ -576,35 +550,36 @@ static int read_message_body(qdr_tcp_connection_t *conn, 
qd_message_t *msg, pn_r
 }
 
 
-static bool write_outgoing_buffs(qdr_tcp_connection_t *conn)
+static bool copy_outgoing_buffs(qdr_tcp_connection_t *conn)
 {
-    // Send the outgoing buffs to pn_raw_conn.
-    // Return true if all the buffers went out.
+    // Copy the pending outgoing buffs into conn->write_buffer.
+    // Return true if all of them were copied (or there were none).
     bool result;
 
     if (conn->outgoing_buff_count == 0) {
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] No outgoing buffers to copy at present", conn->conn_id);
         result = true;
+    } else if (conn->write_pending) {
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Can't write, previous write still pending", conn->conn_id);
+        result = false;
     } else {
-        size_t used = pn_raw_connection_write_buffers(conn->pn_raw_conn,
-                                                      &conn->outgoing_buffs[conn->outgoing_buff_idx],
-                                                      conn->outgoing_buff_count);
-        result = used == conn->outgoing_buff_count;
-
-        int bytes_written = 0;
-        for (size_t i = 0; i < used; i++) {
-            if (conn->outgoing_buffs[conn->outgoing_buff_idx + i].bytes) {
-                bytes_written += conn->outgoing_buffs[conn->outgoing_buff_idx + i].size;
-            } else {
-                qd_log(tcp_adaptor->log_source, QD_LOG_ERROR,
-                       "[C%"PRIu64"] empty buffer can't be written (%"PRIu64" of %"PRIu64")",
-                       conn->conn_id, i+1, used);
-            }
+        //copy small buffers into large one
+        size_t used = conn->outgoing_buff_idx;
+        while (used < conn->outgoing_buff_count && ((conn->write_buffer.size + conn->outgoing_buffs[used].size) < conn->write_buffer.capacity)) {
+            memcpy(conn->write_buffer.bytes + conn->write_buffer.size, conn->outgoing_buffs[used].bytes, conn->outgoing_buffs[used].size);
+            conn->write_buffer.size += conn->outgoing_buffs[used].size;
+            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+                   "[C%"PRIu64"] Copying buffer %zu of %i with %i bytes (total=%i)", conn->conn_id, used+1, conn->outgoing_buff_count - conn->outgoing_buff_idx, conn->outgoing_buffs[used].size, conn->write_buffer.size);
+            used++;
         }
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
-               "[C%"PRIu64"] pn_raw_connection_write_buffers wrote %i bytes", conn->conn_id, bytes_written);
+        conn->write_buffer.context = (uintptr_t) conn->previous_stream_data;
+        conn->previous_stream_data = 0;
+
+        result = used == conn->outgoing_buff_count;
 
-        conn->outgoing_buff_count -= used;
-        conn->outgoing_buff_idx   += used;
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+               "[C%"PRIu64"] Copied %zu buffers, %zu remain", conn->conn_id, used - conn->outgoing_buff_idx, conn->outgoing_buff_count - used);
+        conn->outgoing_buff_idx    = used;  // used is already an absolute index into outgoing_buffs
     }
     return result;
 }
@@ -623,7 +598,7 @@ static void handle_outgoing(qdr_tcp_connection_t *conn)
 
         if (conn->outgoing_buff_count > 0) {
             // flush outgoing buffs that hold body data waiting to go out
-            read_more_body = write_outgoing_buffs(conn);
+            read_more_body = copy_outgoing_buffs(conn);
         }
         while (read_more_body) {
             ZERO(conn->outgoing_buffs);
@@ -632,13 +607,25 @@ static void handle_outgoing(qdr_tcp_connection_t *conn)
 
             if (conn->outgoing_buff_count > 0) {
                 // Send the data just returned
-                read_more_body = write_outgoing_buffs(conn);
+                read_more_body = copy_outgoing_buffs(conn);
             } else {
                 // The incoming stream has no new data to send
                 break;
             }
         }
 
+        if (conn->write_buffer.size && !conn->write_pending) {
+            if (pn_raw_connection_write_buffers(conn->pn_raw_conn, 
&conn->write_buffer, 1)) {
+                qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+                       "[C%"PRIu64"] pn_raw_connection_write_buffers wrote %i 
bytes", conn->conn_id, conn->write_buffer.size);
+
+                conn->write_pending = true;
+            } else {
+                qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+                       "[C%"PRIu64"] pn_raw_connection_write_buffers could not 
write %i bytes", conn->conn_id, conn->write_buffer.size);
+            }
+        }
+
         if (conn->read_eos_seen) {
             qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
                    "[C%"PRIu64"] handle_outgoing calling 
pn_raw_connection_write_close(). rcv_complete:%s, send_complete:%s",
@@ -678,6 +665,8 @@ static char *get_address_string(pn_raw_connection_t *socket)
 
 static void qdr_tcp_connection_ingress_accept(qdr_tcp_connection_t* tc)
 {
+    allocate_tcp_write_buffer(&tc->write_buffer);
+    allocate_tcp_buffer(&tc->read_buffer);
     tc->remote_address = get_address_string(tc->pn_raw_conn);
     tc->global_id = get_global_id(tc->config.site_id, tc->remote_address);
     qdr_connection_info_t *info = qdr_connection_info(false,               // 
is_encrypted,
@@ -856,40 +845,41 @@ static void handle_connection_event(pn_event_t *e, 
qd_server_t *qd_server, void
         break;
     }
     case PN_RAW_CONNECTION_WRITTEN: {
-        pn_raw_buffer_t buffs[WRITE_BUFFERS];
-        size_t n;
-        size_t written = 0;
-        while ( (n = pn_raw_connection_take_written_buffers(conn->pn_raw_conn, 
buffs, WRITE_BUFFERS)) ) {
-            for (size_t i = 0; i < n; ++i) {
-                written += buffs[i].size;
-                if (buffs[i].context) {
-                    qd_message_stream_data_release((qd_message_stream_data_t*) 
buffs[i].context);
-                }
+        pn_raw_buffer_t buff;
+        if ( pn_raw_connection_take_written_buffers(conn->pn_raw_conn, &buff, 
1) ) {
+            size_t written = buff.size;
+            if (buff.context) {
+                
qd_message_stream_data_release_up_to((qd_message_stream_data_t*) buff.context);
+            }
+            conn->write_pending = false;
+            conn->write_buffer.size = 0;
+            conn->write_buffer.offset = 0;
+            conn->write_buffer.context = 0;
+            conn->last_out_time = tcp_adaptor->core->uptime_ticks;
+            conn->bytes_out += written;
+
+            if (written > 0) {
+                // Tell the upstream to open its receive window.  Note: this 
update
+                // is sent to the upstream (ingress) TCP adaptor. Since this 
update
+                // is internal to the router network (never sent to the 
client) we
+                // do not need to use the section_number (no section numbers 
in a
+                // TCP stream!) and use section_offset only.
+                //
+                qd_delivery_state_t *dstate = qd_delivery_state();
+                dstate->section_number = 0;
+                dstate->section_offset = conn->bytes_out;
+                qdr_delivery_remote_state_updated(tcp_adaptor->core, 
conn->outstream,
+                                                  PN_RECEIVED,
+                                                  false,  // settled
+                                                  dstate,
+                                                  false);
             }
-        }
-        conn->last_out_time = tcp_adaptor->core->uptime_ticks;
-        conn->bytes_out += written;
-
-        if (written > 0) {
-            // Tell the upstream to open its receive window.  Note: this update
-            // is sent to the upstream (ingress) TCP adaptor. Since this update
-            // is internal to the router network (never sent to the client) we
-            // do not need to use the section_number (no section numbers in a
-            // TCP stream!) and use section_offset only.
-            //
-            qd_delivery_state_t *dstate = qd_delivery_state();
-            dstate->section_number = 0;
-            dstate->section_offset = conn->bytes_out;
-            qdr_delivery_remote_state_updated(tcp_adaptor->core, 
conn->outstream,
-                                              PN_RECEIVED,
-                                              false,  // settled
-                                              dstate,
-                                              false);
-        }
 
-        qd_log(log, QD_LOG_DEBUG,
-               "[C%"PRIu64"] PN_RAW_CONNECTION_WRITTEN %s 
pn_raw_connection_take_written_buffers wrote %zu bytes. Total written %"PRIu64" 
bytes",
-               conn->conn_id, qdr_tcp_connection_role_name(conn), written, 
conn->bytes_out);
+            qd_log(log, QD_LOG_DEBUG,
+                   "[C%"PRIu64"] PN_RAW_CONNECTION_WRITTEN %s 
pn_raw_connection_take_written_buffers wrote %zu bytes. Total written %"PRIu64" 
bytes",
+                   conn->conn_id, qdr_tcp_connection_role_name(conn), written, 
conn->bytes_out);
+            handle_outgoing(conn);
+        }
         while (qdr_connection_process(conn->qdr_conn)) {}
         break;
     }
@@ -1024,6 +1014,8 @@ static qdr_tcp_connection_t 
*qdr_tcp_connection_egress(qd_bridge_config_t *confi
     if (tc->egress_dispatcher)
         qdr_tcp_open_server_side_connection(tc);
     else {
+        allocate_tcp_write_buffer(&tc->write_buffer);
+        allocate_tcp_buffer(&tc->read_buffer);
         qd_log(tcp_adaptor->log_source, QD_LOG_INFO,
                "[C%"PRIu64"] call pn_proactor_raw_connect(). Egress connecting 
to: %s",
                tc->conn_id, tc->config.host_port);
diff --git a/src/message.c b/src/message.c
index 4198599..f345da3 100644
--- a/src/message.c
+++ b/src/message.c
@@ -2567,6 +2567,20 @@ int 
qd_message_stream_data_buffers(qd_message_stream_data_t *stream_data, pn_raw
     return idx;
 }
 
+void qd_message_stream_data_release_up_to(qd_message_stream_data_t *stream_data)
+{
+    if (!stream_data)
+        return;
+    // Release entries from the head of the owning message's list, stopping once stream_data is released.
+    qd_message_pvt_t         *owner  = stream_data->owning_message;
+    qd_message_stream_data_t *cursor = DEQ_HEAD(owner->stream_data_list);
+    while (cursor) {
+        qd_message_stream_data_t *doomed = cursor;
+        cursor = DEQ_NEXT(cursor);  // step to the next entry before releasing this one
+        qd_message_stream_data_release(doomed);
+        if (doomed == stream_data) break;
+    }
+}
 
 /**
  * qd_message_stream_data_release

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to