Repository: qpid-dispatch Updated Branches: refs/heads/master 778b0ba23 -> 7279a153b
DISPATCH-366 - Use modified+delivery-failed for released messages whose deliveries are in doubt. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/7279a153 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/7279a153 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/7279a153 Branch: refs/heads/master Commit: 7279a153b7f04196bcba2876d2e1d7ff2a003919 Parents: 778b0ba Author: Ted Ross <[email protected]> Authored: Thu Jun 9 09:22:11 2016 -0400 Committer: Ted Ross <[email protected]> Committed: Thu Jun 9 09:22:11 2016 -0400 ---------------------------------------------------------------------- src/router_core/connections.c | 2 +- src/router_core/router_core_private.h | 1 + src/router_core/transfer.c | 19 +++++++++ src/router_node.c | 5 ++- tests/system_tests_one_router.py | 65 ++++++++++++++++++++++++++---- 5 files changed, 83 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7279a153/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index b352d92..701a91b 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -512,7 +512,7 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li if (peer) { peer->peer = 0; if (link->link_direction == QD_OUTGOING) - qdr_delivery_release_CT(core, peer); + qdr_delivery_failed_CT(core, peer); qdr_delivery_decref(peer); } qdr_delivery_decref(dlv); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7279a153/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 1cd9438..394bd55 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -596,6 +596,7 @@ void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bo void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr); void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv); void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery); +void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *delivery); bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *delivery); void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7279a153/src/router_core/transfer.c ---------------------------------------------------------------------- diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index b587baf..40ba82e 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -311,6 +311,25 @@ void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *dlv) } +void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *dlv) +{ + bool push = dlv->disposition != PN_MODIFIED; + + dlv->disposition = PN_MODIFIED; + dlv->settled = true; + bool moved = qdr_delivery_settled_CT(core, dlv); + + if (push || moved) + qdr_delivery_push_CT(core, dlv); + + // + // Remove the unsettled reference + // + if (moved) + qdr_delivery_decref(dlv); +} + + bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv) { // http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7279a153/src/router_node.c ---------------------------------------------------------------------- diff --git a/src/router_node.c b/src/router_node.c index b8e8f0e..819a69f 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -859,8 +859,11 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di // // If the disposition has changed, update the proton delivery. // - if (disp != pn_delivery_remote_state(pnd)) + if (disp != pn_delivery_remote_state(pnd)) { + if (disp == PN_MODIFIED) + pn_disposition_set_failed(pn_delivery_local(pnd), true); pn_delivery_update(pnd, disp); + } // // If the delivery is settled, remove the linkage and settle the proton delivery. http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7279a153/tests/system_tests_one_router.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py index eb3b6b1..130c05a 100644 --- a/tests/system_tests_one_router.py +++ b/tests/system_tests_one_router.py @@ -18,17 +18,11 @@ # import unittest -from proton import Message, PENDING, ACCEPTED, REJECTED +from proton import Message, Delivery, PENDING, ACCEPTED, REJECTED from system_test import TestCase, Qdrouterd, main_module from proton.handlers import MessagingHandler from proton.reactor import Container, AtMostOnce, AtLeastOnce -# PROTON-828: -try: - from proton import MODIFIED -except ImportError: - from proton import PN_STATUS_MODIFIED as MODIFIED - class RouterTest(TestCase): """System tests involving a single router""" @@ -1070,6 +1064,11 @@ class RouterTest(TestCase): test.run() self.assertEqual(None, test.error) + def test_18_released_vs_modified(self): + test = ReleasedVsModifiedTest(self.address) + test.run() + self.assertEqual(None, test.error) + class Timeout(object): def __init__(self, parent): @@ -1257,5 +1256,57 @@ class MultiframePresettledTest(MessagingHandler): Container(self).run() +class ReleasedVsModifiedTest(MessagingHandler): + def __init__(self, address): + super(ReleasedVsModifiedTest, self).__init__(prefetch=0, auto_accept=False) + self.address = address + self.dest = "closest.RVMtest" + self.error = None + self.count = 10 + self.accept = 6 + self.n_sent = 0 + self.n_received = 0 + self.n_released = 0 + self.n_modified = 0 + + def check_if_done(self): + if self.n_received == self.accept and self.n_released == self.count - self.accept and self.n_modified == self.accept: + self.timer.cancel() + self.conn.close() + + def timeout(self): + self.error = "Timeout Expired: sent=%d, received=%d, released=%d, modified=%d" % \ + (self.n_sent, self.n_received, self.n_released, self.n_modified) + self.conn.close() + + def on_start(self, event): + self.timer = event.reactor.schedule(5, Timeout(self)) + self.conn = event.container.connect(self.address) + self.sender = event.container.create_sender(self.conn, self.dest) + self.receiver = event.container.create_receiver(self.conn, self.dest, name="A") + self.receiver.flow(self.accept) + + def on_sendable(self, event): + for i in range(self.count - self.n_sent): + msg = Message(body="RvM-Test") + event.sender.send(msg) + self.n_sent += 1 + + def on_message(self, event): + self.n_received += 1 + if self.n_received == self.accept: + self.receiver.close() + + def on_released(self, event): + if event.delivery.remote_state == Delivery.MODIFIED: + self.n_modified += 1 + else: + self.n_released += 1 + self.check_if_done() + + def run(self): + Container(self).run() + + if __name__ == '__main__': unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
