This is an automated email from the ASF dual-hosted git repository.
chug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new c6b2055 DISPATCH-1947: TCP Adaptor flow control
c6b2055 is described below
commit c6b205574ed5925cde40d8f126902d28ff5a32e2
Author: Chuck Rolke <[email protected]>
AuthorDate: Tue Feb 23 16:48:05 2021 -0500
DISPATCH-1947: TCP Adaptor flow control
This closes #1056
---
src/adaptors/tcp_adaptor.c | 88 ++++++++++++++++++++++++++++++++++++++++------
1 file changed, 78 insertions(+), 10 deletions(-)
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index 69bed1a..0b83123 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -17,6 +17,7 @@
* under the License.
*/
+#include "tcp_adaptor.h"
#include <proton/condition.h>
#include <proton/listener.h>
#include <proton/netaddr.h>
@@ -26,7 +27,6 @@
#include "qpid/dispatch/ctools.h"
#include "qpid/dispatch/protocol_adaptor.h"
#include "delivery.h"
-#include "tcp_adaptor.h"
#include <stdio.h>
#include <inttypes.h>
@@ -78,6 +78,9 @@ struct qdr_tcp_connection_t {
int outgoing_buff_count; // number of buffers with
data
int outgoing_buff_idx; // first buffer with data
+ sys_atomic_t q2_restart; // signal to resume receive
+ bool q2_blocked; // stop reading from raw conn
+
DEQ_LINKS(qdr_tcp_connection_t);
};
@@ -148,7 +151,37 @@ static void grant_read_buffers(qdr_tcp_connection_t *conn)
}
}
-static int handle_incoming(qdr_tcp_connection_t *conn)
+
+// Per-message callback to resume receiving after Q2 is unblocked on the
+// incoming link.
+// This routine must be thread safe: the thread on which it is running
+// is not an IO thread that owns the underlying pn_raw_conn.
+//
+void qdr_tcp_q2_unblocked_handler(const qd_alloc_safe_ptr_t context)
+{
+ qdr_tcp_connection_t *tc =
(qdr_tcp_connection_t*)qd_alloc_deref_safe_ptr(&context);
+ if (tc == 0) {
+ // bad news.
+ assert(false);
+ return;
+ }
+
+ // prevent the tc from being deleted while running:
+ sys_mutex_lock(tc->activation_lock);
+
+ if (tc && tc->pn_raw_conn) {
+ sys_atomic_set(&tc->q2_restart, 1);
+ pn_raw_connection_wake(tc->pn_raw_conn);
+ }
+
+ sys_mutex_unlock(tc->activation_lock);
+}
+
+
+// Fetch incoming raw incoming buffers from proton and pass them to
+// existing delivery or create a new delivery.
+// If close is pending then do not give more buffers to proton.
+static int handle_incoming_impl(qdr_tcp_connection_t *conn, bool close_pending)
{
//
// Don't initiate an ingress stream message if we don't yet have a
reply-to address and credit.
@@ -163,6 +196,16 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
return 0;
}
+ //
+ // Don't read from proton if in Q2 holdoff
+ //
+ if (conn->q2_blocked) {
+ qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"]
handle_incoming q2_blocked", conn->conn_id);
+ return 0;
+ }
+
+ // Read all buffers available from proton.
+ // Collect buffers for ingress; free empty buffers.
qd_buffer_list_t buffers;
DEQ_INIT(buffers);
pn_raw_buffer_t raw_buffers[READ_BUFFERS];
@@ -182,14 +225,20 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
}
}
}
-
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Took %zu read
buffers", conn->conn_id, DEQ_SIZE(buffers));
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Freed %i read
buffers", conn->conn_id, free_count);
- grant_read_buffers(conn);
+
+ // Only grant more buffers to proton for reading if close is not pending
+ if (!close_pending) {
+ grant_read_buffers(conn);
+ }
if (conn->instream) {
- // @TODO(kgiusti): handle Q2 block event:
- qd_message_stream_data_append(qdr_delivery_message(conn->instream),
&buffers, 0);
+ qd_message_stream_data_append(qdr_delivery_message(conn->instream),
&buffers, &conn->q2_blocked);
+ if (conn->q2_blocked) {
+ // note: unit tests grep for this log!
+ qd_log(tcp_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] client
link blocked on Q2 limit", conn->conn_id);
+ }
qdr_delivery_continue(tcp_adaptor->core, conn->instream, false);
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
"[C%"PRIu64"][L%"PRIu64"] Continuing message with %i bytes", conn->conn_id,
conn->incoming_id, count);
} else {
@@ -230,6 +279,10 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
qd_message_compose_2(msg, props, false);
qd_compose_free(props);
+ // set up message q2 unblocked callback handler
+ qd_alloc_safe_ptr_t conn_sp = QD_SAFE_PTR_INIT(conn);
+ qd_message_set_q2_unblocked_handler(msg, qdr_tcp_q2_unblocked_handler,
conn_sp);
+
conn->instream = qdr_link_deliver(conn->incoming, msg, 0, false, 0, 0,
0, 0);
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
"[C%"PRIu64"][L%"PRIu64"] Initiating message with %i bytes", conn->conn_id,
conn->incoming_id, count);
}
@@ -237,10 +290,16 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
}
+static int handle_incoming(qdr_tcp_connection_t *conn)
+{
+ // Normal incoming runs with no close pending
+ return handle_incoming_impl(conn, false);
+}
+
static void flush_outgoing_buffs(qdr_tcp_connection_t *conn)
{
// Flush buffers staged for writing to raw conn
- // and free possible references to stream data objects.
+ // and release any references to stream data objects.
if (conn->outgoing_buff_count > 0) {
for (size_t i = conn->outgoing_buff_idx;
i < conn->outgoing_buff_idx + conn->outgoing_buff_count;
@@ -263,10 +322,10 @@ static void flush_outgoing_buffs(qdr_tcp_connection_t
*conn)
static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc)
{
- qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Freeing
tcp_connection %p", tc->conn_id, (void*) tc);
free(tc->reply_to);
free(tc->remote_address);
free(tc->global_id);
+ sys_atomic_destroy(&tc->q2_restart);
if (tc->activate_timer) {
qd_timer_free(tc->activate_timer);
}
@@ -278,7 +337,6 @@ static void free_qdr_tcp_connection(qdr_tcp_connection_t*
tc)
static void handle_disconnected(qdr_tcp_connection_t* conn)
{
- qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"]
handle_disconnected", conn->conn_id);
if (conn->instream) {
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
"[C%"PRIu64"][L%"PRIu64"] handle_disconnected - close instream", conn->conn_id,
conn->incoming_id);
qd_message_set_receive_complete(qdr_delivery_message(conn->instream));
@@ -552,8 +610,8 @@ static void handle_connection_event(pn_event_t *e,
qd_server_t *qd_server, void
switch (pn_event_type(e)) {
case PN_RAW_CONNECTION_CONNECTED: {
if (conn->ingress) {
- qdr_tcp_connection_ingress_accept(conn);
qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] PN_RAW_CONNECTION_CONNECTED
Ingress accepted to %s from %s (global_id=%s)", conn->conn_id,
conn->config.host_port, conn->remote_address, conn->global_id);
+ qdr_tcp_connection_ingress_accept(conn);
break;
} else {
conn->remote_address = get_address_string(conn->pn_raw_conn);
@@ -569,6 +627,8 @@ static void handle_connection_event(pn_event_t *e,
qd_server_t *qd_server, void
}
case PN_RAW_CONNECTION_CLOSED_READ: {
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"]
PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id);
+ conn->q2_blocked = false;
+ handle_incoming_impl(conn, true);
conn->raw_closed_read = true;
pn_raw_connection_close(conn->pn_raw_conn);
break;
@@ -601,6 +661,12 @@ static void handle_connection_event(pn_event_t *e,
qd_server_t *qd_server, void
}
case PN_RAW_CONNECTION_WAKE: {
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_WAKE",
conn->conn_id);
+ if (sys_atomic_set(&conn->q2_restart, 0)) {
+ // note: unit tests grep for this log!
+ qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] client link unblocked from
Q2 limit", conn->conn_id);
+ conn->q2_blocked = false;
+ handle_incoming(conn);
+ }
while (qdr_connection_process(conn->qdr_conn)) {}
break;
}
@@ -646,6 +712,7 @@ static qdr_tcp_connection_t
*qdr_tcp_connection_ingress(qd_tcp_listener_t* liste
tc->context.handler = &handle_connection_event;
tc->config = listener->config;
tc->server = listener->server;
+ sys_atomic_init(&tc->q2_restart, 0);
tc->pn_raw_conn = pn_raw_connection();
pn_raw_connection_set_context(tc->pn_raw_conn, tc);
//the following call will cause a PN_RAW_CONNECTION_CONNECTED
@@ -739,6 +806,7 @@ static qdr_tcp_connection_t
*qdr_tcp_connection_egress(qd_bridge_config_t *confi
tc->context.handler = &handle_connection_event;
tc->config = *config;
tc->server = server;
+ sys_atomic_init(&tc->q2_restart, 0);
tc->conn_id = qd_server_allocate_connection_id(tc->server);
//
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]