asfgit closed pull request #425: DISPATCH-1213 - Prevent stalling of presettled 
large message senders …
URL: https://github.com/apache/qpid-dispatch/pull/425
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/include/qpid/dispatch/router_core.h 
b/include/qpid/dispatch/router_core.h
index a03db277..7215d2bf 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -682,6 +682,7 @@ void qdr_delivery_decref(qdr_core_t *core, qdr_delivery_t 
*delivery, const char
 void qdr_delivery_tag(const qdr_delivery_t *delivery, const char **tag, int 
*length);
 qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery);
 qdr_error_t *qdr_delivery_error(const qdr_delivery_t *delivery);
+bool qdr_delivery_presettled(const qdr_delivery_t *delivery);
 void qdr_delivery_write_extension_state(qdr_delivery_t *dlv, pn_delivery_t* 
pdlv, bool update_disposition);
 bool qdr_delivery_send_complete(const qdr_delivery_t *delivery);
 bool qdr_delivery_tag_sent(const qdr_delivery_t *delivery);
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 1cce8de6..ecf7eea9 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -434,6 +434,11 @@ qdr_error_t *qdr_delivery_error(const qdr_delivery_t 
*delivery)
     return delivery->error;
 }
 
+bool qdr_delivery_presettled(const qdr_delivery_t *delivery)
+{
+    return delivery->presettled;
+}
+
 
 
//==================================================================================
 // In-Thread Functions
