This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 125b7d5275d80023930cfd7cc962b5dbc18a717b Author: Kenneth Giusti <[email protected]> AuthorDate: Sun Jan 3 16:29:45 2021 -0500 DISPATCH-1744: discard outgoing request on raw conn close --- src/adaptors/http1/http1_server.c | 64 +++++++++++++++++++++++++++------------ 1 file changed, 44 insertions(+), 20 deletions(-) diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c index 1a661e6..c8adc62 100644 --- a/src/adaptors/http1/http1_server.c +++ b/src/adaptors/http1/http1_server.c @@ -70,6 +70,7 @@ typedef struct _server_request_t { uint64_t request_dispo; // set by adaptor during encode bool request_settled; // set by adaptor bool request_acked; // true if dispo sent to core + bool request_discard; // drop incoming request data bool headers_encoded; // True when header encode done qdr_http1_out_data_fifo_t out_data; // encoded request written to raw conn @@ -279,6 +280,14 @@ void qd_http1_delete_connector(qd_dispatch_t *ignored, qd_http_connector_t *ct) //////////////////////////////////////////////////////// +// Is the hreq currently in flight to the server? +// +static inline bool _is_request_in_progress(const _server_request_t *hreq) +{ + return hreq && (hreq->base.out_http1_octets > 0 || hreq->cancelled); +} + + // Create the qdr links and HTTP codec when the server connection comes up. 
// These links & codec will persist across temporary drops in the connection to // the server (like when closing the connection to indicate end of response @@ -487,15 +496,11 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi } case PN_RAW_CONNECTION_CLOSED_WRITE: { - // cancel the current request if the request has not been fully written - // to the raw connection + // discard any remaining outgoing request message data _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests); - if (hreq) { - if (hreq->base.out_http1_octets > 0) { // req msg written to server - if (!DEQ_IS_EMPTY(hreq->out_data.fifo)) { - _cancel_request(hreq); - } - } + if (_is_request_in_progress(hreq)) { + hreq->request_discard = true; + qdr_http1_out_data_fifo_cleanup(&hreq->out_data); } pn_raw_connection_close(hconn->raw_conn); break; @@ -504,6 +509,17 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Connection closed", hconn->conn_id); pn_raw_connection_set_context(hconn->raw_conn, 0); + + // Check for a request that is in-progress - it needs to be cancelled. + // However there is an exception: the server has completed sending a + // response message and closed the connection, but the outgoing request + // message has not completed (example: a streaming POST that has been + // rejected by the server). In this case wait until the request message + // has fully arrived from the core. 
+ + _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests); + if (_is_request_in_progress(hreq) && !hreq->response_complete) + _cancel_request(hreq); _process_requests(hconn); // @@ -757,12 +773,16 @@ static void _server_tx_buffers_cb(h1_codec_request_state_t *hrs, qd_buffer_list_ _server_request_t *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs); qdr_http1_connection_t *hconn = hreq->base.hconn; - qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, - "[C%"PRIu64"][L%"PRIu64"] Sending %u octets to server", - hconn->conn_id, hconn->out_link_id, len); - qdr_http1_enqueue_buffer_list(&hreq->out_data, blist); - if (hreq == (_server_request_t*) DEQ_HEAD(hconn->requests)) { - _write_pending_request(hreq); + if (hreq->request_discard) + qd_buffer_list_free_buffers(blist); + else { + qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, + "[C%"PRIu64"][L%"PRIu64"] Sending %u octets to server", + hconn->conn_id, hconn->out_link_id, len); + qdr_http1_enqueue_buffer_list(&hreq->out_data, blist); + if (hreq == (_server_request_t*) DEQ_HEAD(hconn->requests)) { + _write_pending_request(hreq); + } } } @@ -774,12 +794,16 @@ static void _server_tx_stream_data_cb(h1_codec_request_state_t *hrs, qd_message_ _server_request_t *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs); qdr_http1_connection_t *hconn = hreq->base.hconn; - qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, - "[C%"PRIu64"][L%"PRIu64"] Sending body data to server", - hconn->conn_id, hconn->out_link_id); - qdr_http1_enqueue_stream_data(&hreq->out_data, stream_data); - if (hreq == (_server_request_t*) DEQ_HEAD(hconn->requests) && hconn->raw_conn) { - _write_pending_request(hreq); + if (hreq->request_discard) + qd_message_stream_data_release(stream_data); + else { + qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, + "[C%"PRIu64"][L%"PRIu64"] Sending body data to server", + hconn->conn_id, hconn->out_link_id); + qdr_http1_enqueue_stream_data(&hreq->out_data, stream_data); + if (hreq == 
(_server_request_t*) DEQ_HEAD(hconn->requests)) { + _write_pending_request(hreq); + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
