Repository: qpid-dispatch
Updated Branches:
  refs/heads/tross-DISPATCH-179-1 ec1b4aea2 -> af4fef5af


DISPATCH-179 - Fleshed out the link handling in the core.  Added a qdr-specific
terminus type to avoid threading issues with pn_terminus_t.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/b37dd4a3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/b37dd4a3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/b37dd4a3

Branch: refs/heads/tross-DISPATCH-179-1
Commit: b37dd4a3ea483f68c470d0eccbbba06da2a9f069
Parents: ec1b4ae
Author: Ted Ross <[email protected]>
Authored: Wed Nov 25 12:09:18 2015 -0500
Committer: Ted Ross <[email protected]>
Committed: Wed Nov 25 12:09:18 2015 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h   | 114 +++++++++++++++++++--
 src/CMakeLists.txt                    |   1 +
 src/router_core/agent.c               |   1 +
 src/router_core/agent_link.c          |  25 ++---
 src/router_core/connections.c         | 153 +++++++++++++++++++++++++----
 src/router_core/route_tables.c        |   1 -
 src/router_core/router_core.c         |   2 +-
 src/router_core/router_core_private.h |  57 ++++++++---
 src/router_core/terminus.c            | 100 +++++++++++++++++++
 src/router_private.h                  |   9 --
 10 files changed, 389 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b37dd4a3/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h 
b/include/qpid/dispatch/router_core.h
index 4a22bcf..93de5c6 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -32,10 +32,11 @@
  * exclusive access to that connection.
  */
 
-typedef struct qdr_core_t qdr_core_t;
+typedef struct qdr_core_t       qdr_core_t;
 typedef struct qdr_connection_t qdr_connection_t;
-typedef struct qdr_link_t qdr_link_t;
-typedef struct qdr_delivery_t qdr_delivery_t;
+typedef struct qdr_link_t       qdr_link_t;
+typedef struct qdr_delivery_t   qdr_delivery_t;
+typedef struct qdr_terminus_t   qdr_terminus_t;
 
 /**
  * Allocate and start an instance of the router core module.
@@ -90,11 +91,24 @@ void qdr_core_subscribe(qdr_core_t *core, const char 
*address, char aclass, char
  */
 
 typedef enum {
+    QD_LINK_ENDPOINT,   ///< A link to a connected endpoint
+    QD_LINK_WAYPOINT,   ///< A link to a configured waypoint
+    QD_LINK_CONTROL,    ///< A link to a peer router for control messages
+    QD_LINK_ROUTER,     ///< A link to a peer router for routed messages
+    QD_LINK_AREA        ///< A link to a peer router in a different area (area 
boundary)
+} qd_link_type_t;
+ENUM_DECLARE(qd_link_type);
+
+typedef enum {
     QDR_ROLE_NORMAL,
     QDR_ROLE_INTER_ROUTER,
     QDR_ROLE_ON_DEMAND
 } qdr_connection_role_t;
 
+
+#define QDR_FLAGS_CAPABILITY_ROUTER_CONTROL 1
+#define QDR_FLAGS_CAPABILITY_ROUTER_DATA    2
+
 /**
  * qdr_connection_opened
  *
@@ -133,7 +147,18 @@ void qdr_connection_set_context(qdr_connection_t *conn, 
void *context);
  *
  * Retrieve the stored void pointer from the connection object.
  */
-void *qdr_connection_get_context(qdr_connection_t *conn);
+void *qdr_connection_get_context(const qdr_connection_t *conn);
+
+/**
+ * qdr_connection_process
+ *
+ * Allow the core to process work associated with this connection.
+ * This function MUST be called only on a thread that exclusively owns
+ * this connection.
+ *
+ * @param conn The pointer returned by qdr_connection_opened
+ */
+void qdr_connection_process(qdr_connection_t *conn);
 
 /**
  * qdr_connection_activate_t callback
@@ -142,6 +167,9 @@ void *qdr_connection_get_context(qdr_connection_t *conn);
  * the core has deliveries on links, disposition updates on deliveries, or 
flow updates
  * to be sent across the connection.
  *
+ * IMPORTANT: This function will be invoked on the core thread.  It must never 
block,
+ * delay, or do any lengthy computation.
+ *
  * @param context The context supplied when the callback was registered
  * @param conn The connection object to be activated
  */