@@ -441,11 +446,24 @@ qdr_error_t *qdr_delivery_error(const qdr_delivery_t 
*delivery)
 
 void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *dlv)
 {
-    bool push = dlv->disposition != PN_RELEASED;
+    bool push = false;
+    bool moved = false;
 
-    dlv->disposition = PN_RELEASED;
-    dlv->settled = true;
-    bool moved = qdr_delivery_settled_CT(core, dlv);
+    if (dlv->presettled) {
+        //
+        // The delivery is presettled. We simply want to call 
CORE_delivery_update which in turn will
+        // restart stalled links if the q2_holdoff has been hit.
+        // For single frame presettled deliveries, calling 
CORE_delivery_update does not do anything.
+        //
+        push = true;
+    }
+    else {
+        push = dlv->disposition != PN_RELEASED;
+        dlv->disposition = PN_RELEASED;
+        dlv->settled = true;
+        moved = qdr_delivery_settled_CT(core, dlv);
+
+    }
 
     if (push || moved)
         qdr_delivery_push_CT(core, dlv);
@@ -848,6 +866,13 @@ static void qdr_link_forward_CT(qdr_core_t *core, 
qdr_link_t *link, qdr_delivery
             link->dropped_presettled_deliveries++;
             if (dlv->link->link_type == QD_LINK_ENDPOINT)
                 core->dropped_presettled_deliveries++;
+
+            //
+            // The delivery is pre-settled. Call the qdr_delivery_release_CT 
so if this delivery is multi-frame
+            // we can restart receiving the delivery in case it is stalled. 
Note that messages will not
+            // *actually* be released because these are presettled messages.
+            //
+            qdr_delivery_release_CT(core, dlv);
         } else {
             qdr_delivery_release_CT(core, dlv);
 
diff --git a/src/router_node.c b/src/router_node.c
index 597b8452..f6f305c2 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -1608,7 +1608,7 @@ static void CORE_delivery_update(void *context, 
qdr_delivery_t *dlv, uint64_t di
     //
     // If the disposition has changed, update the proton delivery.
     //
-    if (disp != pn_delivery_remote_state(pnd)) {
+    if (disp != pn_delivery_remote_state(pnd) && 
!qdr_delivery_presettled(dlv)) {
         qd_message_t *msg = qdr_delivery_message(dlv);
 
         if (disp == PN_MODIFIED)
@@ -1638,7 +1638,7 @@ static void CORE_delivery_update(void *context, 
qdr_delivery_t *dlv, uint64_t di
             // If the delivery is settled and it is still arriving, defer the 
settlement
             // until the content has fully arrived.
             //
-            if (disp == PN_RELEASED || disp == PN_MODIFIED) {
+            if (disp == PN_RELEASED || disp == PN_MODIFIED || 
qdr_delivery_presettled(dlv)) {
                 //
                 // If the disposition is RELEASED or MODIFIED, set the message 
to discard
                 // and if it is blocked by holdoff, get the link rolling again.
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index dd7cd31a..fd8ed597 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -422,6 +422,16 @@ def test_41_large_streaming_close_conn_test(self):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_42_dropped_presettled_receiver_stops(self):
+        local_node = Node.connect(self.address, timeout=TIMEOUT)
+        res = local_node.query('org.apache.qpid.dispatch.router')
+        deliveries_ingress = res.attribute_names.index(
+            'deliveriesIngress')
+        ingress_delivery_count = res.results[0][deliveries_ingress]
+        test = DroppedPresettledTest(self.address, 200, ingress_delivery_count)
+        test.run()
+        self.assertEqual(None, test.error)
+
 
 class Entity(object):
     def __init__(self, status_code, status_description, attrs):
@@ -1001,6 +1011,104 @@ def on_message ( self, event ) :
             self.bail ( None )
 
 
+class PresettledCustomTimeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        local_node = Node.connect(self.parent.addr, timeout=TIMEOUT)
+        res = local_node.query('org.apache.qpid.dispatch.router')
+        deliveries_ingress = res.attribute_names.index(
+            'deliveriesIngress')
+        ingress_delivery_count = res.results[0][deliveries_ingress]
+        self.parent.cancel_custom()
+
+        # Without the fix for DISPATCH--1213  the ingress count will be less 
than
+        # 200 because the sender link has stalled. The q2_holdoff happened
+        # and so all the remaining messages are still in the
+        # proton buffers.
+
+        if ingress_delivery_count - self.parent.begin_ingress_count > 
self.parent.n_messages:
+            self.parent.bail(None)
+        else:
+            self.parent.bail("Messages sent to the router is %d, "
+                             "Messages processed by the router is %d",
+                             (self.parent.n_messages,
+                              ingress_delivery_count - 
self.parent.begin_ingress_count))
+
+
+class DroppedPresettledTest(MessagingHandler):
+    def __init__(self, addr, n_messages, begin_ingress_count):
+        super (DroppedPresettledTest, self).__init__()
+        self.addr = addr
+        self.n_messages = n_messages
+        self.sender = None
+        self.receiver = None
+        self.sender_conn = None
+        self.recv_conn = None
+        self.n_sent = 0
+        self.n_received = 0
+        self.error = None
+        self.test_timer = None
+        self.max_receive = 10
+        self.custom_timer = None
+        self.timer = None
+        self.begin_ingress_count = begin_ingress_count
+        self.str1 = "0123456789abcdef"
+        self.msg_str = ""
+        for i in range(8192):
+            self.msg_str += self.str1
+
+    def run (self):
+        Container(self).run()
+
+    def bail(self, travail):
+        self.error = travail
+        self.sender_conn.close()
+        if self.recv_conn:
+            self.recv_conn.close()
+        self.timer.cancel()
+
+    def timeout(self,):
+        self.bail("Timeout Expired: %d messages received, %d expected." %
+                  (self.n_received, self.n_messages))
+
+    def on_start (self, event):
+        self.sender_conn = event.container.connect(self.addr)
+        self.recv_conn = event.container.connect(self.addr)
+        self.receiver = event.container.create_receiver(self.recv_conn,
+                                                        "test_42")
+        self.sender = event.container.create_sender(self.sender_conn,
+                                                    "test_42")
+        self.timer = event.reactor.schedule(10, Timeout(self))
+
+    def cancel_custom(self):
+        self.custom_timer.cancel()
+
+    def on_sendable(self, event):
+        while self.n_sent < self.n_messages:
+            msg = Message(id=(self.n_sent + 1),
+                          body={'sequence': (self.n_sent + 1),
+                                'msg_str': self.msg_str})
+            # Presettle the delivery.
+            dlv = self.sender.send (msg)
+            dlv.settle()
+            self.n_sent += 1
+
+    def on_message(self, event):
+        self.n_received += 1
+        if self.n_received == self.max_receive:
+            # Receiver bails after receiving max_receive messages.
+            self.receiver.close()
+            self.recv_conn.close()
+
+            # The sender is only sending 200 large messages which is less
+            # that the initial credit of 250 that the router gives.
+            # Lets do a qdstat to find out if all 200 messages is handled
+            # by the router.
+            self.custom_timer = event.reactor.schedule(1,
+                                                       PresettledCustomTimeout(
+                                                           self))
 
 class MulticastUnsettled ( MessagingHandler ) :
     def __init__ ( self,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org
For additional commands, e-mail: dev-h...@qpid.apache.org

Reply via email to