Repository: qpid-dispatch
Updated Branches:
  refs/heads/tross-DISPATCH-179-1 ca24d67f0 -> d218eeeaa


DISPATCH-179 - Tied in link-attach and link-detach for router and endpoint 
links.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/d218eeea
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/d218eeea
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/d218eeea

Branch: refs/heads/tross-DISPATCH-179-1
Commit: d218eeeaacaf7a01c28ace18967c02d35d2d0e63
Parents: ca24d67
Author: Ted Ross <[email protected]>
Authored: Fri Dec 18 16:27:31 2015 -0500
Committer: Ted Ross <[email protected]>
Committed: Fri Dec 18 16:27:31 2015 -0500

----------------------------------------------------------------------
 CMakeLists.txt                        |   2 +-
 include/qpid/dispatch/container.h     |  10 +-
 include/qpid/dispatch/router_core.h   |  34 ++-
 src/container.c                       |  64 +++---
 src/router_core/connections.c         | 323 +++++++++++++++++++++--------
 src/router_core/router_core_private.h |  10 +-
 src/router_core/terminus.c            |  10 +
 src/router_node.c                     |  70 ++++++-
 src/server.c                          |   4 +-
 9 files changed, 387 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d218eeea/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 0d55205..3ab5f81 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -46,7 +46,7 @@ if (NOT PYTHONLIBS_FOUND)
      message(FATAL_ERROR "Python Development Libraries are needed.")
 endif (NOT PYTHONLIBS_FOUND)
 
-set (SO_VERSION_MAJOR 1)
+set (SO_VERSION_MAJOR 2)
 set (SO_VERSION_MINOR 0)
 set (SO_VERSION "${SO_VERSION_MAJOR}.${SO_VERSION_MINOR}")
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d218eeea/include/qpid/dispatch/container.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/container.h 
b/include/qpid/dispatch/container.h
index c88f796..39fe968 100644
--- a/include/qpid/dispatch/container.h
+++ b/include/qpid/dispatch/container.h
@@ -74,7 +74,7 @@ typedef void (*qd_container_delivery_handler_t)    (void 
*node_context, qd_link_
 typedef int  (*qd_container_link_handler_t)        (void *node_context, 
qd_link_t *link);
 typedef int  (*qd_container_link_detach_handler_t) (void *node_context, 
qd_link_t *link, qd_detach_type_t dt);
 typedef void (*qd_container_node_handler_t)        (void *type_context, 
qd_node_t *node);
-typedef void (*qd_container_conn_handler_t)        (void *type_context, 
qd_connection_t *conn, void *context);
+typedef int  (*qd_container_conn_handler_t)        (void *type_context, 
qd_connection_t *conn, void *context);
 
 /**
  * A set  of Node handlers for deliveries, links and container events.
@@ -100,12 +100,8 @@ typedef struct {
     /** Invoked when an attach for a new outgoing link is received. */
     qd_container_link_handler_t outgoing_handler;
 
-    /**
-     * Invoked when an outgoing link is available for sending either deliveries
-     * or disposition changes.  The handler must check the link's credit to
-     * determine whether (and how many) message deliveries may be sent.
-     */
-    qd_container_link_handler_t writable_handler;
+    /** Invoked when an activated connection is available for writing. */
+    qd_container_conn_handler_t writable_handler;
 
     /** Invoked when a link is detached. */
     qd_container_link_detach_handler_t link_detach_handler;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d218eeea/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h 
b/include/qpid/dispatch/router_core.h
index e42d818..9616b6b 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -150,12 +150,13 @@ void *qdr_connection_get_context(const qdr_connection_t 
*conn);
  * qdr_connection_process
  *
  * Allow the core to process work associated with this connection.
- * This function MUST be called only on a thread that exclusively owns
+ * This function MUST be called on a thread that exclusively owns
  * this connection.
  *
  * @param conn The pointer returned by qdr_connection_opened
+ * @return The number of actions processed.
  */
-void qdr_connection_process(qdr_connection_t *conn);
+int qdr_connection_process(qdr_connection_t *conn);
 
 /**
  * qdr_connection_activate_t callback
@@ -254,6 +255,16 @@ bool qdr_terminus_is_anonymous(qdr_terminus_t *term);
 bool qdr_terminus_is_dynamic(qdr_terminus_t *term);
 
 /**
+ * qdr_terminus_set_address
+ *
+ * Set the terminus address
+ *
+ * @param term A qdr_terminus pointer returned by qdr_terminus()
+ * @param addr An AMQP address (null-terminated string)
+ */
+void qdr_terminus_set_address(qdr_terminus_t *term, const char *addr);
+
+/**
  * qdr_terminus_get_address
  *
  * Return the address of the terminus in the form of an iterator.
@@ -328,6 +339,16 @@ qd_link_type_t qdr_link_type(const qdr_link_t *link);
 qd_direction_t qdr_link_direction(const qdr_link_t *link);
 
 /**
+ * qdr_link_name
+ *
+ * Retrieve the name of the link.
+ *
+ * @param link Link object
+ * @return The link's name
+ */
+const char *qdr_link_name(const qdr_link_t *link);
+
+/**
  * qdr_link_first_attach
  *
  * This function is invoked when a first-attach (not a response to an earlier 
attach)
@@ -339,7 +360,11 @@ qd_direction_t qdr_link_direction(const qdr_link_t *link);
  * @param target Target terminus of the attach
  * @return A pointer to a new qdr_link_t object to track the link
  */
-qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, qd_direction_t dir, 
qdr_terminus_t *source, qdr_terminus_t *target);
+qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
+                                  qd_direction_t    dir,
+                                  qdr_terminus_t   *source,
+                                  qdr_terminus_t   *target,
+                                  const char       *name);
 
 /**
  * qdr_link_second_attach
@@ -359,9 +384,10 @@ void qdr_link_second_attach(qdr_link_t *link, 
qdr_terminus_t *source, qdr_termin
  * This function is invoked when a link detach arrives.
  *
  * @param link The link pointer returned by qdr_link_first_attach or in a 
FIRST_ATTACH event.
+ * @param dt The type of detach that occurred.
  * @param error The link error from the detach frame or 0 if none.
  */
