Repository: qpid-dispatch
Updated Branches:
  refs/heads/master a7c49a34a -> 1d9403dc9
DISPATCH-971 - Reverted the rejection of unsettled multicast deliveries.

Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/db3e06e9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/db3e06e9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/db3e06e9

Branch: refs/heads/master
Commit: db3e06e95dcc16d5bd64c7d95a7a66e85f4868dc
Parents: a7c49a3
Author: Ted Ross <[email protected]>
Authored: Thu Apr 19 12:12:26 2018 -0400
Committer: Ted Ross <[email protected]>
Committed: Thu Apr 19 12:12:26 2018 -0400

----------------------------------------------------------------------
 doc/book/theory_of_operation.adoc               |  66 ++++++++--
 python/qpid_dispatch/management/qdrouter.json   |   2 +-
 src/dispatch.c                                  |   1 -
 src/dispatch_private.h                          |   1 -
 src/router_core/exchange_bindings.c             |  12 +-
 src/router_core/forwarder.c                     |  12 +-
 src/router_core/router_core.c                   |   5 -
 tests/CMakeLists.txt                            |   1 -
 .../system_tests_denied_unsettled_multicast.py  | 130 -------------------
 tests/system_tests_exchange_bindings.py         |  33 +----
 10 files changed, 62 insertions(+), 201 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/db3e06e9/doc/book/theory_of_operation.adoc
----------------------------------------------------------------------
diff --git a/doc/book/theory_of_operation.adoc b/doc/book/theory_of_operation.adoc
index 487a084..c2deab8 100644
--- a/doc/book/theory_of_operation.adoc
+++ b/doc/book/theory_of_operation.adoc
@@ -246,15 +246,65 @@ used. Address semantics include the following considerations:
 
 ==== Routing Patterns
 
-Routing patterns define the paths that a message with a mobile address can take across a network. These routing patterns can be used for both direct routing, in which the router distributes messages between clients without a broker, and indirect routing, in which the router enables clients to exchange messages through a broker.
+Routing patterns define the paths that a message with a mobile address
+can take across a network. These routing patterns can be used for both
+direct routing, in which the router distributes messages between
+clients without a broker, and indirect routing, in which the router
+enables clients to exchange messages through a broker.
+
+Note that the routing patterns fall into two categories: Anycast
+(Balanced and Closest) and Multicast. There is no concept of
+"unicast" in which there is only one consumer for an address.
+
+Anycast distribution delivers each message to one consumer whereas
+multicast distribution delivers each message to all consumers.
+
+Anycast delivery is reliable when the message deliveries are
+unsettled. There is a reliability contract that the router network
+abides by when delivering unsettled messages to anycast addresses.
+For every such delivery sent by a producer, the router network
+guarantees that one of the following outcomes will occur:
+
+* The delivery shall be settled with ACCEPTED or REJECTED disposition
+  where the disposition is supplied by the consumer.
+* The delivery shall be settled with RELEASED disposition, meaning
+  that the message was not delivered to any consumer.
+* The delivery shall be settled with MODIFIED disposition, meaning
+  that the message may have been delivered to a consumer but should be
+  considered in-doubt and re-sent.
+* The connection to the producer shall be dropped, signifying that all
+  unsettled deliveries should now be considered in-doubt by the
+  producer and later re-sent.
+
+Multicast delivery is not reliable. If a producer sends an unsettled
+delivery, the ingress router shall settle the delivery with ACCEPTED
+disposition regardless of whether the message was delivered to any
+consumers.
+
+===== Balanced
+
+An anycast method which allows multiple receivers to use the same
+address. In this case, messages (or links) are routed to exactly one
+of the receivers and the network attempts to balance the traffic load
+across the set of receivers using the same address. This routing
+delivers messages to receivers based on how quickly they settle the
+deliveries. Faster receivers get more messages.
+
+===== Closest
+
+An anycast method in which even if there are more receivers for the
+same address, every message is sent along the shortest path to reach
+the destination. This means that only one receiver will get the
+message. Each message is delivered to the closest receivers in terms
+of topology cost. If there are multiple receivers with the same lowest
+cost, deliveries will be spread evenly among those receivers.
+
+===== Multicast
+
+Having multiple consumers on the same address at the same time,
+messages are routed such that each consumer receives one copy of the
+message.
 
-[cols="20,80"]
-|===
-|Pattern | Description
-| Balanced | An anycast method which allows multiple receivers to use the same address. In this case, messages (or links) are routed to exactly one of the receivers and the network attempts to balance the traffic load across the set of receivers using the same address. This routing delivers messages to receivers based on how quickly they settle the deliveries. Faster receivers get more messages.
-| Closest | An anycast method in which even if there are more receivers for the same address, every message is sent along the shortest path to reach the destination. This means that only one receiver will get the message. Each message is delivered to the closest receivers in terms of topology cost. If there are multiple receivers with the same lowest cost, deliveries will be spread evenly among those receivers.
-| Multicast | Having multiple consumers on the same address at the same time, messages are routed such that each consumer receives one copy of the message.
-|===
 
 ==== Routing Mechanisms
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/db3e06e9/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index e7ea697..fcbf7e5 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -471,7 +471,7 @@
                 },
                 "allowUnsettledMulticast": {
                     "type": "boolean",
-                    "description": "If true, allow senders to send unsettled deliveries to multicast addresses. These deliveries shall be settled by the ingress router. If false, unsettled deliveries to multicast addresses shall be rejected.",
+                    "description": "(DEPRECATED) If true, allow senders to send unsettled deliveries to multicast addresses. These deliveries shall be settled by the ingress router. If false, unsettled deliveries to multicast addresses shall be rejected.",
                     "create": true,
                     "required": false,
                     "default": false

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/db3e06e9/src/dispatch.c
----------------------------------------------------------------------
diff --git a/src/dispatch.c b/src/dispatch.c
index 840fd29..842bb4d 100644
--- a/src/dispatch.c
+++ b/src/dispatch.c
@@ -183,7 +183,6 @@ qd_error_t qd_dispatch_configure_router(qd_dispatch_t *qd, qd_entity_t *entity)
 
     qd->router_mode = qd_entity_get_long(entity, "mode"); QD_ERROR_RET();
     qd->thread_count = qd_entity_opt_long(entity, "workerThreads", 4); QD_ERROR_RET();
-    qd->allow_unsettled_multicast = qd_entity_opt_bool(entity, "allowUnsettledMulticast", false); QD_ERROR_RET();
     qd->allow_resumable_link_route = qd_entity_opt_bool(entity, "allowResumableLinkRoute", true); QD_ERROR_RET();
 
     if (! qd->sasl_config_path) {

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/db3e06e9/src/dispatch_private.h
----------------------------------------------------------------------
diff --git a/src/dispatch_private.h b/src/dispatch_private.h
index 7faa51d..d927023 100644
--- a/src/dispatch_private.h
+++ b/src/dispatch_private.h
@@ -56,7 +56,6 @@ struct qd_dispatch_t {
     char *router_area;
     char *router_id;
     qd_router_mode_t router_mode;
-    bool allow_unsettled_multicast;
     bool allow_resumable_link_route;
 };
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/db3e06e9/src/router_core/exchange_bindings.c
----------------------------------------------------------------------
diff --git a/src/router_core/exchange_bindings.c b/src/router_core/exchange_bindings.c
index 02bf192..d03f795 100644
--- a/src/router_core/exchange_bindings.c
+++ b/src/router_core/exchange_bindings.c
@@ -188,18 +188,8 @@ int qdr_forward_exchange_CT(qdr_core_t *core,
     // NOTE: This is the only multicast mode currently supported. Others will likely be
     // implemented in the future.
     //
-    if (!presettled) {
+    if (!presettled)
         in_delivery->settled = true;
-        //
-        // If the router is configured to reject unsettled multicasts, settle and reject this delivery.
-        //
-        if (!core->qd->allow_unsettled_multicast) {
-            in_delivery->disposition = PN_REJECTED;
-            in_delivery->error = qdr_error("qd:forbidden", "Deliveries to an exchange must be pre-settled");
-            qdr_delivery_push_CT(core, in_delivery);
-            return 0;
-        }
-    }
 
     qd_iterator_t *subject = qd_message_check(msg, QD_DEPTH_PROPERTIES)
         ? qd_message_field_iterator(msg, QD_FIELD_SUBJECT)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/db3e06e9/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 82bce7c..b8f2291 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -237,18 +237,8 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
     // NOTE: This is the only multicast mode currently supported. Others will likely be
     // implemented in the future.
     //
-    if (!presettled) {
+    if (!presettled)
         in_delivery->settled = true;
-        //
-        // If the router is configured to reject unsettled multicasts, settle and reject this delivery.
-        //
-        if (!core->qd->allow_unsettled_multicast) {
-            in_delivery->disposition = PN_REJECTED;
-            in_delivery->error = qdr_error("qd:forbidden", "Deliveries to a multicast address must be pre-settled");
-            qdr_delivery_push_CT(core, in_delivery);
-            return 0;
-        }
-    }
 
     //
     // Forward to local subscribers

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/db3e06e9/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 79f3d11..9d7f7a5 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -56,11 +56,6 @@ qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area,
     core->agent_log = qd_log_source("AGENT");
 
     //
-    // Report on the configuration for unsettled multicasts
-    //
-    qd_log(core->log, QD_LOG_INFO, "Allow Unsettled Multicast: %s", qd->allow_unsettled_multicast ? "yes" : "no");
-
-    //
     // Set up the threading support
     //
     core->action_cond = sys_cond();

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/db3e06e9/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 2de2aea..ee4e080 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -99,7 +99,6 @@ foreach(py_test_module
     system_tests_dynamic_terminus
     system_tests_log_message_components
     system_tests_failover_list
-    system_tests_denied_unsettled_multicast
     system_tests_auth_service_plugin
     system_tests_authz_service_plugin
     system_tests_delivery_abort

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/db3e06e9/tests/system_tests_denied_unsettled_multicast.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_denied_unsettled_multicast.py b/tests/system_tests_denied_unsettled_multicast.py
deleted file mode 100644
index bc85084..0000000
--- a/tests/system_tests_denied_unsettled_multicast.py
+++ /dev/null
@@ -1,130 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import unittest2 as unittest
-from proton import Message, Timeout
-from system_test import TestCase, Qdrouterd, main_module, TIMEOUT
-from proton.handlers import MessagingHandler
-from proton.reactor import Container
-
-# PROTON-828:
-try:
-    from proton import MODIFIED
-except ImportError:
-    from proton import PN_STATUS_MODIFIED as MODIFIED
-
-
-class RouterTest(TestCase):
-
-    inter_router_port = None
-
-    @classmethod
-    def setUpClass(cls):
-        """Start a router"""
-        super(RouterTest, cls).setUpClass()
-
-        def router(name):
-
-            config = [
-                ('router', {'mode': 'standalone', 'id': name}),
-                ('listener', {'port': cls.tester.get_port()}),
-                ('address', {'prefix': 'multicast', 'distribution' : 'multicast'}),
-            ]
-
-            config = Qdrouterd.Config(config)
-
-            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
-
-        cls.routers = []
-
-        inter_router_port = cls.tester.get_port()
-
-        router('A')
-        cls.routers[0].wait_ready()
-
-
-    def test_01_default_multicast_test(self):
-        test = DeniedUnsettledMulticastTest(self.routers[0].addresses[0])
-        test.run()
-        self.assertEqual(None, test.error)
-
-
-class Timeout(object):
-    def __init__(self, parent):
-        self.parent = parent
-
-    def on_timer_task(self, event):
-        self.parent.timeout()
-
-
-class DeniedUnsettledMulticastTest(MessagingHandler):
-    def __init__(self, host):
-        super(DeniedUnsettledMulticastTest, self).__init__()
-        self.host = host
-        self.count = 10
-        self.error = None
-        self.addr = "multicast/test"
-        self.sent_uns = 0
-        self.sent_pres = 0
-        self.n_received = 0
-        self.n_rejected = 0
-
-    def timeout(self):
-        self.error = "Timeout Expired - n_received=%d n_rejected=%d" % (self.n_received, self.n_rejected)
-        self.conn.close()
-
-    def check_done(self):
-        if self.n_received == self.count and self.n_rejected == self.count:
-            self.conn.close()
-            self.timer.cancel()
-
-    def send(self):
-        while self.sent_uns < self.count:
-            m = Message(body="Unsettled %d" % self.sent_uns)
-            self.sender.send(m)
-            self.sent_uns += 1
-        while self.sent_pres < self.count:
-            m = Message(body="Presettled %d" % self.sent_pres)
-            dlv = self.sender.send(m)
-            dlv.settle()
-            self.sent_pres += 1
-
-    def on_start(self, event):
-        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
-        self.conn = event.container.connect(self.host)
-        self.receiver = event.container.create_receiver(self.conn, self.addr)
-        self.sender = event.container.create_sender(self.conn, self.addr)
-
-    def on_sendable(self, event):
-        self.send()
-
-    def on_message(self, event):
-        self.n_received += 1
-        self.check_done()
-
-    def on_rejected(self, event):
-        self.n_rejected += 1
-        self.check_done()
-
-    def run(self):
-        Container(self).run()
-
-
-if __name__ == '__main__':
-    unittest.main(main_module())

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/db3e06e9/tests/system_tests_exchange_bindings.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_exchange_bindings.py b/tests/system_tests_exchange_bindings.py
index 5bf1ff6..e03f36a 100644
--- a/tests/system_tests_exchange_bindings.py
+++ b/tests/system_tests_exchange_bindings.py
@@ -462,8 +462,7 @@ class ExchangeBindingsTest(TestCase):
         Forward unsettled messages to multiple subscribers
         """
         config = [
-            ('router', {'mode': 'standalone', 'id': 'QDR.mcast',
-                        'allowUnsettledMulticast': True}),
+            ('router', {'mode': 'standalone', 'id': 'QDR.mcast'}),
             ('listener', {'role': 'normal', 'host': '0.0.0.0',
                           'port': self.tester.get_port(),
                           'saslMechanisms':'ANONYMOUS'}),
@@ -508,36 +507,6 @@ class ExchangeBindingsTest(TestCase):
         self.assertTrue(nhop2B.queue.empty())
         self.assertTrue(alt.queue.empty())
 
-        # ensure failure if unsettled multicast not allowed:
-
-        config = [
-            ('router', {'mode': 'standalone', 'id': 'QDR.mcast2',
-                        'allowUnsettledMulticast': False}),
-            ('listener', {'role': 'normal', 'host': '0.0.0.0',
-                          'port': self.tester.get_port(),
-                          'saslMechanisms':'ANONYMOUS'}),
-            ('exchange', {'address': 'Address4',
-                          'name': 'Exchange1'}),
-            ('binding', {'name': 'binding1',
-                         'exchangeName': 'Exchange1',
-                         'bindingKey': 'a.b',
-                         'nextHopAddress': 'nextHop1'})
-        ]
-        router = self.tester.qdrouterd('QDR.mcast2', Qdrouterd.Config(config))
-
-        # create clients for message transfer
-        conn = BlockingConnection(router.addresses[0])
-        sender = conn.create_sender(address="Address4", options=AtLeastOnce())
-        nhop1 = AsyncTestReceiver(address=router.addresses[0], source="nextHop1")
-
-        self.assertRaises(SendException,
-                          sender.send,
-                          Message(subject='a.b', body='A'))
-        nhop1.stop()
-        conn.close()
-
-        self.assertTrue(nhop1.queue.empty())
-
     def test_remote_exchange(self):
         """
         Verify that the exchange and bindings are visible to other routers in
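
The reliability contract added to theory_of_operation.adoc in this commit can be read from the producer's side as a small decision table. The following is only a sketch under stated assumptions: the names Outcome and producer_action are hypothetical and are not part of qpid-dispatch or Proton, and resending after RELEASED is this sketch's own policy choice (the doc text only says the message was not delivered to any consumer).

```python
from enum import Enum


class Outcome(Enum):
    """Ways an unsettled delivery can end, as listed in the doc change (hypothetical type)."""
    ACCEPTED = "accepted"
    REJECTED = "rejected"
    RELEASED = "released"
    MODIFIED = "modified"
    CONNECTION_DROPPED = "connection-dropped"


def producer_action(outcome, multicast=False):
    """Return 'done', 'unconfirmed', or 'resend' (a hypothetical producer policy)."""
    if multicast and outcome is Outcome.ACCEPTED:
        # The ingress router settles multicast deliveries with ACCEPTED itself,
        # so this says nothing about whether any consumer got the message.
        return "unconfirmed"
    # Other outcomes on a multicast address are not described in the doc text;
    # this sketch simply treats them like anycast.
    if outcome in (Outcome.ACCEPTED, Outcome.REJECTED):
        # On anycast addresses the disposition was supplied by the consumer.
        return "done"
    # RELEASED: not delivered to any consumer (resending is an assumption here).
    # MODIFIED and CONNECTION_DROPPED: in-doubt, to be re-sent per the contract.
    return "resend"
```

For example, producer_action(Outcome.MODIFIED) yields "resend", while producer_action(Outcome.ACCEPTED, multicast=True) yields "unconfirmed", which reflects the statement that multicast delivery is not reliable.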
