Repository: qpid-dispatch
Updated Branches:
  refs/heads/tross-dispatch-781-1 [created] 85a89965a


DISPATCH-781 - Completed the handling of credit for flow control


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/135ffe78
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/135ffe78
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/135ffe78

Branch: refs/heads/tross-dispatch-781-1
Commit: 135ffe7870304a2957bb07753509e457427ef3a3
Parents: ef2f895
Author: Ted Ross <tr...@redhat.com>
Authored: Thu Jun 1 14:39:34 2017 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu Jun 1 14:40:44 2017 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h            |   4 -
 python/qpid_dispatch_internal/router/data.py   |  27 +++--
 python/qpid_dispatch_internal/router/engine.py |  24 +----
 python/qpid_dispatch_internal/router/mobile.py |  75 +++++++++----
 python/qpid_dispatch_internal/router/node.py   |  59 ++++++++--
 src/router_core/connections.c                  |  52 +++++----
 src/router_core/route_control.c                |   4 +-
 src/router_core/route_tables.c                 |  63 +++--------
 src/router_core/router_core_private.h          |  11 +-
 src/router_core/transfer.c                     | 113 ++++++++++++++------
 src/router_pynode.c                            |  48 +--------
 11 files changed, 254 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/135ffe78/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h 
b/include/qpid/dispatch/router_core.h
index 25af0e5..3bf1da8 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -75,16 +75,12 @@ void qdr_core_map_destination(qdr_core_t *core, int 
router_maskbit, const char *
 void qdr_core_unmap_destination(qdr_core_t *core, int router_maskbit, const 
char *address_hash);
 void qdr_core_update_destination(qdr_core_t *core, const char *address_hash, 
uint32_t in_links, uint32_t out_capacity);
 
-typedef void (*qdr_mobile_added_t)   (void *context, const char *address_hash, 
uint32_t in_links, uint32_t out_capacity);
 typedef void (*qdr_mobile_update_t)  (void *context, const char *address_hash, 
uint32_t in_links, uint32_t out_capacity);
-typedef void (*qdr_mobile_removed_t) (void *context, const char *address_hash);
 typedef void (*qdr_link_lost_t)      (void *context, int link_maskbit);
 
 void qdr_core_route_table_handlers(qdr_core_t           *core, 
                                    void                 *context,
-                                   qdr_mobile_added_t    mobile_added,
                                    qdr_mobile_update_t   mobile_update,
-                                   qdr_mobile_removed_t  mobile_removed,
                                    qdr_link_lost_t       link_lost);
 
 /**

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/135ffe78/python/qpid_dispatch_internal/router/data.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/data.py 
b/python/qpid_dispatch_internal/router/data.py
index 7e66a9e..1399211 100644
--- a/python/qpid_dispatch_internal/router/data.py
+++ b/python/qpid_dispatch_internal/router/data.py
@@ -242,23 +242,25 @@ class MessageLSR(object):
 class MessageMAU(object):
     """
     """
-    def __init__(self, body, _id=None, _seq=None, _add_list=None, 
_del_list=None, _exist_list=None):
+    def __init__(self, body, _id=None, _seq=None, _add_map=None, 
_del_list=None, _update_map=None, _exist_map=None):
         if body:
             self.id = getMandatory(body, 'id', str)
             self.version = getOptional(body, 'pv', 0, long)
             self.area = '0'
             self.mobile_seq = getMandatory(body, 'mobile_seq', long)
-            self.add_list = getOptional(body, 'add', None, list)
+            self.add_map = getOptional(body, 'add', None, dict)
             self.del_list = getOptional(body, 'del', None, list)
-            self.exist_list = getOptional(body, 'exist', None, list)
+            self.update_map = getOptional(body, 'upd', None, dict)
+            self.exist_map = getOptional(body, 'exist', None, dict)
         else:
             self.id = _id
             self.version = ProtocolVersion
             self.area = '0'
             self.mobile_seq = long(_seq)
-            self.add_list = _add_list
+            self.add_map = _add_map
             self.del_list = _del_list
-            self.exist_list = _exist_list
+            self.update_map = _update_map
+            self.exist_map = _exist_map
 
     def get_opcode(self):
         return 'MAU'
@@ -266,21 +268,24 @@ class MessageMAU(object):
     def __repr__(self):
         _add = ''
         _del = ''
+        _upd = ''
         _exist = ''
-        if self.add_list != None:   _add   = ' add=%r'   % self.add_list
+        if self.add_map != None:    _add   = ' add=%r'   % self.add_map
         if self.del_list != None:   _del   = ' del=%r'   % self.del_list
-        if self.exist_list != None: _exist = ' exist=%r' % self.exist_list
-        return "MAU(id=%s pv=%d area=%s mobile_seq=%d%s%s%s)" % \
-                (self.id, self.version, self.area, self.mobile_seq, _add, 
_del, _exist)
+        if self.update_map != None: _upd   = ' upd=%r'   % self.update_map
+        if self.exist_map != None:  _exist = ' exist=%r' % self.exist_map
+        return "MAU(id=%s pv=%d area=%s mobile_seq=%d%s%s%s%s)" % \
+                (self.id, self.version, self.area, self.mobile_seq, _add, 
_del, _upd, _exist)
 
     def to_dict(self):
         body = {'id'         : self.id,
                 'pv'         : self.version,
                 'area'       : self.area,
                 'mobile_seq' : self.mobile_seq }
-        if self.add_list != None:   body['add']   = self.add_list
+        if self.add_map != None:    body['add']   = self.add_map
         if self.del_list != None:   body['del']   = self.del_list
-        if self.exist_list != None: body['exist'] = self.exist_list
+        if self.update_map != None: body['upd']   = self.update_map
+        if self.exist_map != None:  body['exist'] = self.exist_map
         return body
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/135ffe78/python/qpid_dispatch_internal/router/engine.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/engine.py 
b/python/qpid_dispatch_internal/router/engine.py
index c018664..9405d34 100644
--- a/python/qpid_dispatch_internal/router/engine.py
+++ b/python/qpid_dispatch_internal/router/engine.py
@@ -91,31 +91,17 @@ class RouterEngine:
             except IndexError: raise ValueError("No router configuration 
found")
         return self._config
 
-    def addressAdded(self, addr, in_links, out_capacity):
-        """
-        """
-        try:
-            if addr[0] in 'MCD':
-                self.mobile_address_engine.add_local_address(addr)
-        except Exception:
-            self.log_ma(LOG_ERROR, "Exception in new-address processing\n%s" % 
format_exc(LOG_STACK_LIMIT))
-
     def addressUpdate(self, addr, in_links, out_capacity):
         """
         """
         try:
-            pass
-        except Exception:
-            self.log_ma(LOG_ERROR, "Exception in update-address 
processing\n%s" % format_exc(LOG_STACK_LIMIT))
-
-    def addressRemoved(self, addr):
-        """
-        """
-        try:
             if addr[0] in 'MCD':
-                self.mobile_address_engine.del_local_address(addr)
+                if in_links == 0 and out_capacity == 0:
+                    self.mobile_address_engine.del_local_address(addr)
+                else:
+                    self.mobile_address_engine.update_local_address(addr, 
in_links, out_capacity)
         except Exception:
-            self.log_ma(LOG_ERROR, "Exception in del-address processing\n%s" % 
format_exc(LOG_STACK_LIMIT))
+            self.log_ma(LOG_ERROR, "Exception in update-address 
processing\n%s" % format_exc(LOG_STACK_LIMIT))
 
     def linkLost(self, link_id):
         """

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/135ffe78/python/qpid_dispatch_internal/router/mobile.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/mobile.py 
b/python/qpid_dispatch_internal/router/mobile.py
index 2592fe2..5f523e3 100644
--- a/python/qpid_dispatch_internal/router/mobile.py
+++ b/python/qpid_dispatch_internal/router/mobile.py
@@ -34,9 +34,10 @@ class MobileAddressEngine(object):
         self.node_tracker  = node_tracker
         self.id            = self.container.id
         self.mobile_seq    = 0
-        self.local_addrs   = []
-        self.added_addrs   = []
+        self.local_addrs   = {}
+        self.added_addrs   = {}
         self.deleted_addrs = []
+        self.updated_addrs = []
         self.sent_deltas   = {}
 
 
@@ -45,9 +46,12 @@ class MobileAddressEngine(object):
         ## If local addrs have changed, collect the changes and send a MAU 
with the diffs
         ## Note: it is important that the differential-MAU be sent before a RA 
is sent
         ##
-        if len(self.added_addrs) > 0 or len(self.deleted_addrs) > 0:
+        if len(self.added_addrs) > 0 or len(self.deleted_addrs) > 0 or 
len(self.updated_addrs) > 0:
             self.mobile_seq += 1
-            msg = MessageMAU(None, self.id, self.mobile_seq, self.added_addrs, 
self.deleted_addrs)
+            update_map = {}
+            for k in self.updated_addrs:
+                update_map[k] = self.local_addrs[k]
+            msg = MessageMAU(None, self.id, self.mobile_seq, self.added_addrs, 
self.deleted_addrs, update_map)
 
             self.sent_deltas[self.mobile_seq] = msg
             if len(self.sent_deltas) > MAX_KEPT_DELTAS:
@@ -55,34 +59,65 @@ class MobileAddressEngine(object):
 
             self.container.send('amqp:/_topo/0/all/qdrouter.ma', msg)
             self.container.log_ma(LOG_TRACE, "SENT: %r" % msg)
-            self.local_addrs.extend(self.added_addrs)
+            self.local_addrs.update(self.added_addrs)
             for addr in self.deleted_addrs:
-                self.local_addrs.remove(addr)
-            self.added_addrs   = []
+                self.local_addrs.pop(addr)
+            self.added_addrs   = {}
             self.deleted_addrs = []
+            self.updated_addrs = []
         return self.mobile_seq
 
 
-    def add_local_address(self, addr):
+    def update_local_address(self, addr, local_in_links, local_out_capacity):
         """
         """
-        if self.local_addrs.count(addr) == 0:
-            if self.added_addrs.count(addr) == 0:
-                self.added_addrs.append(addr)
+        value = (local_in_links, local_out_capacity)
+        if addr not in self.local_addrs:
+            ##
+            ## If the address is not present, schedule it to be added.  If it 
was
+            ## already scheduled, update the scheduled value.
+            ##
+            self.added_addrs[addr] = value
         else:
+            ##
+            ## If the address is already present
+            ##
             if self.deleted_addrs.count(addr) > 0:
+                ##
+                ## If it was scehduled to be deleted, cancel the deletion.
+                ##
                 self.deleted_addrs.remove(addr)
 
+            ##
+            ## Update the address value and schedule a protocol update
+            ##
+            self.local_addrs[addr] = value
+            if self.updated_addrs.count(addr) == 0:
+                self.updated_addrs.append(addr)
+
 
     def del_local_address(self, addr):
         """
         """
-        if self.local_addrs.count(addr) > 0:
+        if addr in self.local_addrs:
+            ##
+            ## If the address is present, schedule its deletion
+            ##
             if self.deleted_addrs.count(addr) == 0:
                 self.deleted_addrs.append(addr)
+
+            ##
+            ## If there is an update scheduled for this address, remove the 
update
+            ##
+            if self.updated_addrs.count(addr) > 0:
+                self.updated_addrs.remove(addr)
         else:
-            if self.added_addrs.count(addr) > 0:
-                self.added_addrs.remove(addr)
+            if addr in self.added_addrs:
+                ##
+                ## If the address is not present and it has been scheduled to
+                ## be added, unschedule the addition.
+                ##
+                self.added_addrs.pop(addr)
 
 
     def handle_mau(self, msg, now):
@@ -96,14 +131,14 @@ class MobileAddressEngine(object):
             return
         node = self.node_tracker.router_node(msg.id)
 
-        if msg.exist_list != None:
+        if msg.exist_map != None:
             ##
             ## Absolute MAU
             ##
             if msg.mobile_seq == node.mobile_address_sequence:
                 return
             node.mobile_address_sequence = msg.mobile_seq
-            node.overwrite_addresses(msg.exist_list)
+            node.overwrite_addresses(msg.exist_map)
         else:
             ##
             ## Differential MAU
@@ -113,8 +148,10 @@ class MobileAddressEngine(object):
                 ## This message represents the next expected sequence, 
incorporate the deltas
                 ##
                 node.mobile_address_sequence += 1
-                for a in msg.add_list:
-                    node.map_address(a)
+                for a,v in msg.add_map.items():
+                    node.map_address(a, v)
+                for a,v in msg.update_map.items():
+                    node.update_address(a, v)
                 for a in msg.del_list:
                     node.unmap_address(a)
 
@@ -149,7 +186,7 @@ class MobileAddressEngine(object):
         ##
         ## The peer needs to be sent an absolute update with the whole address 
list
         ##
-        smsg = MessageMAU(None, self.id, self.mobile_seq, None, None, 
self.local_addrs)
+        smsg = MessageMAU(None, self.id, self.mobile_seq, None, None, None, 
self.local_addrs)
         self.container.send('amqp:/_topo/0/%s/qdrouter.ma' % msg.id, smsg)
         self.container.log_ma(LOG_TRACE, "SENT: %r" % smsg)
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/135ffe78/python/qpid_dispatch_internal/router/node.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/node.py 
b/python/qpid_dispatch_internal/router/node.py
index 448e684..ca660cc 100644
--- a/python/qpid_dispatch_internal/router/node.py
+++ b/python/qpid_dispatch_internal/router/node.py
@@ -353,6 +353,16 @@ class NodeTracker(object):
         return None
 
 
+    def address_values(self, addr):
+        values = []
+        for nid,node in self.nodes.items():
+            if addr in node.mobile_addresses:
+                values.append(node.mobile_addresses[addr])
+        if len(values) == 0:
+            return (0, 0)
+        return tuple([sum(x) for x in zip(*values)])
+
+
     def _allocate_maskbit(self):
         if self.next_maskbit == None:
             raise Exception("Exceeded Maximum Router Count")
@@ -392,7 +402,7 @@ class RouterNode(object):
         self.next_hop_router         = None
         self.cost                    = None
         self.valid_origins           = None
-        self.mobile_addresses        = []
+        self.mobile_addresses        = {}
         self.mobile_address_sequence = 0
         self.need_ls_request         = True
         self.need_mobile_request     = False
@@ -519,35 +529,62 @@ class RouterNode(object):
         return False
 
 
-    def map_address(self, addr):
-        self.mobile_addresses.append(addr)
+    def map_address(self, addr, values):
+        self.mobile_addresses[addr] = values
         self.adapter.map_destination(addr, self.maskbit)
+        values = self.parent.address_values(addr)
+        self.adapter.update_destination(addr, values[0], values[1])
         self.log(LOG_DEBUG, "Remote destination %s mapped to router %s" % 
(self._logify(addr), self.id))
 
 
+    def update_address(self, addr, values):
+        self.mobile_addresses[addr] = values
+        values = self.parent.address_values(addr)
+        self.adapter.update_destination(addr, values[0], values[1])
+
+
     def unmap_address(self, addr):
-        self.mobile_addresses.remove(addr)
+        self.mobile_addresses.pop(addr)
         self.adapter.unmap_destination(addr, self.maskbit)
+        values = self.parent.address_values(addr)
+        self.adapter.update_destination(addr, values[0], values[1])
         self.log(LOG_DEBUG, "Remote destination %s unmapped from router %s" % 
(self._logify(addr), self.id))
 
 
     def unmap_all_addresses(self):
         self.mobile_address_sequence = 0
-        while self.mobile_addresses:
-            self.unmap_address(self.mobile_addresses[0])
+        alist = self.mobile_addresses.keys()
+        for a in alist:
+            self.unmap_address(a)
 
 
     def overwrite_addresses(self, addrs):
-        added   = []
+        added   = {}
         deleted = []
-        for a in addrs:
+        for a,v in addrs.items():
             if a not in self.mobile_addresses:
-                added.append(a)
+                added[a] = v
+                addrs.pop(a)
         for a in self.mobile_addresses:
             if a not in addrs:
                 deleted.append(a)
-        for a in added:
-            self.map_address(a)
+
+        ##
+        ## Map newly added addresses to this node
+        ##
+        for a,v in added.items():
+            self.map_address(a, v)
+
+        ##
+        ## Update addresses whose values have changed
+        ##
+        for a,v in addrs.items():
+            if self.mobile_addresses[a] != v:
+                self.update_address(a, v)
+
+        ##
+        ## Unmap addresses that were not found in the new set
+        ##
         for a in deleted:
             self.unmap_address(a)
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/135ffe78/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 0c8166f..c690e8f 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -944,12 +944,8 @@ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t 
*addr, bool was_local)
     //
     if (was_local) {
         const char *key = (const char*) 
qd_hash_key_by_handle(addr->hash_handle);
-        if (key && *key == 'M') {
-            if (DEQ_SIZE(addr->rlinks) == 0)
-                qdr_post_mobile_removed_CT(core, key);
-            else
-                qdr_post_mobile_update_CT(core, key, DEQ_SIZE(addr->inlinks), 
addr->local_out_capacity);
-        }
+        if (key && *key == 'M')
+            qdr_post_mobile_update_CT(core, key, DEQ_SIZE(addr->inlinks), 
addr->local_out_capacity);
     }
 
     //
@@ -1257,24 +1253,19 @@ static void qdr_connection_closed_CT(qdr_core_t *core, 
qdr_action_t *action, boo
 }
 
 
-static void qdr_outgoing_link_added_CT(qdr_core_t *core, qdr_address_t *addr, 
qdr_link_t *link)
+static void qdr_link_change_CT(qdr_core_t *core, qdr_address_t *addr, 
qdr_link_t *link)
 {
     //
     // Inform the router module of the change in address state
     //
     const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
-    if (key && *key == 'M') {
-        if (DEQ_SIZE(addr->rlinks) == 1)
-            qdr_post_mobile_added_CT(core, key, DEQ_SIZE(addr->inlinks), 
addr->local_out_capacity);
-        else
-            qdr_post_mobile_update_CT(core, key, DEQ_SIZE(addr->inlinks), 
addr->local_out_capacity);
-    }
+    if (key && *key == 'M')
+        qdr_post_mobile_update_CT(core, key, DEQ_SIZE(addr->inlinks), 
addr->local_out_capacity);
 
     //
-    // If this is the first outgoing link on the address, kick off any blocked 
incoming links
+    // Recalculate the flow-control data and issue credit as needed.
     //
-    if (DEQ_SIZE(addr->rlinks) == 1)
-        qdr_addr_start_inlinks_CT(core, addr);
+    qdr_addr_visit_inlinks_CT(core, addr);
 }
 
 
@@ -1366,6 +1357,11 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t 
*core, qdr_action_t *act
                     qdr_link_outbound_second_attach_CT(core, link, source, 
target);
 
                     //
+                    // Do the side effects of this link change.
+                    //
+                    qdr_link_change_CT(core, addr, link);
+
+                    //
                     // Issue the initial credit only if there are destinations 
for the address.
                     //
                     if (DEQ_SIZE(addr->subscriptions) || 
DEQ_SIZE(addr->rlinks) || qd_bitmask_cardinality(addr->rnodes))
@@ -1427,9 +1423,9 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t 
*core, qdr_action_t *act
                 addr->local_out_capacity += link->capacity;
 
                 //
-                // Do all the action that is needed when an outgoing link is 
established
+                // Do the side effects of this link change.
                 //
-                qdr_outgoing_link_added_CT(core, addr, link);
+                qdr_link_change_CT(core, addr, link);
 
                 qdr_link_outbound_second_attach_CT(core, link, source, target);
             }
@@ -1494,7 +1490,13 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t 
*core, qdr_action_t *ac
             // Issue credit if this is an anonymous link or if its address has 
at least one reachable destination.
             //
             qdr_address_t *addr = link->owning_addr;
-            if (!addr || (DEQ_SIZE(addr->subscriptions) || 
DEQ_SIZE(addr->rlinks) || qd_bitmask_cardinality(addr->rnodes)))
+
+            //
+            // Do the side effects of this link change.
+            //
+            if (addr)
+                qdr_link_change_CT(core, addr, link);
+            else
                 qdr_link_issue_credit_CT(core, link, link->capacity, false);
             break;
 
@@ -1525,9 +1527,9 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t 
*core, qdr_action_t *ac
                     link->owning_addr->local_out_capacity += link->capacity;
 
                     //
-                    // Do all the action that is needed when an outgoing link 
is established
+                    // Do the side effects of this link change.
                     //
-                    qdr_outgoing_link_added_CT(core, link->owning_addr, link);
+                    qdr_link_change_CT(core, link->owning_addr, link);
                 }
             }
             break;
@@ -1663,8 +1665,14 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, 
qdr_action_t *action, b
     // If there was an address associated with this link, check to see if any 
address-related
     // cleanup has to be done.
     //
-    if (addr)
+    if (addr) {
+        //
+        // Do the side effects of this link change.
+        //
+        qdr_link_change_CT(core, addr, link);
+        
         qdr_check_addr_CT(core, addr, was_local);
+    }
 
     if (error)
         qdr_error_free(error);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/135ffe78/src/router_core/route_control.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c
index c91fa09..6482b22 100644
--- a/src/router_core/route_control.c
+++ b/src/router_core/route_control.c
@@ -128,7 +128,7 @@ static void qdr_link_route_activate_CT(qdr_core_t *core, 
qdr_link_route_t *lr, q
         if (DEQ_SIZE(lr->addr->conns) == 1) {
             key = (const char*) qd_hash_key_by_handle(lr->addr->hash_handle);
             if (key)
-                qdr_post_mobile_added_CT(core, key, 0, 0);
+                qdr_post_mobile_update_CT(core, key, 1, 1);
         }
     }
 
@@ -150,7 +150,7 @@ static void qdr_link_route_deactivate_CT(qdr_core_t *core, 
qdr_link_route_t *lr,
         if (DEQ_IS_EMPTY(lr->addr->conns)) {
             key = (const char*) qd_hash_key_by_handle(lr->addr->hash_handle);
             if (key)
-                qdr_post_mobile_removed_CT(core, key);
+                qdr_post_mobile_update_CT(core, key, 0, 0);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/135ffe78/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index e8f955e..4cec37c 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -138,15 +138,11 @@ void qdr_core_update_destination(qdr_core_t *core, const 
char *address_hash, uin
 
 void qdr_core_route_table_handlers(qdr_core_t           *core, 
                                    void                 *context,
-                                   qdr_mobile_added_t    mobile_added,
                                    qdr_mobile_update_t   mobile_update,
-                                   qdr_mobile_removed_t  mobile_removed,
                                    qdr_link_lost_t       link_lost)
 {
     core->rt_context        = context;
-    core->rt_mobile_added   = mobile_added;
     core->rt_mobile_update  = mobile_update;
-    core->rt_mobile_removed = mobile_removed;
     core->rt_link_lost      = link_lost;
 }
 
@@ -586,7 +582,7 @@ static void qdr_map_destination_CT(qdr_core_t *core, 
qdr_action_t *action, bool
         qd_bitmask_set_bit(addr->rnodes, router_maskbit);
         rnode->ref_count++;
         addr->cost_epoch--;
-        qdr_addr_start_inlinks_CT(core, addr);
+        qdr_addr_visit_inlinks_CT(core, addr);
     } while (false);
 
     qdr_field_free(address);
@@ -643,8 +639,8 @@ static void qdr_unmap_destination_CT(qdr_core_t *core, 
qdr_action_t *action, boo
 static void qdr_update_destination_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard)
 {
     qdr_field_t *address      = action->args.route_table.address;
-    //uint32_t     in_links     = action->args.route_table.in_links;
-    //uint32_t     out_capacity = action->args.route_table.out_capacity;
+    uint32_t     in_links     = action->args.route_table.in_links;
+    uint32_t     out_capacity = action->args.route_table.out_capacity;
 
     if (discard) {
         qdr_field_free(address);
@@ -662,8 +658,15 @@ static void qdr_update_destination_CT(qdr_core_t *core, 
qdr_action_t *action, bo
         }
 
         //
-        // TODO - Process flow control update actions
+        // Update the remote flow control state.
         //
+        addr->remote_inlinks      = in_links;
+        addr->remote_out_capacity = out_capacity;
+
+        //
+        // Visit inlinks and issue/revoke credit based on the new fow control 
state.
+        //
+        qdr_addr_visit_inlinks_CT(core, addr);
     } while (false);
 
     qdr_field_free(address);
@@ -700,7 +703,7 @@ static void qdr_subscribe_CT(qdr_core_t *core, qdr_action_t 
*action, bool discar
         sub->addr = addr;
         DEQ_ITEM_INIT(sub);
         DEQ_INSERT_TAIL(addr->subscriptions, sub);
-        qdr_addr_start_inlinks_CT(core, addr);
+        qdr_addr_visit_inlinks_CT(core, addr);
 
     } else
         free(sub);
@@ -726,18 +729,6 @@ static void qdr_unsubscribe_CT(qdr_core_t *core, 
qdr_action_t *action, bool disc
 // Call-back Functions
 
//==================================================================================
 
-static void qdr_do_mobile_added(qdr_core_t *core, qdr_general_work_t *work)
-{
-    char *address_hash = qdr_field_copy(work->field);
-    if (address_hash) {
-        core->rt_mobile_added(core->rt_context, address_hash, work->in_links, 
work->out_capacity);
-        free(address_hash);
-    }
-
-    qdr_field_free(work->field);
-}
-
-
 static void qdr_do_mobile_update(qdr_core_t *core, qdr_general_work_t *work)
 {
     char *address_hash = qdr_field_copy(work->field);
@@ -750,34 +741,12 @@ static void qdr_do_mobile_update(qdr_core_t *core, 
qdr_general_work_t *work)
 }
 
 
-static void qdr_do_mobile_removed(qdr_core_t *core, qdr_general_work_t *work)
-{
-    char *address_hash = qdr_field_copy(work->field);
-    if (address_hash) {
-        core->rt_mobile_removed(core->rt_context, address_hash);
-        free(address_hash);
-    }
-
-    qdr_field_free(work->field);
-}
-
-
 static void qdr_do_link_lost(qdr_core_t *core, qdr_general_work_t *work)
 {
     core->rt_link_lost(core->rt_context, work->maskbit);
 }
 
 
-void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash, 
uint32_t in_links, uint32_t out_capacity)
-{
-    qdr_general_work_t *work = qdr_general_work(qdr_do_mobile_added);
-    work->field        = qdr_field(address_hash);
-    work->in_links     = in_links;
-    work->out_capacity = out_capacity;
-    qdr_post_general_work_CT(core, work);
-}
-
-
 void qdr_post_mobile_update_CT(qdr_core_t *core, const char *address_hash, 
uint32_t in_links, uint32_t out_capacity)
 {
     qdr_general_work_t *work = qdr_general_work(qdr_do_mobile_update);
@@ -788,14 +757,6 @@ void qdr_post_mobile_update_CT(qdr_core_t *core, const 
char *address_hash, uint3
 }
 
 
-void qdr_post_mobile_removed_CT(qdr_core_t *core, const char *address_hash)
-{
-    qdr_general_work_t *work = qdr_general_work(qdr_do_mobile_removed);
-    work->field = qdr_field(address_hash);
-    qdr_post_general_work_CT(core, work);
-}
-
-
 void qdr_post_link_lost_CT(qdr_core_t *core, int link_maskbit)
 {
     qdr_general_work_t *work = qdr_general_work(qdr_do_link_lost);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/135ffe78/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h 
b/src/router_core/router_core_private.h
index 893dd6e..03a3e30 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -380,8 +380,9 @@ struct qdr_link_t {
     qdr_link_oper_status_t   oper_status;
     bool                     strip_annotations_in;
     bool                     strip_annotations_out;
-    int                      capacity;
-    bool                     flow_started;   ///< for incoming, true iff 
initial credit has been granted
+    uint32_t                 capacity;       ///< Configured capacity
+    uint32_t                 credit_window;  ///< Currently effective credit 
amount
+    uint32_t                 credit_deficit; ///< Number of credit-replenishes 
to skip to reduce window
     bool                     drain_mode;
     int                      credit_to_core; ///< Number of the available 
credits incrementally given to the core
 
@@ -640,9 +641,7 @@ struct qdr_core_t {
     // Route table section
     //
     void                 *rt_context;
-    qdr_mobile_added_t    rt_mobile_added;
     qdr_mobile_update_t   rt_mobile_update;
-    qdr_mobile_removed_t  rt_mobile_removed;
     qdr_link_lost_t       rt_link_lost;
 
     //
@@ -700,7 +699,7 @@ void  qdr_forwarder_setup_CT(qdr_core_t *core);
 qdr_action_t *qdr_action(qdr_action_handler_t action_handler, const char 
*label);
 void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action);
 void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, 
bool drain);
-void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr);
+void qdr_addr_visit_inlinks_CT(qdr_core_t *core, qdr_address_t *addr);
 void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv);
 void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery);
 void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *delivery);
@@ -708,9 +707,7 @@ bool qdr_delivery_settled_CT(qdr_core_t *core, 
qdr_delivery_t *delivery);
 void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *delivery);
 void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query);
 
-void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash, 
uint32_t in_links, uint32_t out_capacity);
 void qdr_post_mobile_update_CT(qdr_core_t *core, const char *address_hash, 
uint32_t in_links, uint32_t out_capacity);
-void qdr_post_mobile_removed_CT(qdr_core_t *core, const char *address_hash);
 void qdr_post_link_lost_CT(qdr_core_t *core, int link_maskbit);
 
 void qdr_post_general_work_CT(qdr_core_t *core, qdr_general_work_t *work);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/135ffe78/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 40d4c45..9cf338a 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -756,15 +756,26 @@ void qdr_link_issue_credit_CT(qdr_core_t *core, 
qdr_link_t *link, int credit, bo
 {
     assert(link->link_direction == QD_INCOMING);
 
+    //
+    // If there is a credit deficit (i.e. the credit window shrank), then 
reduce the
+    // credit by up to the deficit.
+    //
+    if (link->credit_deficit > 0) {
+        if (link->credit_deficit > credit) {
+            link->credit_deficit -= credit;
+            credit = 0;
+        } else {
+            credit -= link->credit_deficit;
+            link->credit_deficit = 0;
+        }
+    }
+
     bool drain_changed = link->drain_mode |= drain;
     link->drain_mode   = drain;
 
     if (!drain_changed && credit == 0)
         return;
 
-    if (credit > 0)
-        link->flow_started = true;
-
     qdr_link_work_t *work = new_qdr_link_work_t();
     ZERO(work);
 
@@ -778,56 +789,92 @@ void qdr_link_issue_credit_CT(qdr_core_t *core, 
qdr_link_t *link, int credit, bo
 }
 
 
+static void qdr_calculate_target_credit_CT(qdr_address_t *addr)
+{
+    uint32_t total_in_links     = DEQ_SIZE(addr->inlinks) + 
addr->remote_inlinks;
+    uint32_t total_out_capacity = addr->local_out_capacity + 
addr->remote_out_capacity;
+
+    if (total_out_capacity == 0)
+        addr->target_in_credit = 0;
+    else if (total_out_capacity <= total_in_links)
+        addr->target_in_credit = 1;
+    else
+        addr->target_in_credit = total_out_capacity / (total_in_links ? 
total_in_links : 1);
+}
+
+
 /**
- * This function should be called after adding a new destination 
(subscription, local link,
- * or remote node) to an address.  If this address now has exactly one 
destination (i.e. it
- * transitioned from unreachable to reachable), make sure any unstarted 
in-links are issued
- * initial credit.
+ * This function should be called whenever the target credit for the address 
is changed.
+ * Each inlink for the address will have its credit adjusted.
  *
  * Also, check the inlinks to see if there are undelivered messages.  If so, 
drain them to
  * the forwarder.
  */
-void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr)
+void qdr_addr_visit_inlinks_CT(qdr_core_t *core, qdr_address_t *addr)
 {
     //
-    // If there aren't any inlinks, there's no point in proceeding.
+    // Calculate the new target credit for the address.
     //
-    if (DEQ_SIZE(addr->inlinks) == 0)
-        return;
+    qdr_calculate_target_credit_CT(addr);
 
-    if (qdr_addr_path_count_CT(addr) == 1) {
-        qdr_link_ref_t *ref = DEQ_HEAD(addr->inlinks);
-        while (ref) {
-            qdr_link_t *link = ref->link;
+    qdr_link_ref_t *ref = DEQ_HEAD(addr->inlinks);
+    while (ref) {
+        qdr_link_t *link      = ref->link;
+        bool        from_zero = link->credit_window == 0;
 
+        if (DEQ_SIZE(addr->subscriptions) > 0) {
             //
-            // Issue credit to stalled links
+            // If the address has at least one in-process subscriber and the
+            // credit window is zero (first time through), simply issue the
+            // link capacity.  This address will not follow the credit 
heuristic.
             //
-            if (!link->flow_started)
+            if (from_zero) {
+                link->credit_window = link->capacity;
                 qdr_link_issue_credit_CT(core, link, link->capacity, false);
-
+            }
+        } else {
             //
-            // Drain undelivered deliveries via the forwarder
+            // Target credit is not to exceed the link capacity.
             //
-            if (DEQ_SIZE(link->undelivered) > 0) {
+            uint32_t target_credit = (addr->target_in_credit < link->capacity) 
? addr->target_in_credit : link->capacity;
+            int32_t  diff = (int32_t) target_credit - (int32_t) 
link->credit_window;
+
+            link->credit_window = target_credit;
+            if (diff > 0) {
                 //
-                // Move all the undelivered to a local list in case not all 
can be delivered.
-                // We don't want to loop here forever putting the same 
messages on the undelivered
-                // list.
+                // We increased the window for this link.  Issue the 
additional credit.
                 //
-                qdr_delivery_list_t deliveries;
-                DEQ_MOVE(link->undelivered, deliveries);
-
-                qdr_delivery_t *dlv = DEQ_HEAD(deliveries);
-                while (dlv) {
-                    DEQ_REMOVE_HEAD(deliveries);
-                    qdr_link_forward_CT(core, link, dlv, addr);
-                    dlv = DEQ_HEAD(deliveries);
-                }
+                qdr_link_issue_credit_CT(core, link, diff, false);
+            } else {
+                //
+                // We decreased (or didn't change) the window.  Record the 
deficit so we
+                // can withhold replenished credit.
+                //
+                link->credit_deficit = (uint32_t) (0 - diff);
             }
+        }
 
-            ref = DEQ_NEXT(ref);
+        //
+        // Drain undelivered deliveries via the forwarder after unblocking a 
zero-credit link.
+        //
+        if (from_zero && DEQ_SIZE(link->undelivered) > 0) {
+            //
+            // Move all the undelivered to a local list in case not all can be 
delivered.
+            // We don't want to loop here forever putting the same messages on 
the undelivered
+            // list.
+            //
+            qdr_delivery_list_t deliveries;
+            DEQ_MOVE(link->undelivered, deliveries);
+
+            qdr_delivery_t *dlv = DEQ_HEAD(deliveries);
+            while (dlv) {
+                DEQ_REMOVE_HEAD(deliveries);
+                qdr_link_forward_CT(core, link, dlv, addr);
+                dlv = DEQ_HEAD(deliveries);
+            }
         }
+
+        ref = DEQ_NEXT(ref);
     }
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/135ffe78/src/router_pynode.c
----------------------------------------------------------------------
diff --git a/src/router_pynode.c b/src/router_pynode.c
index 0529f8b..100acec 100644
--- a/src/router_pynode.c
+++ b/src/router_pynode.c
@@ -30,9 +30,7 @@
 static qd_log_source_t *log_source = 0;
 static PyObject        *pyRouter   = 0;
 static PyObject        *pyTick     = 0;
-static PyObject        *pyAdded    = 0;
 static PyObject        *pyUpdate   = 0;
-static PyObject        *pyRemoved  = 0;
 static PyObject        *pyLinkLost = 0;
 
 typedef struct {
@@ -360,27 +358,6 @@ static PyTypeObject RouterAdapterType = {
 };
 
 
-static void qd_router_mobile_added(void *context, const char *address_hash, 
uint32_t in_links, uint32_t out_capacity)
-{
-    qd_router_t *router = (qd_router_t*) context;
-    PyObject    *pArgs;
-    PyObject    *pValue;
-
-    if (pyAdded && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
-        qd_python_lock_state_t lock_state = qd_python_lock();
-        pArgs = PyTuple_New(3);
-        PyTuple_SetItem(pArgs, 0, PyString_FromString(address_hash));
-        PyTuple_SetItem(pArgs, 1, PyInt_FromLong((long) in_links));
-        PyTuple_SetItem(pArgs, 2, PyInt_FromLong((long) out_capacity));
-        pValue = PyObject_CallObject(pyAdded, pArgs);
-        qd_error_py();
-        Py_DECREF(pArgs);
-        Py_XDECREF(pValue);
-        qd_python_unlock(lock_state);
-    }
-}
-
-
 static void qd_router_mobile_update(void *context, const char *address_hash, 
uint32_t in_links, uint32_t out_capacity)
 {
     qd_router_t *router = (qd_router_t*) context;
@@ -402,32 +379,13 @@ static void qd_router_mobile_update(void *context, const 
char *address_hash, uin
 }
 
 
-static void qd_router_mobile_removed(void *context, const char *address_hash)
-{
-    qd_router_t *router = (qd_router_t*) context;
-    PyObject    *pArgs;
-    PyObject    *pValue;
-
-    if (pyRemoved && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
-        qd_python_lock_state_t lock_state = qd_python_lock();
-        pArgs = PyTuple_New(1);
-        PyTuple_SetItem(pArgs, 0, PyString_FromString(address_hash));
-        pValue = PyObject_CallObject(pyRemoved, pArgs);
-        qd_error_py();
-        Py_DECREF(pArgs);
-        Py_XDECREF(pValue);
-        qd_python_unlock(lock_state);
-    }
-}
-
-
 static void qd_router_link_lost(void *context, int link_mask_bit)
 {
     qd_router_t *router = (qd_router_t*) context;
     PyObject    *pArgs;
     PyObject    *pValue;
 
-    if (pyRemoved && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
+    if (pyLinkLost && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
         qd_python_lock_state_t lock_state = qd_python_lock();
         pArgs = PyTuple_New(1);
         PyTuple_SetItem(pArgs, 0, PyInt_FromLong((long) link_mask_bit));
@@ -447,9 +405,7 @@ qd_error_t qd_router_python_setup(qd_router_t *router)
 
     qdr_core_route_table_handlers(router->router_core,
                                   router,
-                                  qd_router_mobile_added,
                                   qd_router_mobile_update,
-                                  qd_router_mobile_removed,
                                   qd_router_link_lost);
 
     //
@@ -517,9 +473,7 @@ qd_error_t qd_router_python_setup(qd_router_t *router)
     QD_ERROR_PY_RET();
 
     pyTick = PyObject_GetAttrString(pyRouter, "handleTimerTick"); 
QD_ERROR_PY_RET();
-    pyAdded = PyObject_GetAttrString(pyRouter, "addressAdded"); 
QD_ERROR_PY_RET();
     pyUpdate = PyObject_GetAttrString(pyRouter, "addressUpdate"); 
QD_ERROR_PY_RET();
-    pyRemoved = PyObject_GetAttrString(pyRouter, "addressRemoved"); 
QD_ERROR_PY_RET();
     pyLinkLost = PyObject_GetAttrString(pyRouter, "linkLost"); 
QD_ERROR_PY_RET();
     return qd_error_code();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to