This is an automated email from the ASF dual-hosted git repository.
gmurthy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new 3234b9d DISPATCH-1541: Sets the presettled flag on large streaming
discarded messages. This will help set the correct presettled related counters
on the link. This closes #663
3234b9d is described below
commit 3234b9d61040759da760a05eb8102b7897a99ac9
Author: Ganesh Murthy <[email protected]>
AuthorDate: Wed Jan 15 12:16:40 2020 -0500
DISPATCH-1541: Sets the presettled flag on large streaming discarded
messages. This will help set the correct presettled related counters on the
link. This closes #663
---
src/router_core/delivery.c | 5 ++
src/router_core/delivery.h | 7 ++
src/router_node.c | 10 +++
tests/system_tests_delivery_counts.py | 153 ++++++++++++++++++++++++++++++++++
4 files changed, 175 insertions(+)
diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c
index 4d62902..df31f80 100644
--- a/src/router_core/delivery.c
+++ b/src/router_core/delivery.c
@@ -124,6 +124,11 @@ bool qdr_delivery_is_aborted(const qdr_delivery_t
*delivery)
return qd_message_aborted(delivery->msg);
}
+void qdr_delivery_set_presettled(qdr_delivery_t *delivery)
+{
+ if (delivery)
+ delivery->presettled = true;
+}
void qdr_delivery_decref(qdr_core_t *core, qdr_delivery_t *delivery, const
char *label)
{
diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h
index 3553b74..9a7edfb 100644
--- a/src/router_core/delivery.h
+++ b/src/router_core/delivery.h
@@ -110,6 +110,13 @@ void qdr_delivery_write_extension_state(qdr_delivery_t
*dlv, pn_delivery_t* pdlv
/* release dlv and possibly schedule its deletion on the core thread */
void qdr_delivery_decref(qdr_core_t *core, qdr_delivery_t *delivery, const
char *label);
+/** Set the presettled flag on the delivery to true if it is not already true.
+ * The presettled flag can only go from false to true and not vice versa.
+ * This function should only be called when the delivery has been discarded
and receive_complete flag is true in which case there
+ * will be no thread contention.
+**/
+void qdr_delivery_set_presettled(qdr_delivery_t *delivery);
+
/* handles delivery disposition and settlement changes from the remote end of
* the link, and schedules Core thread */
void qdr_delivery_remote_state_updated(qdr_core_t *core, qdr_delivery_t
*delivery, uint64_t disp,
diff --git a/src/router_node.c b/src/router_node.c
index 8f4da6e..b923b7d 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -355,6 +355,16 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
// Message has been marked for discard, no further processing necessary
//
if (receive_complete) {
+ // If this discarded delivery has already been settled by proton,
+ // set the presettled flag on the delivery to true if it is not
already true.
+ // Since the entire message has already been received, we directly
call the
+ // function to set the pre-settled flag since we cannot go thru
the core-thread
+ // to do this since the delivery has been discarded.
+ // Discarded streaming deliveries are not put thru the core thread
via the continue action.
+ if (pn_delivery_settled(pnd))
+ qdr_delivery_set_presettled(delivery);
+
+
// note: expected that the code that set discard has handled
// setting disposition and updating flow!
pn_delivery_settle(pnd);
diff --git a/tests/system_tests_delivery_counts.py
b/tests/system_tests_delivery_counts.py
index 61c976c..3c407b9 100644
--- a/tests/system_tests_delivery_counts.py
+++ b/tests/system_tests_delivery_counts.py
@@ -251,6 +251,14 @@ class AddressCheckerTimeout ( object ):
def on_timer_task(self, event):
self.parent.address_check_timeout()
+
+class CounterCheckerTimeout ( object ):
+ def __init__(self, parent):
+ self.parent = parent
+
+ def on_timer_task(self, event):
+ self.parent.count_check_timeout()
+
class LargePresettledLinkCounterTest(MessagingHandler):
def __init__(self, sender_addr, receiver_addr):
super(LargePresettledLinkCounterTest, self).__init__()
@@ -348,6 +356,110 @@ class LargePresettledLinkCounterTest(MessagingHandler):
Container(self).run()
+class LargePresettledReleasedLinkCounterTest(MessagingHandler):
+ def __init__(self, sender_addr, receiver_addr):
+ super(LargePresettledReleasedLinkCounterTest,
self).__init__(prefetch=0)
+ self.sender_addr = sender_addr
+ self.receiver_addr = receiver_addr
+ self.dest = "LargePresettledReleasedLinkCounterTest"
+ self.receiver_dropoff_count = 50
+ self.num_messages = 200
+ self.num_attempts = 0
+ self.n_sent= 0
+ self.done = False
+ self.n_received = 0
+ self.count_check_timer = None
+ self.success = False
+ self.links = None
+ self.receiver_conn_closed = False
+
+ def check_if_done(self):
+ # Step 6:
+ # Check the counts on the inter-router link of
+ # Router B (where the receiver is attached). There
+ # should be no released or modified messages.
+ self.links = get_inter_router_links(self.receiver_addr)
+ for link in self.links:
+ # We don't know how many deliveries got from one side of the
+ # inter-router link to the other but there should at least be as
+ # many as was sent to the receiver
+ if link.get("linkDir") == "in" \
+ and link.get("presettledCount") >
self.receiver_dropoff_count \
+ and link.get("deliveryCount") >
self.receiver_dropoff_count \
+ and link.get("releasedCount") == 0\
+ and link.get("modifiedCount") == 0:
+ self.success = True
+ break
+ self.sender_conn.close()
+ self.timer.cancel()
+
+ def count_check_timeout(self):
+ self.check_if_done()
+
+ def address_check_timeout(self):
+ if has_mobile_dest_in_address_table(self.sender_addr, self.dest):
+ # Step 3: The address has propagated to Router A. Now attach a
sender
+ # to router A.
+ self.sender_conn = self.container.connect(self.sender_addr)
+ self.sender = self.container.create_sender(self.sender_conn,
+ self.dest,
+ name='SenderA')
+ else:
+ if self.num_attempts < 2:
+ self.address_check_timer = self.reactor.schedule(2,
AddressCheckerTimeout(self))
+ self.num_attempts += 1
+
+ def timeout(self):
+ self.error = "Timeout Expired: self.n_sent=%d, self.self.n_received=%d
" % (self.n_sent, self.n_received)
+ self.sender_conn.close()
+ if not self.receiver_conn_closed:
+ self.receiver_conn.close()
+
+ def on_start(self, event):
+ self.container = event.container
+ self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+ # Step 1: Create a receiver with name ReceiverA to address
LargePresettledReleasedLinkCounterTest
+ # This receiver is attached to router B. Later a sender will be
+ # created which will be connected to Router A. The sender will send
+ # on the same address that the receiver is receiving on.
+ self.receiver_conn = event.container.connect(self.receiver_addr)
+ self.receiver = event.container.create_receiver(self.receiver_conn,
+ self.dest,
+ name='ReceiverA')
+ self.receiver.flow(self.receiver_dropoff_count)
+
+ def on_link_opened(self, event):
+ self.reactor = event.reactor
+ if event.receiver:
+ # Step 2: The receiver link has been opened.
+ # Give 2 seconds for the address to propagate to the other router
(Router A)
+ self.address_check_timer = event.reactor.schedule(2,
AddressCheckerTimeout(self))
+ self.num_attempts += 1
+
+ def on_sendable(self, event):
+ # Step 4: Send self.num_messages multi-frame large pre-settled
messages.
+ # These messages will travel over inter-router link to Router B.
+ while self.n_sent < self.num_messages:
+ msg = Message(body=LARGE_PAYLOAD)
+ dlv = self.sender.send(msg)
+ # We are sending a pre-settled large multi frame message.
+ dlv.settle()
+ self.n_sent += 1
+
+ def on_message(self, event):
+ if self.receiver == event.receiver and not self.done:
+ self.n_received += 1
+ # Step 5: The receiver receives only 50 messages out of the 200
+ # messages and drops out.
+ if self.n_received == self.receiver_dropoff_count:
+ self.done = True
+ self.receiver_conn.close()
+ self.receiver_conn_closed = True
+ self.count_check_timer = event.reactor.schedule(3,
CounterCheckerTimeout(self))
+
+ def run(self):
+ Container(self).run()
+
class TwoRouterLargeMessagePresettledCountTest(TestCase):
@classmethod
@@ -390,6 +502,47 @@ class TwoRouterLargeMessagePresettledCountTest(TestCase):
self.assertTrue(test.success)
+class TwoRouterLargeMessagePresettledReleasedCountTest(TestCase):
+ @classmethod
+ def setUpClass(cls):
+ super(TwoRouterLargeMessagePresettledReleasedCountTest,
cls).setUpClass()
+
+ listen_port_1 = cls.tester.get_port()
+ listen_port_2 = cls.tester.get_port()
+ listen_port_inter_router = cls.tester.get_port()
+
+ config_1 = Qdrouterd.Config([
+ ('router', {'mode': 'interior', 'id': 'A'}),
+ ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+ ('listener', {'port': listen_port_1, 'authenticatePeer': False,
'saslMechanisms': 'ANONYMOUS'}),
+ ('listener', {'role': 'inter-router', 'port':
listen_port_inter_router, 'authenticatePeer': False, 'saslMechanisms':
'ANONYMOUS'}),
+ ])
+
+ config_2 = Qdrouterd.Config([
+ ('router', {'mode': 'interior', 'id': 'B'}),
+ ('listener', {'port': listen_port_2, 'authenticatePeer': False,
'saslMechanisms': 'ANONYMOUS'}),
+ ('connector', {'name': 'connectorToA', 'role': 'inter-router',
'port': listen_port_inter_router,
+ 'verifyHostname': 'no'}),
+ ])
+
+ cls.routers = []
+ cls.routers.append(cls.tester.qdrouterd("A", config_1, wait=True))
+ cls.routers.append(cls.tester.qdrouterd("B", config_2, wait=True))
+ cls.routers[1].wait_router_connected('A')
+
+ def test_verify_inter_router_presettled_released_count_DISPATCH_1541(self):
+ # This test sends presettled large messages across routers. A sender
is on
+ # router A and a receiver on B. The sender sends 200 messages, the
receiver
+ # receives 50 messages and goes away by closing its connection. There
should be no released or
+ # modified messages on the incoming inter-router link on Router B
+ # This test will fail without the patch to DISPATCH-1541
+ sender_address = self.routers[0].addresses[0]
+ receiver_address = self.routers[1].addresses[0]
+ test = LargePresettledReleasedLinkCounterTest(sender_address,
receiver_address)
+ test.run()
+ self.assertTrue(test.success)
+
+
class LinkRouteIngressEgressTransitTest(TestCase):
@classmethod
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]