@@ -149,6 +177,46 @@ typedef void (*qdr_connection_activate_t) (void *context, 
qdr_connection_t *conn
 
 /**
  ******************************************************************************
+ * Terminus functions
+ ******************************************************************************
+ */
+
+/**
+ * qdr_terminus
+ *
+ * Create a qdr_terminus_t that contains all the content of the
+ * pn_terminus_t.  Note that the pointer to the pn_terminus_t
+ * _will not_ be held or referenced further after this function
+ * returns.
+ *
+ * @param pn Pointer to a proton terminus object that will be copied into
+ *           the qdr_terminus object
+ * @return Pointer to a newly allocated qdr_terminus object
+ */
+qdr_terminus_t *qdr_terminus(pn_terminus_t *pn);
+
+/**
+ * qdr_terminus_free
+ *
+ * Free a qdr_terminus object once it is no longer needed.
+ *
+ * @param terminus The pointer returned by qdr_terminus()
+ */
+void qdr_terminus_free(qdr_terminus_t *terminus);
+
+/**
+ * qdr_terminus_copy
+ *
+ * Copy the contents of the qdr_terminus into a proton terminus
+ *
+ * @param from A qdr_terminus pointer returned by qdr_terminus()
+ * @param to A proton terminus to  be overwritten with the contents
+ *           of 'from'
+ */
+void qdr_terminus_copy(qdr_terminus_t *from, pn_terminus_t *to);
+
+/**
+ ******************************************************************************
  * Link functions
  ******************************************************************************
  */
@@ -165,7 +233,27 @@ void qdr_link_set_context(qdr_link_t *link, void *context);
  *
  * Retrieve the stored void pointer from the link object.
  */
-void *qdr_link_get_context(qdr_link_t *link);
+void *qdr_link_get_context(const qdr_link_t *link);
+
+/**
+ * qdr_link_type
+ *
+ * Retrieve the link-type from the link object.
+ *
+ * @param link Link object
+ * @return Link-type
+ */
+qd_link_type_t qdr_link_type(const qdr_link_t *link);
+
+/**
+ * qdr_link_direction
+ *
+ * Retrieve the link-direction from the link object.
+ *
+ * @param link Link object
+ * @return Link-direction
+ */
+qd_direction_t qdr_link_direction(const qdr_link_t *link);
 
 /**
  * qdr_link_first_attach
@@ -179,7 +267,7 @@ void *qdr_link_get_context(qdr_link_t *link);
  * @param target Target terminus of the attach
  * @return A pointer to a new qdr_link_t object to track the link
  */
-qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, qd_direction_t dir, 
pn_terminus_t *source, pn_terminus_t *target);
+qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, qd_direction_t dir, 
qdr_terminus_t *source, qdr_terminus_t *target);
 
 /**
  * qdr_link_second_attach
@@ -191,7 +279,7 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, 
qd_direction_t dir, pn
  * @param source Source terminus of the attach
  * @param target Target terminus of the attach
  */
