Repository: qpid-dispatch Updated Branches: refs/heads/tross-dispatch-781-1 [created] 85a89965a
DISPATCH-781 - Completed the handling of credit for flow control Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/135ffe78 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/135ffe78 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/135ffe78 Branch: refs/heads/tross-dispatch-781-1 Commit: 135ffe7870304a2957bb07753509e457427ef3a3 Parents: ef2f895 Author: Ted Ross <tr...@redhat.com> Authored: Thu Jun 1 14:39:34 2017 -0400 Committer: Ted Ross <tr...@redhat.com> Committed: Thu Jun 1 14:40:44 2017 -0400 ---------------------------------------------------------------------- include/qpid/dispatch/router_core.h | 4 - python/qpid_dispatch_internal/router/data.py | 27 +++-- python/qpid_dispatch_internal/router/engine.py | 24 +---- python/qpid_dispatch_internal/router/mobile.py | 75 +++++++++---- python/qpid_dispatch_internal/router/node.py | 59 ++++++++-- src/router_core/connections.c | 52 +++++---- src/router_core/route_control.c | 4 +- src/router_core/route_tables.c | 63 +++-------- src/router_core/router_core_private.h | 11 +- src/router_core/transfer.c | 113 ++++++++++++++------ src/router_pynode.c | 48 +-------- 11 files changed, 254 insertions(+), 226 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/135ffe78/include/qpid/dispatch/router_core.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 25af0e5..3bf1da8 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -75,16 +75,12 @@ void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char * void qdr_core_unmap_destination(qdr_core_t *core, int router_maskbit, const char *address_hash); void qdr_core_update_destination(qdr_core_t *core, const char 
*address_hash, uint32_t in_links, uint32_t out_capacity); -typedef void (*qdr_mobile_added_t) (void *context, const char *address_hash, uint32_t in_links, uint32_t out_capacity); typedef void (*qdr_mobile_update_t) (void *context, const char *address_hash, uint32_t in_links, uint32_t out_capacity); -typedef void (*qdr_mobile_removed_t) (void *context, const char *address_hash); typedef void (*qdr_link_lost_t) (void *context, int link_maskbit); void qdr_core_route_table_handlers(qdr_core_t *core, void *context, - qdr_mobile_added_t mobile_added, qdr_mobile_update_t mobile_update, - qdr_mobile_removed_t mobile_removed, qdr_link_lost_t link_lost); /** http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/135ffe78/python/qpid_dispatch_internal/router/data.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/router/data.py b/python/qpid_dispatch_internal/router/data.py index 7e66a9e..1399211 100644 --- a/python/qpid_dispatch_internal/router/data.py +++ b/python/qpid_dispatch_internal/router/data.py @@ -242,23 +242,25 @@ class MessageLSR(object): class MessageMAU(object): """ """ - def __init__(self, body, _id=None, _seq=None, _add_list=None, _del_list=None, _exist_list=None): + def __init__(self, body, _id=None, _seq=None, _add_map=None, _del_list=None, _update_map=None, _exist_map=None): if body: self.id = getMandatory(body, 'id', str) self.version = getOptional(body, 'pv', 0, long) self.area = '0' self.mobile_seq = getMandatory(body, 'mobile_seq', long) - self.add_list = getOptional(body, 'add', None, list) + self.add_map = getOptional(body, 'add', None, dict) self.del_list = getOptional(body, 'del', None, list) - self.exist_list = getOptional(body, 'exist', None, list) + self.update_map = getOptional(body, 'upd', None, dict) + self.exist_map = getOptional(body, 'exist', None, dict) else: self.id = _id self.version = ProtocolVersion self.area = '0' self.mobile_seq = long(_seq) - self.add_list = 
_add_list + self.add_map = _add_map self.del_list = _del_list - self.exist_list = _exist_list + self.update_map = _update_map + self.exist_map = _exist_map def get_opcode(self): return 'MAU' @@ -266,21 +268,24 @@ class MessageMAU(object): def __repr__(self): _add = '' _del = '' + _upd = '' _exist = '' - if self.add_list != None: _add = ' add=%r' % self.add_list + if self.add_map != None: _add = ' add=%r' % self.add_map if self.del_list != None: _del = ' del=%r' % self.del_list - if self.exist_list != None: _exist = ' exist=%r' % self.exist_list - return "MAU(id=%s pv=%d area=%s mobile_seq=%d%s%s%s)" % \ - (self.id, self.version, self.area, self.mobile_seq, _add, _del, _exist) + if self.update_map != None: _upd = ' upd=%r' % self.update_map + if self.exist_map != None: _exist = ' exist=%r' % self.exist_map + return "MAU(id=%s pv=%d area=%s mobile_seq=%d%s%s%s%s)" % \ + (self.id, self.version, self.area, self.mobile_seq, _add, _del, _upd, _exist) def to_dict(self): body = {'id' : self.id, 'pv' : self.version, 'area' : self.area, 'mobile_seq' : self.mobile_seq } - if self.add_list != None: body['add'] = self.add_list + if self.add_map != None: body['add'] = self.add_map if self.del_list != None: body['del'] = self.del_list - if self.exist_list != None: body['exist'] = self.exist_list + if self.update_map != None: body['upd'] = self.update_map + if self.exist_map != None: body['exist'] = self.exist_map return body http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/135ffe78/python/qpid_dispatch_internal/router/engine.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/router/engine.py b/python/qpid_dispatch_internal/router/engine.py index c018664..9405d34 100644 --- a/python/qpid_dispatch_internal/router/engine.py +++ b/python/qpid_dispatch_internal/router/engine.py @@ -91,31 +91,17 @@ class RouterEngine: except IndexError: raise ValueError("No router configuration found") return self._config - def 
addressAdded(self, addr, in_links, out_capacity): - """ - """ - try: - if addr[0] in 'MCD': - self.mobile_address_engine.add_local_address(addr) - except Exception: - self.log_ma(LOG_ERROR, "Exception in new-address processing\n%s" % format_exc(LOG_STACK_LIMIT)) - def addressUpdate(self, addr, in_links, out_capacity): """ """ try: - pass - except Exception: - self.log_ma(LOG_ERROR, "Exception in update-address processing\n%s" % format_exc(LOG_STACK_LIMIT)) - - def addressRemoved(self, addr): - """ - """ - try: if addr[0] in 'MCD': - self.mobile_address_engine.del_local_address(addr) + if in_links == 0 and out_capacity == 0: + self.mobile_address_engine.del_local_address(addr) + else: + self.mobile_address_engine.update_local_address(addr, in_links, out_capacity) except Exception: - self.log_ma(LOG_ERROR, "Exception in del-address processing\n%s" % format_exc(LOG_STACK_LIMIT)) + self.log_ma(LOG_ERROR, "Exception in update-address processing\n%s" % format_exc(LOG_STACK_LIMIT)) def linkLost(self, link_id): """ http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/135ffe78/python/qpid_dispatch_internal/router/mobile.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/router/mobile.py b/python/qpid_dispatch_internal/router/mobile.py index 2592fe2..5f523e3 100644 --- a/python/qpid_dispatch_internal/router/mobile.py +++ b/python/qpid_dispatch_internal/router/mobile.py @@ -34,9 +34,10 @@ class MobileAddressEngine(object): self.node_tracker = node_tracker self.id = self.container.id self.mobile_seq = 0 - self.local_addrs = [] - self.added_addrs = [] + self.local_addrs = {} + self.added_addrs = {} self.deleted_addrs = [] + self.updated_addrs = [] self.sent_deltas = {} @@ -45,9 +46,12 @@ class MobileAddressEngine(object): ## If local addrs have changed, collect the changes and send a MAU with the diffs ## Note: it is important that the differential-MAU be sent before a RA is sent ## - if 
len(self.added_addrs) > 0 or len(self.deleted_addrs) > 0: + if len(self.added_addrs) > 0 or len(self.deleted_addrs) > 0 or len(self.updated_addrs) > 0: self.mobile_seq += 1 - msg = MessageMAU(None, self.id, self.mobile_seq, self.added_addrs, self.deleted_addrs) + update_map = {} + for k in self.updated_addrs: + update_map[k] = self.local_addrs[k] + msg = MessageMAU(None, self.id, self.mobile_seq, self.added_addrs, self.deleted_addrs, update_map) self.sent_deltas[self.mobile_seq] = msg if len(self.sent_deltas) > MAX_KEPT_DELTAS: @@ -55,34 +59,65 @@ class MobileAddressEngine(object): self.container.send('amqp:/_topo/0/all/qdrouter.ma', msg) self.container.log_ma(LOG_TRACE, "SENT: %r" % msg) - self.local_addrs.extend(self.added_addrs) + self.local_addrs.update(self.added_addrs) for addr in self.deleted_addrs: - self.local_addrs.remove(addr) - self.added_addrs = [] + self.local_addrs.pop(addr) + self.added_addrs = {} self.deleted_addrs = [] + self.updated_addrs = [] return self.mobile_seq - def add_local_address(self, addr): + def update_local_address(self, addr, local_in_links, local_out_capacity): """ """ - if self.local_addrs.count(addr) == 0: - if self.added_addrs.count(addr) == 0: - self.added_addrs.append(addr) + value = (local_in_links, local_out_capacity) + if addr not in self.local_addrs: + ## + ## If the address is not present, schedule it to be added. If it was + ## already scheduled, update the scheduled value. + ## + self.added_addrs[addr] = value else: + ## + ## If the address is already present + ## if self.deleted_addrs.count(addr) > 0: + ## + ## If it was scheduled to be deleted, cancel the deletion. 
+ ## self.deleted_addrs.remove(addr) + ## + ## Update the address value and schedule a protocol update + ## + self.local_addrs[addr] = value + if self.updated_addrs.count(addr) == 0: + self.updated_addrs.append(addr) + def del_local_address(self, addr): """ """ - if self.local_addrs.count(addr) > 0: + if addr in self.local_addrs: + ## + ## If the address is present, schedule its deletion + ## if self.deleted_addrs.count(addr) == 0: self.deleted_addrs.append(addr) + + ## + ## If there is an update scheduled for this address, remove the update + ## + if self.updated_addrs.count(addr) > 0: + self.updated_addrs.remove(addr) else: - if self.added_addrs.count(addr) > 0: - self.added_addrs.remove(addr) + if addr in self.added_addrs: + ## + ## If the address is not present and it has been scheduled to + ## be added, unschedule the addition. + ## + self.added_addrs.pop(addr) def handle_mau(self, msg, now): @@ -96,14 +131,14 @@ class MobileAddressEngine(object): return node = self.node_tracker.router_node(msg.id) - if msg.exist_list != None: + if msg.exist_map != None: ## ## Absolute MAU ## if msg.mobile_seq == node.mobile_address_sequence: return node.mobile_address_sequence = msg.mobile_seq - node.overwrite_addresses(msg.exist_list) + node.overwrite_addresses(msg.exist_map) else: ## ## Differential MAU @@ -113,8 +148,10 @@ class MobileAddressEngine(object): ## This message represents the next expected sequence, incorporate the deltas ## node.mobile_address_sequence += 1 - for a in msg.add_list: - node.map_address(a) + for a,v in msg.add_map.items(): + node.map_address(a, v) + for a,v in msg.update_map.items(): + node.update_address(a, v) for a in msg.del_list: node.unmap_address(a) @@ -149,7 +186,7 @@ class MobileAddressEngine(object): ## ## The peer needs to be sent an absolute update with the whole address list ## - smsg = MessageMAU(None, self.id, self.mobile_seq, None, None, self.local_addrs) + smsg = MessageMAU(None, self.id, self.mobile_seq, None, None, None, 
self.local_addrs) self.container.send('amqp:/_topo/0/%s/qdrouter.ma' % msg.id, smsg) self.container.log_ma(LOG_TRACE, "SENT: %r" % smsg) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/135ffe78/python/qpid_dispatch_internal/router/node.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/router/node.py b/python/qpid_dispatch_internal/router/node.py index 448e684..ca660cc 100644 --- a/python/qpid_dispatch_internal/router/node.py +++ b/python/qpid_dispatch_internal/router/node.py @@ -353,6 +353,16 @@ class NodeTracker(object): return None + def address_values(self, addr): + values = [] + for nid,node in self.nodes.items(): + if addr in node.mobile_addresses: + values.append(node.mobile_addresses[addr]) + if len(values) == 0: + return (0, 0) + return tuple([sum(x) for x in zip(*values)]) + + def _allocate_maskbit(self): if self.next_maskbit == None: raise Exception("Exceeded Maximum Router Count") @@ -392,7 +402,7 @@ class RouterNode(object): self.next_hop_router = None self.cost = None self.valid_origins = None - self.mobile_addresses = [] + self.mobile_addresses = {} self.mobile_address_sequence = 0 self.need_ls_request = True self.need_mobile_request = False @@ -519,35 +529,62 @@ class RouterNode(object): return False - def map_address(self, addr): - self.mobile_addresses.append(addr) + def map_address(self, addr, values): + self.mobile_addresses[addr] = values self.adapter.map_destination(addr, self.maskbit) + values = self.parent.address_values(addr) + self.adapter.update_destination(addr, values[0], values[1]) self.log(LOG_DEBUG, "Remote destination %s mapped to router %s" % (self._logify(addr), self.id)) + def update_address(self, addr, values): + self.mobile_addresses[addr] = values + values = self.parent.address_values(addr) + self.adapter.update_destination(addr, values[0], values[1]) + + def unmap_address(self, addr): - self.mobile_addresses.remove(addr) + 
self.mobile_addresses.pop(addr) self.adapter.unmap_destination(addr, self.maskbit) + values = self.parent.address_values(addr) + self.adapter.update_destination(addr, values[0], values[1]) self.log(LOG_DEBUG, "Remote destination %s unmapped from router %s" % (self._logify(addr), self.id)) def unmap_all_addresses(self): self.mobile_address_sequence = 0 - while self.mobile_addresses: - self.unmap_address(self.mobile_addresses[0]) + alist = self.mobile_addresses.keys() + for a in alist: + self.unmap_address(a) def overwrite_addresses(self, addrs): - added = [] + added = {} deleted = [] - for a in addrs: + for a,v in addrs.items(): if a not in self.mobile_addresses: - added.append(a) + added[a] = v + addrs.pop(a) for a in self.mobile_addresses: if a not in addrs: deleted.append(a) - for a in added: - self.map_address(a) + + ## + ## Map newly added addresses to this node + ## + for a,v in added.items(): + self.map_address(a, v) + + ## + ## Update addresses whose values have changed + ## + for a,v in addrs.items(): + if self.mobile_addresses[a] != v: + self.update_address(a, v) + + ## + ## Unmap addresses that were not found in the new set + ## for a in deleted: self.unmap_address(a) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/135ffe78/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 0c8166f..c690e8f 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -944,12 +944,8 @@ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local) // if (was_local) { const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle); - if (key && *key == 'M') { - if (DEQ_SIZE(addr->rlinks) == 0) - qdr_post_mobile_removed_CT(core, key); - else - qdr_post_mobile_update_CT(core, key, DEQ_SIZE(addr->inlinks), addr->local_out_capacity); - } + if (key && *key == 'M') + 
qdr_post_mobile_update_CT(core, key, DEQ_SIZE(addr->inlinks), addr->local_out_capacity); } // @@ -1257,24 +1253,19 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo } -static void qdr_outgoing_link_added_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *link) +static void qdr_link_change_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *link) { // // Inform the router module of the change in address state // const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle); - if (key && *key == 'M') { - if (DEQ_SIZE(addr->rlinks) == 1) - qdr_post_mobile_added_CT(core, key, DEQ_SIZE(addr->inlinks), addr->local_out_capacity); - else - qdr_post_mobile_update_CT(core, key, DEQ_SIZE(addr->inlinks), addr->local_out_capacity); - } + if (key && *key == 'M') + qdr_post_mobile_update_CT(core, key, DEQ_SIZE(addr->inlinks), addr->local_out_capacity); // - // If this is the first outgoing link on the address, kick off any blocked incoming links + // Recalculate the flow-control data and issue credit as needed. // - if (DEQ_SIZE(addr->rlinks) == 1) - qdr_addr_start_inlinks_CT(core, addr); + qdr_addr_visit_inlinks_CT(core, addr); } @@ -1366,6 +1357,11 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act qdr_link_outbound_second_attach_CT(core, link, source, target); // + // Do the side effects of this link change. + // + qdr_link_change_CT(core, addr, link); + + // // Issue the initial credit only if there are destinations for the address. // if (DEQ_SIZE(addr->subscriptions) || DEQ_SIZE(addr->rlinks) || qd_bitmask_cardinality(addr->rnodes)) @@ -1427,9 +1423,9 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act addr->local_out_capacity += link->capacity; // - // Do all the action that is needed when an outgoing link is established + // Do the side effects of this link change. 
// - qdr_outgoing_link_added_CT(core, addr, link); + qdr_link_change_CT(core, addr, link); qdr_link_outbound_second_attach_CT(core, link, source, target); } @@ -1494,7 +1490,13 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac // Issue credit if this is an anonymous link or if its address has at least one reachable destination. // qdr_address_t *addr = link->owning_addr; - if (!addr || (DEQ_SIZE(addr->subscriptions) || DEQ_SIZE(addr->rlinks) || qd_bitmask_cardinality(addr->rnodes))) + + // + // Do the side effects of this link change. + // + if (addr) + qdr_link_change_CT(core, addr, link); + else qdr_link_issue_credit_CT(core, link, link->capacity, false); break; @@ -1525,9 +1527,9 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac link->owning_addr->local_out_capacity += link->capacity; // - // Do all the action that is needed when an outgoing link is established + // Do the side effects of this link change. // - qdr_outgoing_link_added_CT(core, link->owning_addr, link); + qdr_link_change_CT(core, link->owning_addr, link); } } break; @@ -1663,8 +1665,14 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b // If there was an address associated with this link, check to see if any address-related // cleanup has to be done. // - if (addr) + if (addr) { + // + // Do the side effects of this link change. 
+ // + qdr_link_change_CT(core, addr, link); + qdr_check_addr_CT(core, addr, was_local); + } if (error) qdr_error_free(error); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/135ffe78/src/router_core/route_control.c ---------------------------------------------------------------------- diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c index c91fa09..6482b22 100644 --- a/src/router_core/route_control.c +++ b/src/router_core/route_control.c @@ -128,7 +128,7 @@ static void qdr_link_route_activate_CT(qdr_core_t *core, qdr_link_route_t *lr, q if (DEQ_SIZE(lr->addr->conns) == 1) { key = (const char*) qd_hash_key_by_handle(lr->addr->hash_handle); if (key) - qdr_post_mobile_added_CT(core, key, 0, 0); + qdr_post_mobile_update_CT(core, key, 1, 1); } } @@ -150,7 +150,7 @@ static void qdr_link_route_deactivate_CT(qdr_core_t *core, qdr_link_route_t *lr, if (DEQ_IS_EMPTY(lr->addr->conns)) { key = (const char*) qd_hash_key_by_handle(lr->addr->hash_handle); if (key) - qdr_post_mobile_removed_CT(core, key); + qdr_post_mobile_update_CT(core, key, 0, 0); } } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/135ffe78/src/router_core/route_tables.c ---------------------------------------------------------------------- diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c index e8f955e..4cec37c 100644 --- a/src/router_core/route_tables.c +++ b/src/router_core/route_tables.c @@ -138,15 +138,11 @@ void qdr_core_update_destination(qdr_core_t *core, const char *address_hash, uin void qdr_core_route_table_handlers(qdr_core_t *core, void *context, - qdr_mobile_added_t mobile_added, qdr_mobile_update_t mobile_update, - qdr_mobile_removed_t mobile_removed, qdr_link_lost_t link_lost) { core->rt_context = context; - core->rt_mobile_added = mobile_added; core->rt_mobile_update = mobile_update; - core->rt_mobile_removed = mobile_removed; core->rt_link_lost = link_lost; } @@ -586,7 +582,7 @@ static void 
qdr_map_destination_CT(qdr_core_t *core, qdr_action_t *action, bool qd_bitmask_set_bit(addr->rnodes, router_maskbit); rnode->ref_count++; addr->cost_epoch--; - qdr_addr_start_inlinks_CT(core, addr); + qdr_addr_visit_inlinks_CT(core, addr); } while (false); qdr_field_free(address); @@ -643,8 +639,8 @@ static void qdr_unmap_destination_CT(qdr_core_t *core, qdr_action_t *action, boo static void qdr_update_destination_CT(qdr_core_t *core, qdr_action_t *action, bool discard) { qdr_field_t *address = action->args.route_table.address; - //uint32_t in_links = action->args.route_table.in_links; - //uint32_t out_capacity = action->args.route_table.out_capacity; + uint32_t in_links = action->args.route_table.in_links; + uint32_t out_capacity = action->args.route_table.out_capacity; if (discard) { qdr_field_free(address); @@ -662,8 +658,15 @@ static void qdr_update_destination_CT(qdr_core_t *core, qdr_action_t *action, bo } // - // TODO - Process flow control update actions + // Update the remote flow control state. // + addr->remote_inlinks = in_links; + addr->remote_out_capacity = out_capacity; + + // + // Visit inlinks and issue/revoke credit based on the new flow control state. 
+ // + qdr_addr_visit_inlinks_CT(core, addr); } while (false); qdr_field_free(address); @@ -700,7 +703,7 @@ static void qdr_subscribe_CT(qdr_core_t *core, qdr_action_t *action, bool discar sub->addr = addr; DEQ_ITEM_INIT(sub); DEQ_INSERT_TAIL(addr->subscriptions, sub); - qdr_addr_start_inlinks_CT(core, addr); + qdr_addr_visit_inlinks_CT(core, addr); } else free(sub); @@ -726,18 +729,6 @@ static void qdr_unsubscribe_CT(qdr_core_t *core, qdr_action_t *action, bool disc // Call-back Functions //================================================================================== -static void qdr_do_mobile_added(qdr_core_t *core, qdr_general_work_t *work) -{ - char *address_hash = qdr_field_copy(work->field); - if (address_hash) { - core->rt_mobile_added(core->rt_context, address_hash, work->in_links, work->out_capacity); - free(address_hash); - } - - qdr_field_free(work->field); -} - - static void qdr_do_mobile_update(qdr_core_t *core, qdr_general_work_t *work) { char *address_hash = qdr_field_copy(work->field); @@ -750,34 +741,12 @@ static void qdr_do_mobile_update(qdr_core_t *core, qdr_general_work_t *work) } -static void qdr_do_mobile_removed(qdr_core_t *core, qdr_general_work_t *work) -{ - char *address_hash = qdr_field_copy(work->field); - if (address_hash) { - core->rt_mobile_removed(core->rt_context, address_hash); - free(address_hash); - } - - qdr_field_free(work->field); -} - - static void qdr_do_link_lost(qdr_core_t *core, qdr_general_work_t *work) { core->rt_link_lost(core->rt_context, work->maskbit); } -void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash, uint32_t in_links, uint32_t out_capacity) -{ - qdr_general_work_t *work = qdr_general_work(qdr_do_mobile_added); - work->field = qdr_field(address_hash); - work->in_links = in_links; - work->out_capacity = out_capacity; - qdr_post_general_work_CT(core, work); -} - - void qdr_post_mobile_update_CT(qdr_core_t *core, const char *address_hash, uint32_t in_links, uint32_t out_capacity) { 
qdr_general_work_t *work = qdr_general_work(qdr_do_mobile_update); @@ -788,14 +757,6 @@ void qdr_post_mobile_update_CT(qdr_core_t *core, const char *address_hash, uint3 } -void qdr_post_mobile_removed_CT(qdr_core_t *core, const char *address_hash) -{ - qdr_general_work_t *work = qdr_general_work(qdr_do_mobile_removed); - work->field = qdr_field(address_hash); - qdr_post_general_work_CT(core, work); -} - - void qdr_post_link_lost_CT(qdr_core_t *core, int link_maskbit) { qdr_general_work_t *work = qdr_general_work(qdr_do_link_lost); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/135ffe78/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 893dd6e..03a3e30 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -380,8 +380,9 @@ struct qdr_link_t { qdr_link_oper_status_t oper_status; bool strip_annotations_in; bool strip_annotations_out; - int capacity; - bool flow_started; ///< for incoming, true iff initial credit has been granted + uint32_t capacity; ///< Configured capacity + uint32_t credit_window; ///< Currently effective credit amount + uint32_t credit_deficit; ///< Number of credit-replenishes to skip to reduce window bool drain_mode; int credit_to_core; ///< Number of the available credits incrementally given to the core @@ -640,9 +641,7 @@ struct qdr_core_t { // Route table section // void *rt_context; - qdr_mobile_added_t rt_mobile_added; qdr_mobile_update_t rt_mobile_update; - qdr_mobile_removed_t rt_mobile_removed; qdr_link_lost_t rt_link_lost; // @@ -700,7 +699,7 @@ void qdr_forwarder_setup_CT(qdr_core_t *core); qdr_action_t *qdr_action(qdr_action_handler_t action_handler, const char *label); void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action); void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bool drain); -void 
qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr); +void qdr_addr_visit_inlinks_CT(qdr_core_t *core, qdr_address_t *addr); void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv); void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery); void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *delivery); @@ -708,9 +707,7 @@ bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *delivery); void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *delivery); void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query); -void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash, uint32_t in_links, uint32_t out_capacity); void qdr_post_mobile_update_CT(qdr_core_t *core, const char *address_hash, uint32_t in_links, uint32_t out_capacity); -void qdr_post_mobile_removed_CT(qdr_core_t *core, const char *address_hash); void qdr_post_link_lost_CT(qdr_core_t *core, int link_maskbit); void qdr_post_general_work_CT(qdr_core_t *core, qdr_general_work_t *work); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/135ffe78/src/router_core/transfer.c ---------------------------------------------------------------------- diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 40d4c45..9cf338a 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -756,15 +756,26 @@ void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bo { assert(link->link_direction == QD_INCOMING); + // + // If there is a credit deficit (i.e. the credit window shrank), then reduce the + // credit by up to the deficit. 
+ // + if (link->credit_deficit > 0) { + if (link->credit_deficit > credit) { + link->credit_deficit -= credit; + credit = 0; + } else { + credit -= link->credit_deficit; + link->credit_deficit = 0; + } + } + bool drain_changed = link->drain_mode |= drain; link->drain_mode = drain; if (!drain_changed && credit == 0) return; - if (credit > 0) - link->flow_started = true; - qdr_link_work_t *work = new_qdr_link_work_t(); ZERO(work); @@ -778,56 +789,92 @@ void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bo } +static void qdr_calculate_target_credit_CT(qdr_address_t *addr) +{ + uint32_t total_in_links = DEQ_SIZE(addr->inlinks) + addr->remote_inlinks; + uint32_t total_out_capacity = addr->local_out_capacity + addr->remote_out_capacity; + + if (total_out_capacity == 0) + addr->target_in_credit = 0; + else if (total_out_capacity <= total_in_links) + addr->target_in_credit = 1; + else + addr->target_in_credit = total_out_capacity / (total_in_links ? total_in_links : 1); +} + + /** - * This function should be called after adding a new destination (subscription, local link, - * or remote node) to an address. If this address now has exactly one destination (i.e. it - * transitioned from unreachable to reachable), make sure any unstarted in-links are issued - * initial credit. + * This function should be called whenever the target credit for the address is changed. + * Each inlink for the address will have its credit adjusted. * * Also, check the inlinks to see if there are undelivered messages. If so, drain them to * the forwarder. */ -void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr) +void qdr_addr_visit_inlinks_CT(qdr_core_t *core, qdr_address_t *addr) { // - // If there aren't any inlinks, there's no point in proceeding. + // Calculate the new target credit for the address. 
// - if (DEQ_SIZE(addr->inlinks) == 0) - return; + qdr_calculate_target_credit_CT(addr); - if (qdr_addr_path_count_CT(addr) == 1) { - qdr_link_ref_t *ref = DEQ_HEAD(addr->inlinks); - while (ref) { - qdr_link_t *link = ref->link; + qdr_link_ref_t *ref = DEQ_HEAD(addr->inlinks); + while (ref) { + qdr_link_t *link = ref->link; + bool from_zero = link->credit_window == 0; + if (DEQ_SIZE(addr->subscriptions) > 0) { // - // Issue credit to stalled links + // If the address has at least one in-process subscriber and the + // credit window is zero (first time through), simply issue the + // link capacity. This address will not follow the credit heuristic. // - if (!link->flow_started) + if (from_zero) { + link->credit_window = link->capacity; qdr_link_issue_credit_CT(core, link, link->capacity, false); - + } + } else { // - // Drain undelivered deliveries via the forwarder + // Target credit is not to exceed the link capacity. // - if (DEQ_SIZE(link->undelivered) > 0) { + uint32_t target_credit = (addr->target_in_credit < link->capacity) ? addr->target_in_credit : link->capacity; + int32_t diff = (int32_t) target_credit - (int32_t) link->credit_window; + + link->credit_window = target_credit; + if (diff > 0) { // - // Move all the undelivered to a local list in case not all can be delivered. - // We don't want to loop here forever putting the same messages on the undelivered - // list. + // We increased the window for this link. Issue the additional credit. // - qdr_delivery_list_t deliveries; - DEQ_MOVE(link->undelivered, deliveries); - - qdr_delivery_t *dlv = DEQ_HEAD(deliveries); - while (dlv) { - DEQ_REMOVE_HEAD(deliveries); - qdr_link_forward_CT(core, link, dlv, addr); - dlv = DEQ_HEAD(deliveries); - } + qdr_link_issue_credit_CT(core, link, diff, false); + } else { + // + // We decreased (or didn't change) the window. Record the deficit so we + // can withhold replenished credit. 
+ // + link->credit_deficit = (uint32_t) (0 - diff); } + } - ref = DEQ_NEXT(ref); + // + // Drain undelivered deliveries via the forwarder after unblocking a zero-credit link. + // + if (from_zero && DEQ_SIZE(link->undelivered) > 0) { + // + // Move all the undelivered to a local list in case not all can be delivered. + // We don't want to loop here forever putting the same messages on the undelivered + // list. + // + qdr_delivery_list_t deliveries; + DEQ_MOVE(link->undelivered, deliveries); + + qdr_delivery_t *dlv = DEQ_HEAD(deliveries); + while (dlv) { + DEQ_REMOVE_HEAD(deliveries); + qdr_link_forward_CT(core, link, dlv, addr); + dlv = DEQ_HEAD(deliveries); + } } + + ref = DEQ_NEXT(ref); } } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/135ffe78/src/router_pynode.c ---------------------------------------------------------------------- diff --git a/src/router_pynode.c b/src/router_pynode.c index 0529f8b..100acec 100644 --- a/src/router_pynode.c +++ b/src/router_pynode.c @@ -30,9 +30,7 @@ static qd_log_source_t *log_source = 0; static PyObject *pyRouter = 0; static PyObject *pyTick = 0; -static PyObject *pyAdded = 0; static PyObject *pyUpdate = 0; -static PyObject *pyRemoved = 0; static PyObject *pyLinkLost = 0; typedef struct { @@ -360,27 +358,6 @@ static PyTypeObject RouterAdapterType = { }; -static void qd_router_mobile_added(void *context, const char *address_hash, uint32_t in_links, uint32_t out_capacity) -{ - qd_router_t *router = (qd_router_t*) context; - PyObject *pArgs; - PyObject *pValue; - - if (pyAdded && router->router_mode == QD_ROUTER_MODE_INTERIOR) { - qd_python_lock_state_t lock_state = qd_python_lock(); - pArgs = PyTuple_New(3); - PyTuple_SetItem(pArgs, 0, PyString_FromString(address_hash)); - PyTuple_SetItem(pArgs, 1, PyInt_FromLong((long) in_links)); - PyTuple_SetItem(pArgs, 2, PyInt_FromLong((long) out_capacity)); - pValue = PyObject_CallObject(pyAdded, pArgs); - qd_error_py(); - Py_DECREF(pArgs); - Py_XDECREF(pValue); - 
qd_python_unlock(lock_state); - } -} - - static void qd_router_mobile_update(void *context, const char *address_hash, uint32_t in_links, uint32_t out_capacity) { qd_router_t *router = (qd_router_t*) context; @@ -402,32 +379,13 @@ static void qd_router_mobile_update(void *context, const char *address_hash, uin } -static void qd_router_mobile_removed(void *context, const char *address_hash) -{ - qd_router_t *router = (qd_router_t*) context; - PyObject *pArgs; - PyObject *pValue; - - if (pyRemoved && router->router_mode == QD_ROUTER_MODE_INTERIOR) { - qd_python_lock_state_t lock_state = qd_python_lock(); - pArgs = PyTuple_New(1); - PyTuple_SetItem(pArgs, 0, PyString_FromString(address_hash)); - pValue = PyObject_CallObject(pyRemoved, pArgs); - qd_error_py(); - Py_DECREF(pArgs); - Py_XDECREF(pValue); - qd_python_unlock(lock_state); - } -} - - static void qd_router_link_lost(void *context, int link_mask_bit) { qd_router_t *router = (qd_router_t*) context; PyObject *pArgs; PyObject *pValue; - if (pyRemoved && router->router_mode == QD_ROUTER_MODE_INTERIOR) { + if (pyLinkLost && router->router_mode == QD_ROUTER_MODE_INTERIOR) { qd_python_lock_state_t lock_state = qd_python_lock(); pArgs = PyTuple_New(1); PyTuple_SetItem(pArgs, 0, PyInt_FromLong((long) link_mask_bit)); @@ -447,9 +405,7 @@ qd_error_t qd_router_python_setup(qd_router_t *router) qdr_core_route_table_handlers(router->router_core, router, - qd_router_mobile_added, qd_router_mobile_update, - qd_router_mobile_removed, qd_router_link_lost); // @@ -517,9 +473,7 @@ qd_error_t qd_router_python_setup(qd_router_t *router) QD_ERROR_PY_RET(); pyTick = PyObject_GetAttrString(pyRouter, "handleTimerTick"); QD_ERROR_PY_RET(); - pyAdded = PyObject_GetAttrString(pyRouter, "addressAdded"); QD_ERROR_PY_RET(); pyUpdate = PyObject_GetAttrString(pyRouter, "addressUpdate"); QD_ERROR_PY_RET(); - pyRemoved = PyObject_GetAttrString(pyRouter, "addressRemoved"); QD_ERROR_PY_RET(); pyLinkLost = PyObject_GetAttrString(pyRouter, 
"linkLost"); QD_ERROR_PY_RET(); return qd_error_code(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org