This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 125b7d5275d80023930cfd7cc962b5dbc18a717b
Author: Kenneth Giusti <[email protected]>
AuthorDate: Sun Jan 3 16:29:45 2021 -0500

    DISPATCH-1744: discard outgoing request on raw conn close
---
 src/adaptors/http1/http1_server.c | 64 +++++++++++++++++++++++++++------------
 1 file changed, 44 insertions(+), 20 deletions(-)

diff --git a/src/adaptors/http1/http1_server.c 
b/src/adaptors/http1/http1_server.c
index 1a661e6..c8adc62 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -70,6 +70,7 @@ typedef struct _server_request_t {
     uint64_t        request_dispo;   // set by adaptor during encode
     bool            request_settled; // set by adaptor
     bool            request_acked;   // true if dispo sent to core
+    bool            request_discard; // drop incoming request data
     bool            headers_encoded; // True when header encode done
 
     qdr_http1_out_data_fifo_t out_data;  // encoded request written to raw conn
@@ -279,6 +280,14 @@ void qd_http1_delete_connector(qd_dispatch_t *ignored, 
qd_http_connector_t *ct)
 ////////////////////////////////////////////////////////
 
 
+// Is the hreq currently in flight to the server?
+//
+static inline bool _is_request_in_progress(const _server_request_t *hreq)
+{
+    return hreq && (hreq->base.out_http1_octets > 0 || hreq->cancelled);
+}
+
+
 // Create the qdr links and HTTP codec when the server connection comes up.
 // These links & codec will persist across temporary drops in the connection to
 // the server (like when closing the connection to indicate end of response
@@ -487,15 +496,11 @@ static void _handle_connection_events(pn_event_t *e, 
qd_server_t *qd_server, voi
     }
 
     case PN_RAW_CONNECTION_CLOSED_WRITE: {
-        // cancel the current request if the request has not been fully written
-        // to the raw connection
+        // discard any remaining outgoing request message data
         _server_request_t *hreq = (_server_request_t*) 
DEQ_HEAD(hconn->requests);
-        if (hreq) {
-            if (hreq->base.out_http1_octets > 0) {  // req msg written to 
server
-                if (!DEQ_IS_EMPTY(hreq->out_data.fifo)) {
-                    _cancel_request(hreq);
-                }
-            }
+        if (_is_request_in_progress(hreq)) {
+            hreq->request_discard = true;
+            qdr_http1_out_data_fifo_cleanup(&hreq->out_data);
         }
         pn_raw_connection_close(hconn->raw_conn);
         break;
@@ -504,6 +509,17 @@ static void _handle_connection_events(pn_event_t *e, 
qd_server_t *qd_server, voi
         qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Connection closed", 
hconn->conn_id);
 
         pn_raw_connection_set_context(hconn->raw_conn, 0);
+
+        // Check for a request that is in-progress - it needs to be cancelled.
+        // However there is an exception: the server has completed sending a
+        // response message and closed the connection, but the outgoing request
+        // message has not completed (example: a streaming POST that has been
+        // rejected by the server). In this case wait until the request message
+        // has fully arrived from the core.
+
+        _server_request_t *hreq = (_server_request_t*) 
DEQ_HEAD(hconn->requests);
+        if (_is_request_in_progress(hreq) && !hreq->response_complete)
+            _cancel_request(hreq);
         _process_requests(hconn);
 
         //
@@ -757,12 +773,16 @@ static void 
_server_tx_buffers_cb(h1_codec_request_state_t *hrs, qd_buffer_list_
     _server_request_t       *hreq = (_server_request_t*) 
h1_codec_request_state_get_context(hrs);
     qdr_http1_connection_t *hconn = hreq->base.hconn;
 
-    qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
-           "[C%"PRIu64"][L%"PRIu64"] Sending %u octets to server",
-           hconn->conn_id, hconn->out_link_id, len);
-    qdr_http1_enqueue_buffer_list(&hreq->out_data, blist);
-    if (hreq == (_server_request_t*) DEQ_HEAD(hconn->requests)) {
-        _write_pending_request(hreq);
+    if (hreq->request_discard)
+        qd_buffer_list_free_buffers(blist);
+    else {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+               "[C%"PRIu64"][L%"PRIu64"] Sending %u octets to server",
+               hconn->conn_id, hconn->out_link_id, len);
+        qdr_http1_enqueue_buffer_list(&hreq->out_data, blist);
+        if (hreq == (_server_request_t*) DEQ_HEAD(hconn->requests)) {
+            _write_pending_request(hreq);
+        }
     }
 }
 
@@ -774,12 +794,16 @@ static void 
_server_tx_stream_data_cb(h1_codec_request_state_t *hrs, qd_message_
     _server_request_t       *hreq = (_server_request_t*) 
h1_codec_request_state_get_context(hrs);
     qdr_http1_connection_t *hconn = hreq->base.hconn;
 
-    qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
-           "[C%"PRIu64"][L%"PRIu64"] Sending body data to server",
-           hconn->conn_id, hconn->out_link_id);
-    qdr_http1_enqueue_stream_data(&hreq->out_data, stream_data);
-    if (hreq == (_server_request_t*) DEQ_HEAD(hconn->requests) && 
hconn->raw_conn) {
-        _write_pending_request(hreq);
+    if (hreq->request_discard)
+        qd_message_stream_data_release(stream_data);
+    else {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+               "[C%"PRIu64"][L%"PRIu64"] Sending body data to server",
+               hconn->conn_id, hconn->out_link_id);
+        qdr_http1_enqueue_stream_data(&hreq->out_data, stream_data);
+        if (hreq == (_server_request_t*) DEQ_HEAD(hconn->requests)) {
+            _write_pending_request(hreq);
+        }
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to