Repository: qpid-dispatch Updated Branches: refs/heads/master 2519b1eca -> 0d8ded56d
DISPATCH-1145 - Added echo-prevention on the edge side (i.e. don't create a down-link for an address for which the only subscriber is the Interior). Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/0d8ded56 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/0d8ded56 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/0d8ded56 Branch: refs/heads/master Commit: 0d8ded56dec36d13c22660289ecc73efae1ad610 Parents: 2519b1e Author: Ted Ross <[email protected]> Authored: Tue Oct 23 07:44:30 2018 -0400 Committer: Ted Ross <[email protected]> Committed: Tue Oct 23 07:44:30 2018 -0400 ---------------------------------------------------------------------- .../modules/edge_router/addr_proxy.c | 49 ++++++++++++++++---- 1 file changed, 40 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0d8ded56/src/router_core/modules/edge_router/addr_proxy.c ---------------------------------------------------------------------- diff --git a/src/router_core/modules/edge_router/addr_proxy.c b/src/router_core/modules/edge_router/addr_proxy.c index a384d6e..96accaf 100644 --- a/src/router_core/modules/edge_router/addr_proxy.c +++ b/src/router_core/modules/edge_router/addr_proxy.c @@ -91,10 +91,12 @@ static qdr_terminus_t *qdr_terminus_normal(const char *addr) static void add_inlink(qcm_edge_addr_proxy_t *ap, const char *key, qdr_address_t *addr) { - qdr_link_t *link = qdr_create_link_CT(ap->core, ap->uplink_conn, QD_LINK_ENDPOINT, QD_INCOMING, - qdr_terminus_normal(key + 2), qdr_terminus_normal(0)); - qdr_core_bind_address_link_CT(ap->core, addr, link); - addr->edge_inlink = link; + if (addr->edge_inlink == 0) { + qdr_link_t *link = qdr_create_link_CT(ap->core, ap->uplink_conn, QD_LINK_ENDPOINT, QD_INCOMING, + qdr_terminus_normal(key + 2), qdr_terminus_normal(0)); + qdr_core_bind_address_link_CT(ap->core, addr, link); + addr->edge_inlink = link; + } } @@ -114,7 +116,7 @@ static void add_outlink(qcm_edge_addr_proxy_t *ap, const char *key, qdr_address_ // // Note that this link must not be bound to the address at this time. That will // happen later when the interior tells us that there are upstream destinations - // for the address. + // for the address (see on_transfer below). // qdr_link_t *link = qdr_create_link_CT(ap->core, ap->uplink_conn, QD_LINK_ENDPOINT, QD_OUTGOING, qdr_terminus_normal(0), qdr_terminus_normal(key + 2)); @@ -193,8 +195,14 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c // If the address has more than zero attached destinations, create an // incoming link from the interior to signal the presence of local consumers. // - if (DEQ_SIZE(addr->rlinks) > 0) - add_inlink(ap, key, addr); + if (DEQ_SIZE(addr->rlinks) > 0) { + if (DEQ_SIZE(addr->rlinks) == 1) { + qdr_link_ref_t *ref = DEQ_HEAD(addr->rlinks); + if (ref->link->conn != ap->uplink_conn) + add_inlink(ap, key, addr); + } else + add_inlink(ap, key, addr); + } // // If the address has more than zero attached sources, create an outgoing link @@ -223,6 +231,7 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr) { qcm_edge_addr_proxy_t *ap = (qcm_edge_addr_proxy_t*) context; + qdr_link_ref_t *link_ref; // // If we don't have an established uplink, there is no further work to be done. @@ -239,13 +248,33 @@ static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr switch (event) { case QDRC_EVENT_ADDR_BECAME_LOCAL_DEST : - add_inlink(ap, key, addr); + // + // Add an uplink for this address only if the local destination is + // not the link to the interior. + // + link_ref = DEQ_HEAD(addr->rlinks); + if (link_ref->link->conn != ap->uplink_conn) + add_inlink(ap, key, addr); break; case QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST : del_inlink(ap, addr); break; + case QDRC_EVENT_ADDR_ONE_LOCAL_DEST : + // + // If the remaining local destination is the link to the interior, + // remove the inlink for this address. + // + link_ref = DEQ_HEAD(addr->rlinks); + if (link_ref->link->conn == ap->uplink_conn) + del_inlink(ap, addr); + break; + + case QDRC_EVENT_ADDR_TWO_DEST : + add_inlink(ap, key, addr); + break; + case QDRC_EVENT_ADDR_BECAME_SOURCE : add_outlink(ap, key, addr); break; @@ -296,7 +325,7 @@ static void on_transfer(void *link_context, bool dest = qd_parse_as_bool(dest_field); qdr_address_t *addr; - qd_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_HASH); + qd_iterator_reset_view(addr_iter, ITER_VIEW_ALL); qd_hash_retrieve(ap->core->addr_hash, addr_iter, (void**) &addr); if (addr) { qdr_link_t *link = addr->edge_outlink; @@ -353,6 +382,8 @@ qcm_edge_addr_proxy_t *qcm_edge_addr_proxy(qdr_core_t *core) | QDRC_EVENT_CONN_EDGE_LOST | QDRC_EVENT_ADDR_BECAME_LOCAL_DEST | QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST + | QDRC_EVENT_ADDR_ONE_LOCAL_DEST + | QDRC_EVENT_ADDR_TWO_DEST | QDRC_EVENT_ADDR_BECAME_SOURCE | QDRC_EVENT_ADDR_NO_LONGER_SOURCE, on_conn_event, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