-void qdr_link_second_attach(qdr_link_t *link, pn_terminus_t *source, 
pn_terminus_t *target);
+void qdr_link_second_attach(qdr_link_t *link, qdr_terminus_t *source, 
qdr_terminus_t *target);
 
 /**
  * qdr_link_detach
@@ -206,10 +294,18 @@ void qdr_link_detach(qdr_link_t *link, pn_condition_t 
*condition);
 qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, pn_delivery_t *delivery, 
qd_message_t *msg);
 qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, pn_delivery_t *delivery, 
qd_message_t *msg, qd_field_iterator_t *addr);
 
-typedef void (*qdr_link_first_attach_t)  (void *context, qdr_connection_t 
*conn, qd_direction_t dir, pn_terminus_t *source, pn_terminus_t *target);
-typedef void (*qdr_link_second_attach_t) (void *context, qdr_link_t *link, 
pn_terminus_t *source, pn_terminus_t *target);
+typedef void (*qdr_link_first_attach_t)  (void *context, qdr_connection_t 
*conn, qdr_link_t *link, 
+                                          qdr_terminus_t *source, 
qdr_terminus_t *target, uint32_t flags);
+typedef void (*qdr_link_second_attach_t) (void *context, qdr_link_t *link, 
qdr_terminus_t *source, qdr_terminus_t *target);
 typedef void (*qdr_link_detach_t)        (void *context, qdr_link_t *link, 
pn_condition_t *condition);
 
+void qdr_connection_handlers(qdr_core_t                *core,
+                             void                      *context,
+                             qdr_connection_activate_t  activate,
+                             qdr_link_first_attach_t    first_attach,
+                             qdr_link_second_attach_t   second_attach,
+                             qdr_link_detach_t          detach);
+
 /**
  ******************************************************************************
  * Delivery functions

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b37dd4a3/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 62b61b8..395614d 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -74,6 +74,7 @@ set(qpid_dispatch_SOURCES
   router_core/router_core_thread.c
   router_core/route_tables.c
   router_core/management_agent.c
+  router_core/terminus.c
   router_delivery.c
   router_node.c
   router_forwarders.c

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b37dd4a3/src/router_core/agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent.c b/src/router_core/agent.c
index 5fd0161..c216a74 100644
--- a/src/router_core/agent.c
+++ b/src/router_core/agent.c
@@ -128,6 +128,7 @@ static void qdrh_query_get_first_CT(qdr_core_t *core, 
qdr_action_t *action, bool
 static void qdrh_query_get_next_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard);
 static void qdr_agent_emit_columns(qdr_query_t *query, const char 
*qdr_columns[], int column_count);
 static void qdr_agent_set_columns(qdr_query_t *query, qd_parsed_field_t 
*attribute_names, const char *qdr_columns[], int column_count);
+
 
//==================================================================================
 // Interface Functions
 
//==================================================================================

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b37dd4a3/src/router_core/agent_link.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_link.c b/src/router_core/agent_link.c
index 61aaa48..350a717 100644
--- a/src/router_core/agent_link.c
+++ b/src/router_core/agent_link.c
@@ -37,19 +37,6 @@ static const char *address_key(qdr_address_t *addr) {
     return addr && addr->hash_handle ? (const char*) 
qd_hash_key_by_handle(addr->hash_handle) : NULL;
 }
 
-static const char* qd_router_link_remote_container(qdr_link_t *link) {
-    if (!link->link || !qd_link_pn(link->link))
-        return "";
-    return pn_connection_remote_container(
-        pn_session_connection(qd_link_pn_session(link->link)));
-}
-
-static const char* qd_router_link_name(qdr_link_t *link) {
-    if (!link->link || !qd_link_pn(link->link))
-        return "";
-    return pn_link_name(qd_link_pn(link->link));
-}
-
 static void qdr_agent_write_link_CT(qdr_query_t *query,  qdr_link_t *link )
 {
     qd_composed_field_t *body = query->body;
@@ -68,11 +55,11 @@ static void qdr_agent_write_link_CT(qdr_query_t *query,  
qdr_link_t *link )
             break;
 
         case QDR_LINK_REMOTE_CONTAINER:
-            qd_compose_insert_string(body, 
qd_router_link_remote_container(link));
+            qd_compose_insert_null(body); // FIXME
             break;
 
         case QDR_LINK_LINK_NAME:
-            qd_compose_insert_string(body, qd_router_link_name(link));
+            qd_compose_insert_null(body); // FIXME
             break;
 
         case QDR_LINK_LINK_TYPE:
@@ -126,7 +113,7 @@ void qdra_link_get_first_CT(qdr_core_t *core, qdr_query_t 
*query, int offset)
     //
     // If the offset goes beyond the set of links, end the query now.
     //
-    if (offset >= DEQ_SIZE(core->links)) {
+    if (true /*offset >= DEQ_SIZE(core->links)*/) {  // FIXME
         query->more = false;
         qdr_agent_enqueue_response_CT(core, query);
         return;
@@ -135,7 +122,7 @@ void qdra_link_get_first_CT(qdr_core_t *core, qdr_query_t 
*query, int offset)
     //
     // Run to the address at the offset.
     //
-    qdr_link_t *link = DEQ_HEAD(core->links);
+    qdr_link_t *link = 0; // DEQ_HEAD(core->links);  FIXME
     for (int i = 0; i < offset && link; i++)
         link = DEQ_NEXT(link);
     assert(link);
@@ -167,8 +154,8 @@ void qdra_link_get_next_CT(qdr_core_t *core, qdr_query_t 
*query)
         // If the address was removed in the time between this get and the 
previous one,
         // we need to use the saved offset, which is less efficient.
         //
-        if (query->next_offset < DEQ_SIZE(core->links)) {
-            link = DEQ_HEAD(core->links);
+        if (false /*query->next_offset < DEQ_SIZE(core->links)*/) {  // FIXME
+            link = 0; //DEQ_HEAD(core->links);
             for (int i = 0; i < query->next_offset && link; i++)
                 link = DEQ_NEXT(link);
         }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b37dd4a3/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 4d98811..bb4a416 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -26,6 +26,7 @@ static void qdr_link_second_attach_CT(qdr_core_t *core, 
qdr_action_t *action, bo
 static void qdr_link_detach_CT(qdr_core_t *core, qdr_action_t *action, bool 
discard);
 
 ALLOC_DEFINE(qdr_connection_t);
+ALLOC_DEFINE(qdr_connection_work_t);
 
 
//==================================================================================
 // Internal Functions
@@ -48,6 +49,9 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core, 
bool incoming, qdr_con
     conn->role         = role;
     conn->label        = label;
     conn->mask_bit     = -1;
+    DEQ_INIT(conn->links);
+    DEQ_INIT(conn->work_list);
+    conn->work_lock    = sys_mutex();
 
     action->args.connection.conn = conn;
     qdr_action_enqueue(core, action);
@@ -71,12 +75,45 @@ void qdr_connection_set_context(qdr_connection_t *conn, 
void *context)
 }
 
 
-void *qdr_connection_get_context(qdr_connection_t *conn)
+void *qdr_connection_get_context(const qdr_connection_t *conn)
 {
     return conn ? conn->user_context : 0;
 }
 
 
+void qdr_connection_process(qdr_connection_t *conn)
+{
+    qdr_connection_work_list_t  work_list;
+    qdr_core_t                 *core = conn->core;
+
+    sys_mutex_lock(conn->work_lock);
+    DEQ_MOVE(conn->work_list, work_list);
+    sys_mutex_unlock(conn->work_lock);
+
+    qdr_connection_work_t *work = DEQ_HEAD(work_list);
+    while (work) {
+        DEQ_REMOVE_HEAD(work_list);
+
+        switch (work->work_type) {
+        case QDR_CONNECTION_WORK_FIRST_ATTACH :
+            core->first_attach_handler(core->user_context, conn, work->link, 
work->source, work->target, work->flags);
+            break;
+
+        case QDR_CONNECTION_WORK_SECOND_ATTACH :
+            core->second_attach_handler(core->user_context, work->link, 
work->source, work->target);
+            break;
+
+        case QDR_CONNECTION_WORK_DETACH :
+            core->detach_handler(core->user_context, work->link, 
work->condition);
+            break;
+        }
+
+        free_qdr_connection_work_t(work);
+        work = DEQ_HEAD(work_list);
+    }
+}
+
+
 void qdr_link_set_context(qdr_link_t *link, void *context)
 {
     if (link)
@@ -84,13 +121,25 @@ void qdr_link_set_context(qdr_link_t *link, void *context)
 }
 
 
-void *qdr_link_get_context(qdr_link_t *link)
+void *qdr_link_get_context(const qdr_link_t *link)
 {
     return link ? link->user_context : 0;
 }
 
 
-qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, qd_direction_t dir, 
pn_terminus_t *source, pn_terminus_t *target)
+qd_link_type_t qdr_link_type(const qdr_link_t *link)
+{
+    return link->link_type;
+}
+
+
+qd_direction_t qdr_link_direction(const qdr_link_t *link)
+{
+    return link->link_direction;
+}
+
+
+qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, qd_direction_t dir, 
qdr_terminus_t *source, qdr_terminus_t *target)
 {
     qdr_action_t *action = qdr_action(qdr_link_first_attach_CT);
     qdr_link_t   *link   = new_qdr_link_t();
@@ -110,7 +159,7 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, 
qd_direction_t dir, pn
 }
 
 