-void qdr_link_detach(qdr_link_t *link, qdr_error_t *error);
+void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t 
*error);
 
 qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, pn_delivery_t *delivery, 
qd_message_t *msg);
 qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, pn_delivery_t *delivery, 
qd_message_t *msg, qd_field_iterator_t *addr);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d218eeea/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index ace94e0..482f33b 100644
--- a/src/container.c
+++ b/src/container.c
@@ -202,20 +202,6 @@ static void handle_link_open(qd_container_t *container, 
pn_link_t *pn_link)
 }
 
 
-static int do_writable(pn_link_t *pn_link)
-{
-    qd_link_t *link = (qd_link_t*) pn_link_get_context(pn_link);
-    if (!link)
-        return 0;
-
-    qd_node_t *node = link->node;
-    if (!node)
-        return 0;
-
-    return node->ntype->writable_handler(node->context, link);
-}
-
-
 static void do_receive(pn_delivery_t *pnd)
 {
     pn_link_t     *pn_link  = pn_delivery_link(pnd);
@@ -342,25 +328,30 @@ static int close_handler(qd_container_t *container, void* 
conn_context, pn_conne
 }
 
 
-static int writable_handler(void* unused, pn_connection_t *conn, 
qd_connection_t* qd_conn)
+static int writable_handler(qd_container_t *container, pn_connection_t *conn, 
qd_connection_t* qd_conn)
 {
-    pn_link_t *pn_link;
-    int        event_count = 0;
+    const qd_node_type_t *nt;
+    int                   event_count = 0;
 
     //
-    // Call the attached node's writable handler for all active links
-    // on the connection.  Note that in Dispatch, links are considered
-    // bidirectional.  Incoming and outgoing only pertains to deliveries and
-    // deliveries are a subset of the traffic that flows both directions on 
links.
+    // Note the locking structure in this function.  Generally this would be 
unsafe, but since
+    // this particular list is only ever appended to and never has items 
inserted or deleted,
+    // this usage is safe in this case.
     //
-    if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE)) {
-        pn_link = pn_link_head(conn, 0);
-        while (pn_link) {
-            assert(pn_session_connection(pn_link_session(pn_link)) == conn);
-            event_count += do_writable(pn_link);
-            pn_link = pn_link_next(pn_link, 0);
-        }
+    sys_mutex_lock(container->lock);
+    qdc_node_type_t *nt_item = DEQ_HEAD(container->node_type_list);
+    sys_mutex_unlock(container->lock);
+
+    while (nt_item) {
+        nt = nt_item->ntype;
+        if (nt->writable_handler)
+            event_count += nt->writable_handler(nt->type_context, qd_conn, 0);
+
+        sys_mutex_lock(container->lock);
+        nt_item = DEQ_NEXT(nt_item);
+        sys_mutex_unlock(container->lock);
     }
+
     return event_count;
 }
 
@@ -520,10 +511,19 @@ static int handler(void *handler_context, void 
*conn_context, qd_conn_event_t ev
     pn_connection_t *conn      = qd_connection_pn(qd_conn);
 
     switch (event) {
-    case QD_CONN_EVENT_LISTENER_OPEN:  open_handler(container, qd_conn, 
QD_INCOMING, conn_context);   break;
-    case QD_CONN_EVENT_CONNECTOR_OPEN: open_handler(container, qd_conn, 
QD_OUTGOING, conn_context);   break;
-    case QD_CONN_EVENT_CLOSE:          return close_handler(container, 
conn_context, conn, qd_conn);
-    case QD_CONN_EVENT_WRITABLE:       return writable_handler(conn_context, 
conn, qd_conn);
+    case QD_CONN_EVENT_LISTENER_OPEN:
+        open_handler(container, qd_conn, QD_INCOMING, conn_context);
+        return 1;
+
+    case QD_CONN_EVENT_CONNECTOR_OPEN:
+        open_handler(container, qd_conn, QD_OUTGOING, conn_context);
+        return 1;
+
+    case QD_CONN_EVENT_CLOSE:
+        return close_handler(container, conn_context, conn, qd_conn);
+
+    case QD_CONN_EVENT_WRITABLE:
+        return writable_handler(container, conn, qd_conn);
     }
 
     return 0;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d218eeea/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 26064de..db469f6 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -23,9 +23,9 @@
 
 static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard);
 static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard);
-static void qdr_link_first_attach_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard);
-static void qdr_link_second_attach_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard);
-static void qdr_link_detach_CT(qdr_core_t *core, qdr_action_t *action, bool 
discard);
+static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t 
*action, bool discard);
+static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t 
*action, bool discard);
+static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard);
 
 ALLOC_DEFINE(qdr_connection_t);
 ALLOC_DEFINE(qdr_connection_work_t);
