Repository: qpid-dispatch Updated Branches: refs/heads/master c728d7f41 -> ab4421db6
DISPATCH-1012 - For anycast addresses, deliveries to an address that has no attached destinations shall be released by the router network and credit not replenished to the sender. Once a destination becomes reachable, the sender's credit shall be topped back up to the original capacity. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ab4421db Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ab4421db Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ab4421db Branch: refs/heads/master Commit: ab4421db6b1371e5ba902bc441ccc332352e7516 Parents: c728d7f Author: Ted Ross <[email protected]> Authored: Tue May 29 16:25:00 2018 -0400 Committer: Ted Ross <[email protected]> Committed: Tue May 29 16:25:00 2018 -0400 ---------------------------------------------------------------------- src/router_core/connections.c | 2 + src/router_core/router_core_private.h | 2 +- src/router_core/transfer.c | 56 ++++++------ tests/system_tests_one_router.py | 132 ++++++++++++++++++----------- 4 files changed, 114 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ab4421db/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 0e655d3..5fdc3bf 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -442,6 +442,7 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, strcpy(link->name, name); link->link_direction = dir; link->capacity = conn->link_capacity; + link->credit_pending = conn->link_capacity; link->admin_enabled = true; link->oper_status = QDR_LINK_OPER_DOWN; @@ -867,6 +868,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core, link->link_type = link_type; link->link_direction = dir; link->capacity = conn->link_capacity; + link->credit_pending = conn->link_capacity; link->name = (char*) malloc(QDR_DISCRIMINATOR_SIZE + 8); link->disambiguated_name = 0; link->terminus_addr = 0; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ab4421db/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index b3c9798..8017c67 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -398,10 +398,10 @@ struct qdr_link_t { qdr_link_oper_status_t oper_status; int capacity; int credit_to_core; ///< Number of the available credits incrementally given to the core + int credit_pending; ///< Number of credits to be issued once consumers are available bool admin_enabled; bool strip_annotations_in; bool strip_annotations_out; - bool flow_started; ///< for incoming, true iff initial credit has been granted bool drain_mode; bool stalled_outbound; ///< Indicates that this link is stalled on outbound buffer backpressure http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ab4421db/src/router_core/transfer.c ---------------------------------------------------------------------- diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 706a26d..d92cc2b 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -795,32 +795,30 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery // // We are trying to forward a delivery on an address that has no outbound paths // AND the incoming link is targeted (not anonymous). - - // If the owning_addr is a multicast addr and there are no outbound paths, we will release this delivery - // and replenish the credit. - - // For non multicast addresses, put the delivery on the incoming link's undelivered list. Note that it is safe - // to do this because the undelivered list will be flushed once the number of - // paths transitions from zero to one. // - // Use the action-reference as the reference for undelivered rather - // than decrementing and incrementing the delivery ref_count. + // We shall release the delivery (it is currently undeliverable). If the distribution is + // multicast, we will replenish the credit. If it is anycast, we will allow the credit to + // drain. // - if (qdr_is_addr_treatment_multicast(link->owning_addr)) { + if (dlv->settled) { + // Increment the presettled_dropped_deliveries on the in_link + link->dropped_presettled_deliveries++; + core->dropped_presettled_deliveries++; + } else qdr_delivery_release_CT(core, dlv); + + if (qdr_is_addr_treatment_multicast(link->owning_addr)) qdr_link_issue_credit_CT(core, link, 1, false); - qdr_delivery_decref_CT(core, dlv, "qdr_link_forward_CT - removed from action (no path)"); - if (dlv->settled) { - // Increment the presettled_dropped_deliveries on the in_link - link->dropped_presettled_deliveries++; - core->dropped_presettled_deliveries++; - } - } - else { - DEQ_INSERT_TAIL(link->undelivered, dlv); - dlv->where = QDR_DELIVERY_IN_UNDELIVERED; - qd_log(core->log, QD_LOG_DEBUG, "Delivery transfer: dlv:%lx qdr_link_forward_CT: action-list -> undelivered-list", (long) dlv); - } + else + link->credit_pending++; + + // + // Set the discard flag on the message only if the message is not completely received yet. + // + if (!receive_complete) + qd_message_set_discard(dlv->msg, true); + + qdr_delivery_decref_CT(core, dlv, "qdr_link_forward_CT - removed from action (no path)"); return; } @@ -834,7 +832,6 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery if (qdr_connection_route_container(link->conn)) { addr->deliveries_ingress_route_container++; core->deliveries_ingress_route_container++; - } } @@ -1151,7 +1148,8 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool qdr_deliver_continue_peers_CT(core, in_dlv); - if (qd_message_receive_complete(qdr_delivery_message(in_dlv))) { + qd_message_t *msg = qdr_delivery_message(in_dlv); + if (qd_message_receive_complete(msg) && !qd_message_is_discard(msg)) { // // The entire message has now been received. Check to see if there are in process subscriptions that need to // receive this message. in process subscriptions, at this time, can deal only with full messages. @@ -1203,12 +1201,12 @@ void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bo bool drain_changed = link->drain_mode |= drain; link->drain_mode = drain; + if (link->credit_pending > 0) + link->credit_pending = link->credit_pending > credit ? link->credit_pending - credit : 0; + if (!drain_changed && credit == 0) return; - if (credit > 0) - link->flow_started = true; - qdr_link_work_t *work = new_qdr_link_work_t(); ZERO(work); @@ -1271,8 +1269,8 @@ void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr) // // Issue credit to stalled links // - if (!link->flow_started) - qdr_link_issue_credit_CT(core, link, link->capacity, false); + if (link->credit_pending > 0) + qdr_link_issue_credit_CT(core, link, link->credit_pending, false); // // Drain undelivered deliveries via the forwarder http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ab4421db/tests/system_tests_one_router.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py index ea9f3cf..3664acf 100644 --- a/tests/system_tests_one_router.py +++ b/tests/system_tests_one_router.py @@ -283,7 +283,7 @@ class OneRouterTest(TestCase): test.run() self.assertEqual(None, test.error) - def test_26_multicast_no_receivcer(self): + def test_26_multicast_no_receiver(self): test = MulticastUnsettledNoReceiverTest(self.address) test.run() self.assertEqual(None, test.error) @@ -410,6 +410,51 @@ class OneRouterTest(TestCase): client.connection.close() +class Entity(object): + def __init__(self, status_code, status_description, attrs): + self.status_code = status_code + self.status_description = status_description + self.attrs = attrs + + def __getattr__(self, key): + return self.attrs[key] + + +class RouterProxy(object): + def __init__(self, reply_addr): + self.reply_addr = reply_addr + + def response(self, msg): + ap = msg.properties + bd = msg.body + if bd.__class__ == dict and 'results' in bd and 'attributeNames' in bd: + ## + ## This is a query response + ## + response = [] + anames = bd['attributeNames'] + for row in bd['results']: + cols = {} + for i in range(len(row)): + cols[anames[i]] = row[i] + response.append(Entity(ap['statusCode'], ap['statusDescription'], cols)) + return response + + return Entity(ap['statusCode'], ap['statusDescription'], msg.body) + + def read_address(self, name): + ap = {'operation': 'READ', 'type': 'org.apache.qpid.dispatch.router.address', 'name': name} + return Message(properties=ap, reply_to=self.reply_addr) + + def query_addresses(self): + ap = {'operation': 'QUERY', 'type': 'org.apache.qpid.dispatch.router.address'} + return Message(properties=ap, reply_to=self.reply_addr) + + def query_links(self): + ap = {'operation': 'QUERY', 'type': 'org.apache.qpid.dispatch.router.link'} + return Message(properties=ap, reply_to=self.reply_addr) + + class SemanticsClosest(MessagingHandler): def __init__(self, address): super(SemanticsClosest, self).__init__() @@ -2282,10 +2327,10 @@ class MulticastUnsettledNoReceiverTest(MessagingHandler): these messages since there is no receiver. """ def __init__(self, address): - super(MulticastUnsettledNoReceiverTest, self).__init__(prefetch=0) + super(MulticastUnsettledNoReceiverTest, self).__init__() self.address = address self.dest = "multicast.MulticastNoReceiverTest" - self.error = "Some error" + self.error = None self.n_sent = 0 self.max_send = 250 self.n_released = 0 @@ -2293,55 +2338,59 @@ class MulticastUnsettledNoReceiverTest(MessagingHandler): self.timer = None self.conn = None self.sender = None + self.query_sent = False + + def timeout(self): + self.error = "Timeout expired: n_sent=%d n_released=%d n_accepted=%d" % \ + (self.n_sent, self.n_released, self.n_accepted) + self.conn.close() def check_if_done(self): if self.n_accepted > 0: self.error = "Messages should not be accepted as there are no receivers" self.timer.cancel() self.conn.close() - elif self.n_sent == self.n_released: - self.error = None - - if not self.error: - local_node = Node.connect(self.address, timeout=TIMEOUT) - for result in local_node.query(type='org.apache.qpid.dispatch.router.link').results: - if result[5] == 'in' and 'multicast.MulticastNoReceiverTest' in result[6]: - if result[16] != 250: - self.error = "Expected 250 dropped presettled deliveries but got " + str(result[16]) - else: - outs = local_node.query(type='org.apache.qpid.dispatch.router') - pos = outs.attribute_names.index("droppedPresettledDeliveries") - results = outs.results[0] - if results[pos] != 250: - self.error = "When querying router, expected 250 dropped presettled " \ - "deliveries but got " + str(results[pos]) - else: - pos = outs.attribute_names.index("releasedDeliveries") - if results[pos] < 250: - self.error = "The number of released deliveries cannot be less that 250 " \ - "but it is " + str(results[pos]) - - self.timer.cancel() - self.conn.close() + elif self.max_send == self.n_released and not self.query_sent: + self.mgmt_tx.send(self.proxy.query_links()) + self.query_sent = True def on_start(self, event): self.timer = event.reactor.schedule(TIMEOUT, Timeout(self)) self.conn = event.container.connect(self.address) - self.sender = event.container.create_sender(self.conn, self.dest) + self.mgmt_rx = event.container.create_receiver(self.conn, dynamic=True) + self.mgmt_tx = event.container.create_sender(self.conn, '$management') + + def on_link_opened(self, event): + if event.receiver == self.mgmt_rx: + self.proxy = RouterProxy(self.mgmt_rx.remote_source.address) + self.sender = event.container.create_sender(self.conn, self.dest) + + def on_message(self, event): + if event.receiver == self.mgmt_rx: + results = self.proxy.response(event.message) + for link in results: + if link.linkDir == 'in' and link.owningAddr == 'M0' + self.dest: + if link.releasedCount != self.max_send: + self.error = "Released count expected %d, got %d" % (self.max_send, link.droppedPresettledCount) + self.timer.cancel() + self.conn.close() def on_sendable(self, event): - if self.n_sent >= self.max_send: - return - self.n_sent += 1 - msg = Message(body=self.n_sent) - event.sender.send(msg) + if event.sender == self.sender: + if self.n_sent >= self.max_send: + return + self.n_sent += 1 + msg = Message(body=self.n_sent) + event.sender.send(msg) def on_accepted(self, event): - self.n_accepted += 1 + if event.sender == self.sender: + self.n_accepted += 1 self.check_if_done() def on_released(self, event): - self.n_released += 1 + if event.sender == self.sender: + self.n_released += 1 self.check_if_done() def run(self): @@ -2638,19 +2687,6 @@ class PresettledOverflowTest(MessagingHandler): if result[5] == 'out' and 'balanced.PresettledOverflow' in result[6]: if result[16] != 250: self.error = "Expected 250 dropped presettled deliveries but got " + str(result[16]) - else: - outs = local_node.query(type='org.apache.qpid.dispatch.router') - pos_presett = outs.attribute_names.index("presettledDeliveries") - pos = outs.attribute_names.index("droppedPresettledDeliveries") - results = outs.results[0] - # Enforce presettledDeliveries metric is updated - if results[pos_presett] < 500: - self.error = "When querying router, expected 500 presettled " \ - "deliveries but got " + str(results[pos_presett]) - # There is 250 from a previous test - if results[pos] < 500: - self.error = "When querying router, expected 500 dropped presettled " \ - "deliveries but got " + str(results[pos]) self.conn.close() self.timer.cancel() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