-void qdr_link_second_attach(qdr_link_t *link, pn_terminus_t *source, 
pn_terminus_t *target)
+void qdr_link_second_attach(qdr_link_t *link, qdr_terminus_t *source, 
qdr_terminus_t *target)
 {
     qdr_action_t *action = qdr_action(qdr_link_second_attach_CT);
 
@@ -131,10 +180,72 @@ void qdr_link_detach(qdr_link_t *link, pn_condition_t 
*condition)
 }
 
 
+void qdr_connection_handlers(qdr_core_t                *core,
+                             void                      *context,
+                             qdr_connection_activate_t  activate,
+                             qdr_link_first_attach_t    first_attach,
+                             qdr_link_second_attach_t   second_attach,
+                             qdr_link_detach_t          detach)
+{
+    core->user_context          = context;
+    core->activate_handler      = activate;
+    core->first_attach_handler  = first_attach;
+    core->second_attach_handler = second_attach;
+    core->detach_handler        = detach;
+}
+
+
 
//==================================================================================
 // In-Thread Functions
 
//==================================================================================
 
+static void qdr_connection_enqueue_work_CT(qdr_core_t            *core,
+                                           qdr_connection_t      *conn,
+                                           qdr_connection_work_t *work)
+{
+    sys_mutex_lock(conn->work_lock);
+    DEQ_INSERT_TAIL(conn->work_list, work);
+    bool notify = DEQ_SIZE(conn->work_list) == 1;
+    sys_mutex_unlock(conn->work_lock);
+
+    if (notify)
+        core->activate_handler(core->user_context, conn);
+}
+
+
+static qdr_link_t *qdr_create_link_CT(qdr_core_t       *core,
+                                      qdr_connection_t *conn,
+                                      qd_link_type_t    link_type,
+                                      qd_direction_t    dir,
+                                      qdr_terminus_t   *source,
+                                      qdr_terminus_t   *target,
+                                      uint32_t          flags)
+{
+    //
+    // Create a new link, initiated by the router core.  This will involve 
issuing a first-attach outbound.
+    //
+    qdr_link_t *link = new_qdr_link_t();
+    ZERO(link);
+
+    link->core           = core;
+    link->user_context   = 0;
+    link->conn           = conn;
+    link->link_type      = link_type;
+    link->link_direction = dir;
+
+    qdr_connection_work_t *work = new_qdr_connection_work_t();
+    ZERO(work);
+    work->work_type = QDR_CONNECTION_WORK_FIRST_ATTACH;
+    work->link      = link;
+    work->source    = source;
+    work->target    = target;
+    work->flags     = flags;
+
+    qdr_connection_enqueue_work_CT(core, conn, work);
+    return link;
+}
+
+
 static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard)
 {
     if (discard)
@@ -166,17 +277,16 @@ static void qdr_connection_opened_CT(qdr_core_t *core, 
qdr_action_t *action, boo
         if (!conn->incoming) {
             //
             // The connector-side of inter-router connections is responsible 
for setting up the
-            // inter-router links:  Two (in and out) for control, two for 
routed-message transfer
+            // inter-router links:  Two (in and out) for control, two for 
routed-message transfer.
             //
-            //(void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, 
QD_INCOMING, ...);
+            (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, 
QD_INCOMING, 0, 0, QDR_FLAGS_CAPABILITY_ROUTER_CONTROL);
+            (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, 
QD_OUTGOING, 0, 0, QDR_FLAGS_CAPABILITY_ROUTER_CONTROL);
+            (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER,  
QD_INCOMING, 0, 0, QDR_FLAGS_CAPABILITY_ROUTER_DATA);
+            (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER,  
QD_OUTGOING, 0, 0, QDR_FLAGS_CAPABILITY_ROUTER_DATA);
         }
     }
 