@@ -107,7 +107,7 @@ void *qdr_connection_get_context(const qdr_connection_t 
*conn)
 }
 
 
-void qdr_connection_process(qdr_connection_t *conn)
+int qdr_connection_process(qdr_connection_t *conn)
 {
     qdr_connection_work_list_t  work_list;
     qdr_core_t                 *core = conn->core;
@@ -116,6 +116,7 @@ void qdr_connection_process(qdr_connection_t *conn)
     DEQ_MOVE(conn->work_list, work_list);
     sys_mutex_unlock(conn->work_lock);
 
+    int event_count = DEQ_SIZE(work_list);
     qdr_connection_work_t *work = DEQ_HEAD(work_list);
     while (work) {
         DEQ_REMOVE_HEAD(work_list);
@@ -134,9 +135,14 @@ void qdr_connection_process(qdr_connection_t *conn)
             break;
         }
 
+        qdr_terminus_free(work->source);
+        qdr_terminus_free(work->target);
         free_qdr_connection_work_t(work);
+
         work = DEQ_HEAD(work_list);
     }
+
+    return event_count;
 }
 
 
@@ -165,21 +171,32 @@ qd_direction_t qdr_link_direction(const qdr_link_t *link)
 }
 
 
-qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, qd_direction_t dir, 
qdr_terminus_t *source, qdr_terminus_t *target)
+const char *qdr_link_name(const qdr_link_t *link)
 {
-    qdr_action_t *action = qdr_action(qdr_link_first_attach_CT, 
"link_first_attach");
-    qdr_link_t   *link   = new_qdr_link_t();
+    return link->name;
+}
+
+
+qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
+                                  qd_direction_t    dir,
+                                  qdr_terminus_t   *source,
+                                  qdr_terminus_t   *target,
+                                  const char       *name)
+{
+    qdr_action_t   *action         = 
qdr_action(qdr_link_inbound_first_attach_CT, "link_first_attach");
+    qdr_link_t     *link           = new_qdr_link_t();
+    qdr_terminus_t *local_terminus = dir == QD_OUTGOING ? source : target;
 
     ZERO(link);
     link->core = conn->core;
     link->conn = conn;
+    link->name = (char*) malloc(strlen(name));
+    strcpy(link->name, name);
 
-    if (dir == QD_OUTGOING) {
-        if      (qdr_terminus_has_capability(target, 
QD_CAPABILITY_ROUTER_CONTROL))
-            link->link_type = QD_LINK_CONTROL;
-        else if (qdr_terminus_has_capability(target, 
QD_CAPABILITY_ROUTER_DATA))
-            link->link_type = QD_LINK_ROUTER;
-    }
+    if      (qdr_terminus_has_capability(local_terminus, 
QD_CAPABILITY_ROUTER_CONTROL))
+        link->link_type = QD_LINK_CONTROL;
+    else if (qdr_terminus_has_capability(local_terminus, 
QD_CAPABILITY_ROUTER_DATA))
+        link->link_type = QD_LINK_ROUTER;
 
     action->args.connection.conn   = conn;
     action->args.connection.link   = link;
@@ -194,7 +211,7 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, 
qd_direction_t dir, qd
 
 void qdr_link_second_attach(qdr_link_t *link, qdr_terminus_t *source, 
qdr_terminus_t *target)
 {
-    qdr_action_t *action = qdr_action(qdr_link_second_attach_CT, 
"link_second_attach");
+    qdr_action_t *action = qdr_action(qdr_link_inbound_second_attach_CT, 
"link_second_attach");
 
     action->args.connection.link   = link;
     action->args.connection.source = source;
@@ -203,12 +220,13 @@ void qdr_link_second_attach(qdr_link_t *link, 
qdr_terminus_t *source, qdr_termin
 }
 
 
-void qdr_link_detach(qdr_link_t *link, qdr_error_t *error)
+void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t *error)
 {
-    qdr_action_t *action = qdr_action(qdr_link_detach_CT, "link_detach");
+    qdr_action_t *action = qdr_action(qdr_link_inbound_detach_CT, 
"link_detach");
 
-    action->args.connection.link  = link;
-    action->args.connection.error = error;
+    action->args.connection.link   = link;
+    action->args.connection.error  = error;
+    action->args.connection.dt     = dt;
     qdr_action_enqueue(link->core, action);
 }
 
@@ -246,6 +264,48 @@ static void qdr_connection_enqueue_work_CT(qdr_core_t      
      *core,
 }
 
 
+#define QDR_DISCRIMINATOR_SIZE 16
+static void qdr_generate_discriminator(char *string)
+{
+    static const char *table = 
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+_";
+    long int rnd1 = random();
+    long int rnd2 = random();
+    long int rnd3 = random();
+    int      idx;
+    int      cursor = 0;
+
+    for (idx = 0; idx < 5; idx++) {
+        string[cursor++] = table[(rnd1 >> (idx * 6)) & 63];
+        string[cursor++] = table[(rnd2 >> (idx * 6)) & 63];
+        string[cursor++] = table[(rnd3 >> (idx * 6)) & 63];
+    }
+    string[cursor] = '\0';
+}
+
+
+/**
+ * Generate a temporary routable address for a destination connected to this
+ * router node.
+ */
+static void qdr_generate_temp_addr(qdr_core_t *core, char *buffer, size_t 
length)
+{
+    char discriminator[QDR_DISCRIMINATOR_SIZE];
+    qdr_generate_discriminator(discriminator);
+    snprintf(buffer, length, "amqp:/_topo/%s/%s/temp.%s", core->router_area, 
core->router_id, discriminator);
+}
+
+
+/**
+ * Generate a link name
+ */
+static void qdr_generate_link_name(const char *label, char *buffer, size_t 
length)
+{
+    char discriminator[QDR_DISCRIMINATOR_SIZE];
+    qdr_generate_discriminator(discriminator);
+    snprintf(buffer, length, "%s.%s", label, discriminator);
+}
+
+
 static qdr_link_t *qdr_create_link_CT(qdr_core_t       *core,
                                       qdr_connection_t *conn,
                                       qd_link_type_t    link_type,
@@ -264,6 +324,8 @@ static qdr_link_t *qdr_create_link_CT(qdr_core_t       
*core,
     link->conn           = conn;
     link->link_type      = link_type;
     link->link_direction = dir;
+    link->name           = (char*) malloc(QDR_DISCRIMINATOR_SIZE + 8);
+    qdr_generate_link_name("qdlink", link->name, QDR_DISCRIMINATOR_SIZE + 8);
 
     qdr_connection_work_t *work = new_qdr_connection_work_t();
     ZERO(work);
@@ -277,43 +339,27 @@ static qdr_link_t *qdr_create_link_CT(qdr_core_t       
*core,
 }
 
 
-static void qdr_link_reject_CT(qdr_core_t *core, qdr_link_t *link, 
qdr_condition_t condition)
+static void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, 
qdr_condition_t condition)
 {
 }
 
 
-static void qdr_link_accept_CT(qdr_core_t *core, qdr_link_t *link)
+static void qdr_link_outbound_second_attach_CT(qdr_core_t *core, qdr_link_t 
*link, qdr_terminus_t *source, qdr_terminus_t *target)
 {
-}
-
+    qdr_connection_work_t *work = new_qdr_connection_work_t();
+    ZERO(work);
+    work->work_type = QDR_CONNECTION_WORK_SECOND_ATTACH;
+    work->link      = link;
+    work->source    = source;
+    work->target    = target;
 
-static void qdr_forward_first_attach_CT(qdr_core_t *core, qdr_link_t *link, 
qdr_address_t *addr)
-{
+    qdr_connection_enqueue_work_CT(core, link->conn, work);
 }
 
 
-/**
- * Generate a temporary routable address for a destination connected to this
- * router node.
- */
-static void qdr_generate_temp_addr(qdr_core_t *core, char *buffer, size_t 
length)
+static void qdr_forward_first_attach_CT(qdr_core_t *core, qdr_link_t *link, 
qdr_address_t *addr,
+                                        qdr_terminus_t *source, qdr_terminus_t 
*target)
 {
-    static const char *table = 
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+_";
-    char     discriminator[16];
-    long int rnd1 = random();
-    long int rnd2 = random();
-    long int rnd3 = random();
-    int      idx;
-    int      cursor = 0;
-
-    for (idx = 0; idx < 5; idx++) {
-        discriminator[cursor++] = table[(rnd1 >> (idx * 6)) & 63];
-        discriminator[cursor++] = table[(rnd2 >> (idx * 6)) & 63];
-        discriminator[cursor++] = table[(rnd3 >> (idx * 6)) & 63];
-    }
-    discriminator[cursor] = '\0';
-
-    snprintf(buffer, length, "amqp:/_topo/%s/%s/temp.%s", core->router_area, 
core->router_id, discriminator);
 }
 
 
@@ -341,7 +387,7 @@ static qd_address_semantics_t 
qdr_semantics_for_address(qdr_core_t *core, qd_fie
  * Depending on its policy, the address may be eligible for being closed out
  * (i.e. Logging its terminal statistics and freeing its resources).
  */
-/*static*/ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool 
was_local)
+static void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool 
was_local)
 {
     if (addr == 0)
         return;
@@ -361,8 +407,8 @@ static qd_address_semantics_t 
qdr_semantics_for_address(qdr_core_t *core, qd_fie
     // If the address has no in-process consumer or destinations, it should be
     // deleted.
     //
-    if (addr->on_message == 0 && DEQ_SIZE(addr->rlinks) == 0 && 
DEQ_SIZE(addr->rnodes) == 0 &&
-        !addr->waypoint && !addr->block_deletion) {
+    if (addr->on_message == 0 && DEQ_SIZE(addr->rlinks) == 0 && 
DEQ_SIZE(addr->inlinks) == 0 &&
+        DEQ_SIZE(addr->rnodes) == 0 && !addr->waypoint && 
!addr->block_deletion) {
         qd_hash_remove_by_handle(core->addr_hash, addr->hash_handle);
         DEQ_REMOVE(core->addrs, addr);
         qd_hash_handle_free(addr->hash_handle);
@@ -437,6 +483,7 @@ static qdr_address_t 
*qdr_lookup_terminus_address_CT(qdr_core_t     *core,
                 addr = qdr_address(qdr_dynamic_semantics);
                 qd_hash_insert(core->addr_hash, temp_iter, addr, 
&addr->hash_handle);
                 DEQ_INSERT_TAIL(core->addrs, addr);
+                qdr_terminus_set_address(terminus, temp_addr);
                 generating = false;
             }
             qd_field_iterator_free(temp_iter);
@@ -512,10 +559,10 @@ static void qdr_connection_opened_CT(qdr_core_t *core, 
qdr_action_t *action, boo
             // The connector-side of inter-router connections is responsible 
for setting up the
             // inter-router links:  Two (in and out) for control, two for 
routed-message transfer.
             //
-            (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, 
QD_INCOMING, qdr_terminus_router_control(), 0);
-            (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, 
QD_OUTGOING, 0, qdr_terminus_router_control());
-            (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER,  
QD_INCOMING, qdr_terminus_router_data(), 0);
-            (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER,  
QD_OUTGOING, 0, qdr_terminus_router_data());
+            (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, 
QD_INCOMING, qdr_terminus_router_control(), qdr_terminus_router_control());
+            (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, 
QD_OUTGOING, qdr_terminus_router_control(), qdr_terminus_router_control());
+            (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER,  
QD_INCOMING, qdr_terminus_router_data(), qdr_terminus_router_data());
+            (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER,  
QD_OUTGOING, qdr_terminus_router_data(), qdr_terminus_router_data());
         }
     }
 
@@ -554,7 +601,7 @@ static void qdr_connection_closed_CT(qdr_core_t *core, 
qdr_action_t *action, boo
 }
 
 
-static void qdr_link_first_attach_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard)
+static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t 
*action, bool discard)
 {
     if (discard)
         return;
@@ -567,15 +614,14 @@ static void qdr_link_first_attach_CT(qdr_core_t *core, 
qdr_action_t *action, boo
 
     //
     // Reject any attaches of inter-router links that arrive on connections 
that are not inter-router.
-    //
-    if ((link->link_type == QD_LINK_CONTROL || link->link_type == 
QD_LINK_ROUTER) && conn->role != QDR_ROLE_INTER_ROUTER)
-        qdr_link_reject_CT(core, link, QDR_CONDITION_FORBIDDEN);
-
-    //
     // Reject any waypoint links.  Waypoint links are always initiated by a 
router, not the remote container.
     //
-    if (link->link_type == QD_LINK_WAYPOINT)
-        qdr_link_reject_CT(core, link, QDR_CONDITION_FORBIDDEN);
+    if ((link->link_type == QD_LINK_WAYPOINT) ||
+        ((link->link_type == QD_LINK_CONTROL || link->link_type == 
QD_LINK_ROUTER) && conn->role != QDR_ROLE_INTER_ROUTER)) {
+        qdr_link_outbound_detach_CT(core, link, QDR_CONDITION_FORBIDDEN);
+        qdr_terminus_free(source);
+        qdr_terminus_free(target);
+    }
 
     if (dir == QD_INCOMING) {
         //
@@ -585,24 +631,28 @@ static void qdr_link_first_attach_CT(qdr_core_t *core, 
qdr_action_t *action, boo
         case QD_LINK_ENDPOINT: {
             if (qdr_terminus_is_anonymous(target)) {
                 link->owning_addr = 0;
-                qdr_link_accept_CT(core, link);
+                qdr_link_outbound_second_attach_CT(core, link, source, target);
+
             } else {
                 //
                 // This link has a target address
                 //
                 bool           link_route;
                 qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, 
dir, target, false, false, &link_route);
-                if (!addr)
+                if (!addr) {
                     //
                     // No route to this destination, reject the link
                     //
-                    qdr_link_reject_CT(core, link, 
QDR_CONDITION_NO_ROUTE_TO_DESTINATION);
+                    qdr_link_outbound_detach_CT(core, link, 
QDR_CONDITION_NO_ROUTE_TO_DESTINATION);
+                    qdr_terminus_free(source);
+                    qdr_terminus_free(target);
+                }
 
                 else if (link_route)
                     //
                     // This is a link-routed destination, forward the attach 
to the next hop
                     //
-                    qdr_forward_first_attach_CT(core, link, addr);
+                    qdr_forward_first_attach_CT(core, link, addr, source, 
target);
 
                 else {
                     //
@@ -611,7 +661,7 @@ static void qdr_link_first_attach_CT(qdr_core_t *core, 
qdr_action_t *action, boo
                     //
                     link->owning_addr = addr;
                     qdr_add_link_ref(&addr->inlinks, link);
-                    qdr_link_accept_CT(core, link);
+                    qdr_link_outbound_second_attach_CT(core, link, source, 
target);
                 }
             }
             break;
@@ -622,9 +672,11 @@ static void qdr_link_first_attach_CT(qdr_core_t *core, 
qdr_action_t *action, boo
             break;
 
         case QD_LINK_CONTROL:
+            qdr_link_outbound_second_attach_CT(core, link, source, target);
             break;
 
         case QD_LINK_ROUTER:
+            qdr_link_outbound_second_attach_CT(core, link, source, target);
             break;
         }
     } else {
@@ -635,17 +687,20 @@ static void qdr_link_first_attach_CT(qdr_core_t *core, 
qdr_action_t *action, boo
         case QD_LINK_ENDPOINT: {
             bool           link_route;
             qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, 
source, true, true, &link_route);
-            if (!addr)
+            if (!addr) {
                 //
                 // No route to this destination, reject the link
                 //
-                qdr_link_reject_CT(core, link, 
QDR_CONDITION_NO_ROUTE_TO_DESTINATION);
+                qdr_link_outbound_detach_CT(core, link, 
QDR_CONDITION_NO_ROUTE_TO_DESTINATION);
+                qdr_terminus_free(source);
+                qdr_terminus_free(target);
+            }
 
             else if (link_route)
                 //
                 // This is a link-routed destination, forward the attach to 
the next hop
                 //
-                qdr_forward_first_attach_CT(core, link, addr);
+                qdr_forward_first_attach_CT(core, link, addr, source, target);
 
             else {
                 //
@@ -659,7 +714,7 @@ static void qdr_link_first_attach_CT(qdr_core_t *core, 
qdr_action_t *action, boo
                     if (key && *key == 'M')
                         qdr_post_mobile_added_CT(core, key);
                 }
-                qdr_link_accept_CT(core, link);
+                qdr_link_outbound_second_attach_CT(core, link, source, target);
             }
             break;
         }
@@ -672,10 +727,12 @@ static void qdr_link_first_attach_CT(qdr_core_t *core, 
qdr_action_t *action, boo
             link->owning_addr = core->hello_addr;
             qdr_add_link_ref(&core->hello_addr->rlinks, link);
             core->control_links_by_mask_bit[conn->mask_bit] = link;
+            qdr_link_outbound_second_attach_CT(core, link, source, target);
             break;
 
         case QD_LINK_ROUTER:
             core->data_links_by_mask_bit[conn->mask_bit] = link;
+            qdr_link_outbound_second_attach_CT(core, link, source, target);
             break;
         }
     }
@@ -699,14 +756,59 @@ static void qdr_link_first_attach_CT(qdr_core_t *core, 
qdr_action_t *action, boo
 }
 
 
-static void qdr_link_second_attach_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard)
+static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t 
*action, bool discard)
 {
     if (discard)
         return;
 
-    //qdr_link_t     *link   = action->args.connection.link;
-    //qdr_terminus_t *source = action->args.connection.source;
-    //qdr_terminus_t *target = action->args.connection.target;
+    qdr_connection_t *conn   = action->args.connection.conn;
+    qdr_link_t       *link   = action->args.connection.link;
+    qd_direction_t    dir    = action->args.connection.dir;
+    qdr_terminus_t   *source = action->args.connection.source;
+    qdr_terminus_t   *target = action->args.connection.target;
+
+    if (dir == QD_INCOMING) {
+        //
+        // Handle incoming link cases
+        //
+        switch (link->link_type) {
+        case QD_LINK_ENDPOINT:
+            break;
+
+        case QD_LINK_WAYPOINT:
+            break;
+
+        case QD_LINK_CONTROL:
+            break;
+
+        case QD_LINK_ROUTER:
+            break;
+        }
+    } else {
+        //
+        // Handle outgoing link cases
+        //
+        switch (link->link_type) {
+        case QD_LINK_ENDPOINT:
+            break;
+
+        case QD_LINK_WAYPOINT:
+            break;
+
+        case QD_LINK_CONTROL:
+            link->owning_addr = core->hello_addr;
+            qdr_add_link_ref(&core->hello_addr->rlinks, link);
+            core->control_links_by_mask_bit[conn->mask_bit] = link;
+            break;
+
+        case QD_LINK_ROUTER:
+            core->data_links_by_mask_bit[conn->mask_bit] = link;
+            break;
+        }
+    }
+
+    qdr_terminus_free(source);
+    qdr_terminus_free(target);
 
     //
     // Cases to be handled:
@@ -723,28 +825,79 @@ static void qdr_link_second_attach_CT(qdr_core_t *core, 
qdr_action_t *action, bo
 }
 
 
-static void qdr_link_detach_CT(qdr_core_t *core, qdr_action_t *action, bool 
discard)
+static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard)
 {
     if (discard)
         return;
 
-    qdr_link_t  *link  = action->args.connection.link;
-    //qdr_error_t *error = action->args.connection.error;
+    qdr_connection_t *conn      = action->args.connection.conn;
+    qdr_link_t       *link      = action->args.connection.link;
+    //qdr_error_t      *error     = action->args.connection.error;
+    qd_detach_type_t  dt        = action->args.connection.dt;
+    qdr_address_t    *addr      = link->owning_addr;
+    bool              was_local = false;
 
-    switch (link->link_type) {
-    case QD_LINK_ENDPOINT:
-        break;
+    link->owning_addr = 0;
 
-    case QD_LINK_WAYPOINT:
-        break;
+    if (link->link_direction == QD_INCOMING) {
+        //
+        // Handle incoming link cases
+        //
+        switch (link->link_type) {
+        case QD_LINK_ENDPOINT:
+            if (addr)
+                qdr_del_link_ref(&addr->inlinks, link);
+            break;
 
-    case QD_LINK_CONTROL:
-        break;
+        case QD_LINK_WAYPOINT:
+            break;
+
+        case QD_LINK_CONTROL:
+            break;
 
-    case QD_LINK_ROUTER:
-        break;
+        case QD_LINK_ROUTER:
+            break;
+        }
+    } else {
+        //
+        // Handle outgoing link cases
+        //
+        switch (link->link_type) {
+        case QD_LINK_ENDPOINT:
+            if (addr) {
+                qdr_del_link_ref(&addr->rlinks, link);
+                was_local = true;
+            }
+            break;
+
+        case QD_LINK_WAYPOINT:
+            break;
+
+        case QD_LINK_CONTROL:
+            qdr_del_link_ref(&core->hello_addr->rlinks, link);
+            core->control_links_by_mask_bit[conn->mask_bit] = 0;
+            qdr_post_link_lost_CT(core, conn->mask_bit);
+            break;
+
+        case QD_LINK_ROUTER:
+            core->data_links_by_mask_bit[conn->mask_bit] = 0;
+            break;
+        }
     }
 
+    //
+    // If the detach occurred via protocol, send a detach back.
+    // TODO - Note that this is not appropriate for routed links.
+    //
+    if (dt != QD_LOST)
+        qdr_link_outbound_detach_CT(core, link, 0);  // TODO - Fix error arg
+
+    //
+    // If there was an address associated with this link, check to see if any 
address-related
+    // cleanup has to be done.
+    //
+    if (addr)
+        qdr_check_addr_CT(core, addr, was_local);
 
     //
     // Cases to be handled:

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d218eeea/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h 
b/src/router_core/router_core_private.h
index db84c55..0ee1604 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -72,6 +72,7 @@ struct qdr_action_t {
             qdr_terminus_t   *source;
             qdr_terminus_t   *target;
             qdr_error_t      *error;
+            qd_detach_type_t  dt;
         } connection;
 
         //
@@ -161,6 +162,7 @@ struct qdr_link_t {
     qdr_connection_t         *conn;            ///< [ref] Connection that owns 
this link
     qd_link_type_t            link_type;
     qd_direction_t            link_direction;
+    char                     *name;
     qdr_address_t            *owning_addr;     ///< [ref] Address record that 
owns this link
     //qd_waypoint_t            *waypoint;        ///< [ref] Waypoint that owns 
this link
     qdr_link_t               *connected_link;  ///< [ref] If this is a 
link-route, reference the connected link
@@ -245,7 +247,9 @@ void qdr_del_node_ref(qdr_router_ref_list_t *ref_list, 
qdr_node_t *rnode);
 // General Work
 //
 // The following types are used to post work to the IO threads for
-// non-connection-specific action.
+// non-connection-specific action.  These actions are serialized through
+// a zero-delay timer and are processed by one thread at a time.  General
+// actions occur in-order and are not run concurrently.
 //
 typedef struct qdr_general_work_t qdr_general_work_t;
 typedef void (*qdr_general_work_handler_t) (qdr_core_t *core, 
qdr_general_work_t *work);
@@ -266,7 +270,9 @@ qdr_general_work_t 
*qdr_general_work(qdr_general_work_handler_t handler);
 // Connection Work
 //
 // The following types are used to post work to the IO threads for
-// connection-specific action.
+// connection-specific action.  The actions for a particular connection
+// are run in-order and are not concurrent.  Actions for different connections
+// will run concurrently.
 //
 typedef enum {
     QDR_CONNECTION_WORK_FIRST_ATTACH,

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d218eeea/src/router_core/terminus.c
----------------------------------------------------------------------
diff --git a/src/router_core/terminus.c b/src/router_core/terminus.c
index 175acf4..39ad368 100644
--- a/src/router_core/terminus.c
+++ b/src/router_core/terminus.c
@@ -83,6 +83,9 @@ void qdr_terminus_free(qdr_terminus_t *term)
 
 void qdr_terminus_copy(qdr_terminus_t *from, pn_terminus_t *to)
 {
+    if (!from)
+        return;
+
     if (from->address) {
         unsigned char *addr = qd_field_iterator_copy(from->address->iterator);
         pn_terminus_set_address(to, (char*) addr);
@@ -135,6 +138,13 @@ bool qdr_terminus_is_dynamic(qdr_terminus_t *term)
 }
 
 
+void qdr_terminus_set_address(qdr_terminus_t *term, const char *addr)
+{
+    qdr_field_free(term->address);
+    term->address = qdr_field(addr);
+}
+
+
 qd_field_iterator_t *qdr_terminus_get_address(qdr_terminus_t *term)
 {
     if (qdr_terminus_is_anonymous(term))

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d218eeea/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index e9e8012..97b3fc1 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -199,10 +199,21 @@ void qd_router_link_free_LH(qd_router_link_t *rlink)
 }
 
 
+static int router_writable_conn_handler(void *type_context, qd_connection_t 
*conn, void *context)
+{
+    qdr_connection_t *qconn = (qdr_connection_t*) 
qd_connection_get_context(conn);
+
+    if (qconn)
+        return qdr_connection_process(qconn);
+    return 0;
+}
+
+
 /**
  * Outgoing Link Writable Handler
+ * DEPRECATE
  */
-static int router_writable_link_handler(void* context, qd_link_t *link)
+/*static*/ int router_writable_link_handler(void* context, qd_link_t *link)
 {
     qd_router_t            *router = (qd_router_t*) context;
     qd_router_delivery_t   *delivery;
@@ -821,7 +832,8 @@ static int router_incoming_link_handler(void* context, 
qd_link_t *link)
     qdr_connection_t *qdr_conn = (qdr_connection_t*) 
qd_connection_get_context(conn);
     qdr_link_t       *qdr_link = qdr_link_first_attach(qdr_conn, QD_INCOMING,
                                                        
qdr_terminus(qd_link_remote_source(link)),
-                                                       
qdr_terminus(qd_link_remote_target(link)));
+                                                       
qdr_terminus(qd_link_remote_target(link)),
+                                                       
pn_link_name(qd_link_pn(link)));
     qdr_link_set_context(qdr_link, link);
     qd_link_set_context(link, qdr_link);
 
@@ -838,7 +850,8 @@ static int router_outgoing_link_handler(void* context, 
qd_link_t *link)
     qdr_connection_t *qdr_conn = (qdr_connection_t*) 
qd_connection_get_context(conn);
     qdr_link_t       *qdr_link = qdr_link_first_attach(qdr_conn, QD_OUTGOING,
                                                        
qdr_terminus(qd_link_remote_source(link)),
-                                                       
qdr_terminus(qd_link_remote_target(link)));
+                                                       
qdr_terminus(qd_link_remote_target(link)),
+                                                       
pn_link_name(qd_link_pn(link)));
     qdr_link_set_context(qdr_link, link);
     qd_link_set_context(link, qdr_link);
 
@@ -915,14 +928,14 @@ static int router_link_detach_handler(void* context, 
qd_link_t *link, qd_detach_
         qdr_error_t *error = qdr_error_from_pn(cond);
         if (!error && dt == QD_LOST)
             error = qdr_error("qd:routed-link-lost", "Connectivity to the peer 
container was lost");
-        qdr_link_detach(rlink, error);
+        qdr_link_detach(rlink, dt, error);
     }
 
     return 0;
 }
 
 
-static void router_inbound_opened_handler(void *type_context, qd_connection_t 
*conn, void *context)
+static int router_inbound_opened_handler(void *type_context, qd_connection_t 
*conn, void *context)
 {
     qd_router_t           *router = (qd_router_t*) type_context;
     qdr_connection_role_t  role   = qd_router_connection_role(conn);
@@ -930,10 +943,12 @@ static void router_inbound_opened_handler(void 
*type_context, qd_connection_t *c
 
     qd_connection_set_context(conn, qdrc);
     qdr_connection_set_context(qdrc, conn);
+
+    return 0;
 }
 
 
-static void router_outbound_opened_handler(void *type_context, qd_connection_t 
*conn, void *context)
+static int router_outbound_opened_handler(void *type_context, qd_connection_t 
*conn, void *context)
 {
     qd_router_t           *router = (qd_router_t*) type_context;
     qdr_connection_role_t  role   = qd_router_connection_role(conn);
@@ -941,14 +956,18 @@ static void router_outbound_opened_handler(void 
*type_context, qd_connection_t *
 
     qd_connection_set_context(conn, qdrc);
     qdr_connection_set_context(qdrc, conn);
+
+    return 0;
 }
 
 
-static void router_closed_handler(void *type_context, qd_connection_t *conn, 
void *context)
+static int router_closed_handler(void *type_context, qd_connection_t *conn, 
void *context)
 {
     qdr_connection_t *qdrc = (qdr_connection_t*) 
qd_connection_get_context(conn);
     qdr_connection_closed(qdrc);
     qd_connection_set_context(conn, 0);
+
+    return 0;
 }
 
 
@@ -969,7 +988,7 @@ static qd_node_type_t router_node = {"router", 0, 0,
                                      router_disposition_handler,
                                      router_incoming_link_handler,
                                      router_outgoing_link_handler,
-                                     router_writable_link_handler,
+                                     router_writable_conn_handler,
                                      router_link_detach_handler,
                                      router_link_attach_handler,
                                      router_link_flow_handler,
@@ -1080,11 +1099,46 @@ static void qd_router_link_first_attach(void            
 *context,
                                         qdr_terminus_t   *source,
                                         qdr_terminus_t   *target)
 {
+    qd_router_t     *router = (qd_router_t*) context;
+    qd_connection_t *qconn  = (qd_connection_t*) 
qdr_connection_get_context(conn);
+
+    //
+    // Create a new link to be attached
+    //
+    qd_link_t *qlink = qd_link(router->node, qconn, qdr_link_direction(link), 
qdr_link_name(link));
+
+    //
+    // Copy the source and target termini to the link
+    //
+    qdr_terminus_copy(source, qd_link_source(qlink));
+    qdr_terminus_copy(target, qd_link_target(qlink));
+
+    //
+    // Associate the qd_link and the qdr_link to each other
+    //
+    qdr_link_set_context(link, qlink);
+    qd_link_set_context(qlink, link);
+
+    //
+    // Open (attach) the link
+    //
+    pn_link_open(qd_link_pn(qlink));
 }
 
 
 static void qd_router_link_second_attach(void *context, qdr_link_t *link, 
qdr_terminus_t *source, qdr_terminus_t *target)
 {
+    qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
+    if (!qlink)
+        return;
+
+    qdr_terminus_copy(source, qd_link_source(qlink));
+    qdr_terminus_copy(target, qd_link_target(qlink));
+
+    //
+    // Open (attach) the link
+    //
+    pn_link_open(qd_link_pn(qlink));
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d218eeea/src/server.c
----------------------------------------------------------------------
diff --git a/src/server.c b/src/server.c
index b7303b2..441f575 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1181,8 +1181,10 @@ void qd_server_activate(qd_connection_t *ctx)
     if (!ctor)
         return;
 
-    if (!qdpn_connector_closed(ctor))
+    if (!qdpn_connector_closed(ctor)) {
         qdpn_connector_activate(ctor, QDPN_CONNECTOR_WRITABLE);
+        qdpn_driver_wakeup(ctx->server->driver);
+    }
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to