-    // If the role is INTER_ROUTER:
-    //    Assign this connection a neighbor mask-bit
-    //    If the connection is not incoming:
-    //       Establish a receiver and sender for router control
-    //       Establish a receiver and sender for inter-router message routing
+    //
     // If the role is ON_DEMAND:
     //    Activate waypoints associated with this connection
     //    Activate link-route destinations associated with this connection
@@ -201,7 +311,12 @@ static void qdr_connection_closed_CT(qdr_core_t *core, 
qdr_action_t *action, boo
     //        with the links.
     //
 
+    //
+    // Discard items on the work list
+    //
+
     DEQ_REMOVE(core->open_connections, conn);
+    sys_mutex_free(conn->work_lock);
     free_qdr_connection_t(conn);
 }
 
@@ -211,11 +326,11 @@ static void qdr_link_first_attach_CT(qdr_core_t *core, 
qdr_action_t *action, boo
     if (discard)
         return;
 
-    //qdr_connection_t *conn   = action->args.connection.conn;
-    //qdr_link_t       *link   = action->args.connection.link;
-    //qd_direction_t    dir    = action->args.connection.dir;
-    //pn_terminus_t    *source = action->args.connection.source;
-    //pn_terminus_t    *target = action->args.connection.target;
+    //qdr_connection_t  *conn   = action->args.connection.conn;
+    //qdr_link_t        *link   = action->args.connection.link;
+    //qd_direction_t     dir    = action->args.connection.dir;
+    //qdr_terminus_t    *source = action->args.connection.source;
+    //qdr_terminus_t    *target = action->args.connection.target;
 
     //
     // Cases to be handled:
@@ -254,9 +369,9 @@ static void qdr_link_second_attach_CT(qdr_core_t *core, 
qdr_action_t *action, bo
     if (discard)
         return;
 
-    //qdr_link_t    *link   = action->args.connection.link;
-    //pn_terminus_t *source = action->args.connection.source;
-    //pn_terminus_t *target = action->args.connection.target;
+    //qdr_link_t     *link   = action->args.connection.link;
+    //qdr_terminus_t *source = action->args.connection.source;
+    //qdr_terminus_t *target = action->args.connection.target;
 
     //
     // Cases to be handled:

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b37dd4a3/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index 96ae528..97ddad4 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -154,7 +154,6 @@ void qdr_core_subscribe(qdr_core_t *core, const char 
*address, char aclass, char
 void qdr_route_table_setup_CT(qdr_core_t *core)
 {
     DEQ_INIT(core->addrs);
-    DEQ_INIT(core->links);
     DEQ_INIT(core->routers);
     core->addr_hash = qd_hash(10, 32, 0);
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b37dd4a3/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index a747572..4e86c9b 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -81,7 +81,7 @@ ALLOC_DEFINE(qdr_field_t);
 
 qdr_field_t *qdr_field(const char *text)
 {
-    size_t length  = strlen(text);
+    size_t length  = text ? strlen(text) : 0;
     size_t ilength = length;
 
     if (length == 0)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b37dd4a3/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h 
b/src/router_core/router_core_private.h
index 0347fe0..f70e97a 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -70,8 +70,8 @@ struct qdr_action_t {
             qdr_delivery_t   *delivery;
             qd_message_t     *msg;
             qd_direction_t    dir;
-            pn_terminus_t    *source;
-            pn_terminus_t    *target;
+            qdr_terminus_t   *source;
+            qdr_terminus_t   *target;
             pn_condition_t   *condition;
         } connection;
 
@@ -160,20 +160,17 @@ struct qdr_link_t {
     qdr_core_t               *core;
     void                     *user_context;
     qdr_connection_t         *conn;            ///< [ref] Connection that owns 
this link
-    int                       mask_bit;        ///< Unique mask bit if this is 
an inter-router link
     qd_link_type_t            link_type;
     qd_direction_t            link_direction;
     qdr_address_t            *owning_addr;     ///< [ref] Address record that 
owns this link
     //qd_waypoint_t            *waypoint;        ///< [ref] Waypoint that owns 
this link
-    qd_link_t                *link;            ///< [own] Link pointer 
DEPRECATE
     qdr_link_t               *connected_link;  ///< [ref] If this is a 
link-route, reference the connected link
     qdr_link_ref_t           *ref;             ///< Pointer to a containing 
reference object
-    char                     *target;          ///< Target address for 
incoming links
     qd_routed_event_list_t    event_fifo;      ///< FIFO of outgoing 
delivery/link events (no messages)
     qd_routed_event_list_t    msg_fifo;        ///< FIFO of incoming or 
outgoing message deliveries
     qd_router_delivery_list_t deliveries;      ///< [own] outstanding 
unsettled deliveries
-    bool                      strip_inbound_annotations;  ///<should the 
dispatch specific inbound annotations be stripped at the ingress router
-    bool                      strip_outbound_annotations; ///<should the 
dispatch specific outbound annotations be stripped at the egress router
+    bool                      strip_inbound_annotations;  ///<should the 
dispatch specific inbound annotations be stripped at the ingress router 
DEPRECATE/MOVE
+    bool                      strip_outbound_annotations; ///<should the 
dispatch specific outbound annotations be stripped at the egress router 
DEPRECATE/MOVE
 };
 
 ALLOC_DECLARE(qdr_link_t);
@@ -244,16 +241,36 @@ void qdr_add_node_ref(qdr_router_ref_list_t *ref_list, 
qdr_node_t *rnode);
 void qdr_del_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode);
 
 
+typedef enum {
+    QDR_CONNECTION_WORK_FIRST_ATTACH,
+    QDR_CONNECTION_WORK_SECOND_ATTACH,
+    QDR_CONNECTION_WORK_DETACH
+} qdr_connection_work_type_t;
+
+typedef struct qdr_connection_work_t {
+    DEQ_LINKS(struct qdr_connection_work_t);
+    qdr_connection_work_type_t  work_type;
+    qdr_link_t                 *link;
+    qdr_terminus_t             *source;
+    qdr_terminus_t             *target;
+    pn_condition_t             *condition;
+    uint32_t                    flags;
+} qdr_connection_work_t;
+
+ALLOC_DECLARE(qdr_connection_work_t);
+DEQ_DECLARE(qdr_connection_work_t, qdr_connection_work_list_t);
+
 struct qdr_connection_t {
     DEQ_LINKS(qdr_connection_t);
-    qdr_core_t            *core;
-    void                  *user_context;
-    bool                   incoming;
-    qdr_connection_role_t  role;
-    const char            *label;
-    int                    mask_bit;
-
-    // TODO - Add direct linkage to waypoints, link-route destinations, and 
links
+    qdr_core_t                 *core;
+    void                       *user_context;
+    bool                        incoming;
+    qdr_connection_role_t       role;
+    const char                 *label;
+    int                         mask_bit;
+    qdr_link_list_t             links;
+    qdr_connection_work_list_t  work_list;
+    sys_mutex_t                *work_lock;
 };
 
 ALLOC_DECLARE(qdr_connection_t);
@@ -287,6 +304,15 @@ struct qdr_core_t {
     qdr_mobile_removed_t  rt_mobile_removed;
     qdr_link_lost_t       rt_link_lost;
 
+    //
+    // Connection section
+    //
+    void                      *user_context;
+    qdr_connection_activate_t  activate_handler;
+    qdr_link_first_attach_t    first_attach_handler;
+    qdr_link_second_attach_t   second_attach_handler;
+    qdr_link_detach_t          detach_handler;
+
     const char *router_area;
     const char *router_id;
 
@@ -296,7 +322,6 @@ struct qdr_core_t {
     qdr_address_t        *routerma_addr;
     qdr_address_t        *hello_addr;
 
-    qdr_link_list_t       links;
     qdr_node_list_t       routers;
     qd_bitmask_t         *neighbor_free_mask;
     qdr_node_t          **routers_by_mask_bit;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b37dd4a3/src/router_core/terminus.c
----------------------------------------------------------------------
diff --git a/src/router_core/terminus.c b/src/router_core/terminus.c
new file mode 100644
index 0000000..4623e09
--- /dev/null
+++ b/src/router_core/terminus.c
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "router_core_private.h"
+
+struct qdr_terminus_t {
+    qdr_field_t            *address;
+    pn_durability_t         durability;
+    pn_expiry_policy_t      expiry_policy;
+    pn_seconds_t            timeout;
+    bool                    dynamic;
+    pn_distribution_mode_t  distribution_mode;
+    pn_data_t              *properties;
+    pn_data_t              *filter;
+    pn_data_t              *outcomes;
+    pn_data_t              *capabilities;
+};
+
+ALLOC_DECLARE(qdr_terminus_t);
+ALLOC_DEFINE(qdr_terminus_t);
+
+qdr_terminus_t *qdr_terminus(pn_terminus_t *pn)
+{
+    if (pn == 0)
+        return 0;
+
+    qdr_terminus_t *term = new_qdr_terminus_t();
+    ZERO(term);
+
+    term->address           = qdr_field(pn_terminus_get_address(pn));
+    term->durability        = pn_terminus_get_durability(pn);
+    term->expiry_policy     = pn_terminus_get_expiry_policy(pn);
+    term->timeout           = pn_terminus_get_timeout(pn);
+    term->dynamic           = pn_terminus_is_dynamic(pn);
+    term->distribution_mode = pn_terminus_get_distribution_mode(pn);
+    term->properties        = pn_data(0);
+    term->filter            = pn_data(0);
+    term->outcomes          = pn_data(0);
+    term->capabilities      = pn_data(0);
+
+    pn_data_copy(term->properties,   pn_terminus_properties(pn));
+    pn_data_copy(term->filter,       pn_terminus_filter(pn));
+    pn_data_copy(term->outcomes,     pn_terminus_outcomes(pn));
+    pn_data_copy(term->capabilities, pn_terminus_capabilities(pn));
+
+    return term;
+}
+
+
+void qdr_terminus_free(qdr_terminus_t *term)
+{
+    if (term == 0)
+        return;
+
+    qdr_field_free(term->address);
+    pn_data_free(term->properties);
+    pn_data_free(term->filter);
+    pn_data_free(term->outcomes);
+    pn_data_free(term->capabilities);
+
+    free_qdr_terminus_t(term);
+}
+
+
+void qdr_terminus_copy(qdr_terminus_t *from, pn_terminus_t *to)
+{
+    if (from->address) {
+        unsigned char *addr = qd_field_iterator_copy(from->address->iterator);
+        pn_terminus_set_address(to, (char*) addr);
+        free(addr);
+    }
+
+    pn_terminus_set_durability(to,        from->durability);
+    pn_terminus_set_expiry_policy(to,     from->expiry_policy);
+    pn_terminus_set_timeout(to,           from->timeout);
+    pn_terminus_set_dynamic(to,           from->dynamic);
+    pn_terminus_set_distribution_mode(to, from->distribution_mode);
+
+    pn_data_copy(pn_terminus_properties(to),   from->properties);
+    pn_data_copy(pn_terminus_filter(to),       from->filter);
+    pn_data_copy(pn_terminus_outcomes(to),     from->outcomes);
+    pn_data_copy(pn_terminus_capabilities(to), from->capabilities);
+}
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b37dd4a3/src/router_private.h
----------------------------------------------------------------------
diff --git a/src/router_private.h b/src/router_private.h
index 6ecaaa9..50ab94e 100644
--- a/src/router_private.h
+++ b/src/router_private.h
@@ -51,15 +51,6 @@ typedef enum {
 } qd_router_mode_t;
 ENUM_DECLARE(qd_router_mode);
 
-typedef enum {
-    QD_LINK_ENDPOINT,   ///< A link to a connected endpoint
-    QD_LINK_WAYPOINT,   ///< A link to a configured waypoint
-    QD_LINK_CONTROL,    ///< A link to a peer router for control messages
-    QD_LINK_ROUTER,     ///< A link to a peer router for routed messages
-    QD_LINK_AREA        ///< A link to a peer router in a different area (area 
boundary)
-} qd_link_type_t;
-ENUM_DECLARE(qd_link_type);
-
 typedef struct qd_routed_event_t {
     DEQ_LINKS(struct qd_routed_event_t);
     qd_router_delivery_t *delivery;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to