Repository: qpid-dispatch
Updated Branches:
refs/heads/tross-DISPATCH-179-1 ec1b4aea2 -> af4fef5af
DISPATCH-179 - Fleshed out the link handling in the core. Added a qdr-specific
terminus type to avoid threading issues with pn_terminus_t.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/b37dd4a3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/b37dd4a3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/b37dd4a3
Branch: refs/heads/tross-DISPATCH-179-1
Commit: b37dd4a3ea483f68c470d0eccbbba06da2a9f069
Parents: ec1b4ae
Author: Ted Ross <[email protected]>
Authored: Wed Nov 25 12:09:18 2015 -0500
Committer: Ted Ross <[email protected]>
Committed: Wed Nov 25 12:09:18 2015 -0500
----------------------------------------------------------------------
include/qpid/dispatch/router_core.h | 114 +++++++++++++++++++--
src/CMakeLists.txt | 1 +
src/router_core/agent.c | 1 +
src/router_core/agent_link.c | 25 ++---
src/router_core/connections.c | 153 +++++++++++++++++++++++++----
src/router_core/route_tables.c | 1 -
src/router_core/router_core.c | 2 +-
src/router_core/router_core_private.h | 57 ++++++++---
src/router_core/terminus.c | 100 +++++++++++++++++++
src/router_private.h | 9 --
10 files changed, 389 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b37dd4a3/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h
b/include/qpid/dispatch/router_core.h
index 4a22bcf..93de5c6 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -32,10 +32,11 @@
* exclusive access to that connection.
*/
-typedef struct qdr_core_t qdr_core_t;
+typedef struct qdr_core_t qdr_core_t;
typedef struct qdr_connection_t qdr_connection_t;
-typedef struct qdr_link_t qdr_link_t;
-typedef struct qdr_delivery_t qdr_delivery_t;
+typedef struct qdr_link_t qdr_link_t;
+typedef struct qdr_delivery_t qdr_delivery_t;
+typedef struct qdr_terminus_t qdr_terminus_t;
/**
* Allocate and start an instance of the router core module.
@@ -90,11 +91,24 @@ void qdr_core_subscribe(qdr_core_t *core, const char
*address, char aclass, char
*/
typedef enum {
+ QD_LINK_ENDPOINT, ///< A link to a connected endpoint
+ QD_LINK_WAYPOINT, ///< A link to a configured waypoint
+ QD_LINK_CONTROL, ///< A link to a peer router for control messages
+ QD_LINK_ROUTER, ///< A link to a peer router for routed messages
+ QD_LINK_AREA ///< A link to a peer router in a different area (area
boundary)
+} qd_link_type_t;
+ENUM_DECLARE(qd_link_type);
+
+typedef enum {
QDR_ROLE_NORMAL,
QDR_ROLE_INTER_ROUTER,
QDR_ROLE_ON_DEMAND
} qdr_connection_role_t;
+
+#define QDR_FLAGS_CAPABILITY_ROUTER_CONTROL 1
+#define QDR_FLAGS_CAPABILITY_ROUTER_DATA 2
+
/**
* qdr_connection_opened
*
@@ -133,7 +147,18 @@ void qdr_connection_set_context(qdr_connection_t *conn,
void *context);
*
* Retrieve the stored void pointer from the connection object.
*/
-void *qdr_connection_get_context(qdr_connection_t *conn);
+void *qdr_connection_get_context(const qdr_connection_t *conn);
+
+/**
+ * qdr_connection_process
+ *
+ * Allow the core to process work associated with this connection.
+ * This function MUST be called only on a thread that exclusively owns
+ * this connection.
+ *
+ * @param conn The pointer returned by qdr_connection_opened
+ */
+void qdr_connection_process(qdr_connection_t *conn);
/**
* qdr_connection_activate_t callback
@@ -142,6 +167,9 @@ void *qdr_connection_get_context(qdr_connection_t *conn);
* the core has deliveries on links, disposition updates on deliveries, or
flow updates
* to be sent across the connection.
*
+ * IMPORTANT: This function will be invoked on the core thread. It must never
block,
+ * delay, or do any lengthy computation.
+ *
* @param context The context supplied when the callback was registered
* @param conn The connection object to be activated
*/
@@ -149,6 +177,46 @@ typedef void (*qdr_connection_activate_t) (void *context,
qdr_connection_t *conn
/**
******************************************************************************
+ * Terminus functions
+ ******************************************************************************
+ */
+
+/**
+ * qdr_terminus
+ *
+ * Create a qdr_terminus_t that contains all the content of the
+ * pn_terminus_t. Note that the pointer to the pn_terminus_t
+ * _will not_ be held or referenced further after this function
+ * returns.
+ *
+ * @param pn Pointer to a proton terminus object that will be copied into
+ * the qdr_terminus object
+ * @return Pointer to a newly allocated qdr_terminus object
+ */
+qdr_terminus_t *qdr_terminus(pn_terminus_t *pn);
+
+/**
+ * qdr_terminus_free
+ *
+ * Free a qdr_terminus object once it is no longer needed.
+ *
+ * @param terminus The pointer returned by qdr_terminus()
+ */
+void qdr_terminus_free(qdr_terminus_t *terminus);
+
+/**
+ * qdr_terminus_copy
+ *
+ * Copy the contents of the qdr_terminus into a proton terminus
+ *
+ * @param from A qdr_terminus pointer returned by qdr_terminus()
+ * @param to A proton terminus to be overwritten with the contents
+ * of 'from'
+ */
+void qdr_terminus_copy(qdr_terminus_t *from, pn_terminus_t *to);
+
+/**
+ ******************************************************************************
* Link functions
******************************************************************************
*/
@@ -165,7 +233,27 @@ void qdr_link_set_context(qdr_link_t *link, void *context);
*
* Retrieve the stored void pointer from the link object.
*/
-void *qdr_link_get_context(qdr_link_t *link);
+void *qdr_link_get_context(const qdr_link_t *link);
+
+/**
+ * qdr_link_type
+ *
+ * Retrieve the link-type from the link object.
+ *
+ * @param link Link object
+ * @return Link-type
+ */
+qd_link_type_t qdr_link_type(const qdr_link_t *link);
+
+/**
+ * qdr_link_direction
+ *
+ * Retrieve the link-direction from the link object.
+ *
+ * @param link Link object
+ * @return Link-direction
+ */
+qd_direction_t qdr_link_direction(const qdr_link_t *link);
/**
* qdr_link_first_attach
@@ -179,7 +267,7 @@ void *qdr_link_get_context(qdr_link_t *link);
* @param target Target terminus of the attach
* @return A pointer to a new qdr_link_t object to track the link
*/
-qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, qd_direction_t dir,
pn_terminus_t *source, pn_terminus_t *target);
+qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, qd_direction_t dir,
qdr_terminus_t *source, qdr_terminus_t *target);
/**
* qdr_link_second_attach
@@ -191,7 +279,7 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
qd_direction_t dir, pn
* @param source Source terminus of the attach
* @param target Target terminus of the attach
*/
-void qdr_link_second_attach(qdr_link_t *link, pn_terminus_t *source,
pn_terminus_t *target);
+void qdr_link_second_attach(qdr_link_t *link, qdr_terminus_t *source,
qdr_terminus_t *target);
/**
* qdr_link_detach
@@ -206,10 +294,18 @@ void qdr_link_detach(qdr_link_t *link, pn_condition_t
*condition);
qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, pn_delivery_t *delivery,
qd_message_t *msg);
qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, pn_delivery_t *delivery,
qd_message_t *msg, qd_field_iterator_t *addr);
-typedef void (*qdr_link_first_attach_t) (void *context, qdr_connection_t
*conn, qd_direction_t dir, pn_terminus_t *source, pn_terminus_t *target);
-typedef void (*qdr_link_second_attach_t) (void *context, qdr_link_t *link,
pn_terminus_t *source, pn_terminus_t *target);
+typedef void (*qdr_link_first_attach_t) (void *context, qdr_connection_t
*conn, qdr_link_t *link,
+ qdr_terminus_t *source,
qdr_terminus_t *target, uint32_t flags);
+typedef void (*qdr_link_second_attach_t) (void *context, qdr_link_t *link,
qdr_terminus_t *source, qdr_terminus_t *target);
typedef void (*qdr_link_detach_t) (void *context, qdr_link_t *link,
pn_condition_t *condition);
+void qdr_connection_handlers(qdr_core_t *core,
+ void *context,
+ qdr_connection_activate_t activate,
+ qdr_link_first_attach_t first_attach,
+ qdr_link_second_attach_t second_attach,
+ qdr_link_detach_t detach);
+
/**
******************************************************************************
* Delivery functions
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b37dd4a3/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 62b61b8..395614d 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -74,6 +74,7 @@ set(qpid_dispatch_SOURCES
router_core/router_core_thread.c
router_core/route_tables.c
router_core/management_agent.c
+ router_core/terminus.c
router_delivery.c
router_node.c
router_forwarders.c
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b37dd4a3/src/router_core/agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent.c b/src/router_core/agent.c
index 5fd0161..c216a74 100644
--- a/src/router_core/agent.c
+++ b/src/router_core/agent.c
@@ -128,6 +128,7 @@ static void qdrh_query_get_first_CT(qdr_core_t *core,
qdr_action_t *action, bool
static void qdrh_query_get_next_CT(qdr_core_t *core, qdr_action_t *action,
bool discard);
static void qdr_agent_emit_columns(qdr_query_t *query, const char
*qdr_columns[], int column_count);
static void qdr_agent_set_columns(qdr_query_t *query, qd_parsed_field_t
*attribute_names, const char *qdr_columns[], int column_count);
+
//==================================================================================
// Interface Functions
//==================================================================================
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b37dd4a3/src/router_core/agent_link.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_link.c b/src/router_core/agent_link.c
index 61aaa48..350a717 100644
--- a/src/router_core/agent_link.c
+++ b/src/router_core/agent_link.c
@@ -37,19 +37,6 @@ static const char *address_key(qdr_address_t *addr) {
return addr && addr->hash_handle ? (const char*)
qd_hash_key_by_handle(addr->hash_handle) : NULL;
}
-static const char* qd_router_link_remote_container(qdr_link_t *link) {
- if (!link->link || !qd_link_pn(link->link))
- return "";
- return pn_connection_remote_container(
- pn_session_connection(qd_link_pn_session(link->link)));
-}
-
-static const char* qd_router_link_name(qdr_link_t *link) {
- if (!link->link || !qd_link_pn(link->link))
- return "";
- return pn_link_name(qd_link_pn(link->link));
-}
-
static void qdr_agent_write_link_CT(qdr_query_t *query, qdr_link_t *link )
{
qd_composed_field_t *body = query->body;
@@ -68,11 +55,11 @@ static void qdr_agent_write_link_CT(qdr_query_t *query,
qdr_link_t *link )
break;
case QDR_LINK_REMOTE_CONTAINER:
- qd_compose_insert_string(body,
qd_router_link_remote_container(link));
+ qd_compose_insert_null(body); // FIXME
break;
case QDR_LINK_LINK_NAME:
- qd_compose_insert_string(body, qd_router_link_name(link));
+ qd_compose_insert_null(body); // FIXME
break;
case QDR_LINK_LINK_TYPE:
@@ -126,7 +113,7 @@ void qdra_link_get_first_CT(qdr_core_t *core, qdr_query_t
*query, int offset)
//
// If the offset goes beyond the set of links, end the query now.
//
- if (offset >= DEQ_SIZE(core->links)) {
+ if (true /*offset >= DEQ_SIZE(core->links)*/) { // FIXME
query->more = false;
qdr_agent_enqueue_response_CT(core, query);
return;
@@ -135,7 +122,7 @@ void qdra_link_get_first_CT(qdr_core_t *core, qdr_query_t
*query, int offset)
//
// Run to the address at the offset.
//
- qdr_link_t *link = DEQ_HEAD(core->links);
+ qdr_link_t *link = 0; // DEQ_HEAD(core->links); FIXME
for (int i = 0; i < offset && link; i++)
link = DEQ_NEXT(link);
assert(link);
@@ -167,8 +154,8 @@ void qdra_link_get_next_CT(qdr_core_t *core, qdr_query_t
*query)
// If the address was removed in the time between this get and the
previous one,
// we need to use the saved offset, which is less efficient.
//
- if (query->next_offset < DEQ_SIZE(core->links)) {
- link = DEQ_HEAD(core->links);
+ if (false /*query->next_offset < DEQ_SIZE(core->links)*/) { // FIXME
+ link = 0; //DEQ_HEAD(core->links);
for (int i = 0; i < query->next_offset && link; i++)
link = DEQ_NEXT(link);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b37dd4a3/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 4d98811..bb4a416 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -26,6 +26,7 @@ static void qdr_link_second_attach_CT(qdr_core_t *core,
qdr_action_t *action, bo
static void qdr_link_detach_CT(qdr_core_t *core, qdr_action_t *action, bool
discard);
ALLOC_DEFINE(qdr_connection_t);
+ALLOC_DEFINE(qdr_connection_work_t);
//==================================================================================
// Internal Functions
@@ -48,6 +49,9 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core,
bool incoming, qdr_con
conn->role = role;
conn->label = label;
conn->mask_bit = -1;
+ DEQ_INIT(conn->links);
+ DEQ_INIT(conn->work_list);
+ conn->work_lock = sys_mutex();
action->args.connection.conn = conn;
qdr_action_enqueue(core, action);
@@ -71,12 +75,45 @@ void qdr_connection_set_context(qdr_connection_t *conn,
void *context)
}
-void *qdr_connection_get_context(qdr_connection_t *conn)
+void *qdr_connection_get_context(const qdr_connection_t *conn)
{
return conn ? conn->user_context : 0;
}
+/**
+ * Dispatch the work items queued on a connection.
+ *
+ * The connection's work list is moved into a local list while
+ * conn->work_lock is held.  The lock is then released and each item is
+ * handed to the core's first-attach, second-attach or detach handler
+ * according to its work_type, after which the item is freed.
+ *
+ * @param conn Connection whose queued work is to be processed
+ */
+void qdr_connection_process(qdr_connection_t *conn)
+{
+    qdr_core_t                 *core = conn->core;
+    qdr_connection_work_list_t  pending;
+    qdr_connection_work_t      *item;
+
+    //
+    // Only the list hand-off happens under the lock; the handlers below
+    // are invoked after it has been released.
+    //
+    sys_mutex_lock(conn->work_lock);
+    DEQ_MOVE(conn->work_list, pending);
+    sys_mutex_unlock(conn->work_lock);
+
+    for (item = DEQ_HEAD(pending); item; item = DEQ_HEAD(pending)) {
+        DEQ_REMOVE_HEAD(pending);
+
+        if (item->work_type == QDR_CONNECTION_WORK_FIRST_ATTACH)
+            core->first_attach_handler(core->user_context, conn, item->link, item->source, item->target, item->flags);
+        else if (item->work_type == QDR_CONNECTION_WORK_SECOND_ATTACH)
+            core->second_attach_handler(core->user_context, item->link, item->source, item->target);
+        else if (item->work_type == QDR_CONNECTION_WORK_DETACH)
+            core->detach_handler(core->user_context, item->link, item->condition);
+
+        free_qdr_connection_work_t(item);
+    }
+}
+
+
void qdr_link_set_context(qdr_link_t *link, void *context)
{
if (link)
@@ -84,13 +121,25 @@ void qdr_link_set_context(qdr_link_t *link, void *context)
}
-void *qdr_link_get_context(qdr_link_t *link)
+void *qdr_link_get_context(const qdr_link_t *link)
{
return link ? link->user_context : 0;
}
-qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, qd_direction_t dir,
pn_terminus_t *source, pn_terminus_t *target)
+/**
+ * Return the link_type field stored in the link object.
+ *
+ * NOTE(review): unlike qdr_link_get_context(), 'link' is dereferenced
+ * without a null check, so callers must pass a valid link.
+ */
+qd_link_type_t qdr_link_type(const qdr_link_t *link)
+{
+ return link->link_type;
+}
+
+
+/**
+ * Return the link_direction field stored in the link object.
+ *
+ * NOTE(review): unlike qdr_link_get_context(), 'link' is dereferenced
+ * without a null check, so callers must pass a valid link.
+ */
+qd_direction_t qdr_link_direction(const qdr_link_t *link)
+{
+ return link->link_direction;
+}
+
+
+qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, qd_direction_t dir,
qdr_terminus_t *source, qdr_terminus_t *target)
{
qdr_action_t *action = qdr_action(qdr_link_first_attach_CT);
qdr_link_t *link = new_qdr_link_t();
@@ -110,7 +159,7 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
qd_direction_t dir, pn
}
-void qdr_link_second_attach(qdr_link_t *link, pn_terminus_t *source,
pn_terminus_t *target)
+void qdr_link_second_attach(qdr_link_t *link, qdr_terminus_t *source,
qdr_terminus_t *target)
{
qdr_action_t *action = qdr_action(qdr_link_second_attach_CT);
@@ -131,10 +180,72 @@ void qdr_link_detach(qdr_link_t *link, pn_condition_t
*condition)
}
+/**
+ * Record, on the core object, the callbacks used for connection
+ * activation and for link first-attach, second-attach and detach events,
+ * together with the context pointer that is passed back to them.
+ *
+ * @param core          Core object that stores the handlers
+ * @param context       Opaque pointer stored as core->user_context
+ * @param activate      Connection-activate callback
+ * @param first_attach  Link first-attach callback
+ * @param second_attach Link second-attach callback
+ * @param detach        Link detach callback
+ */
+void qdr_connection_handlers(qdr_core_t *core, void *context,
+                             qdr_connection_activate_t activate,
+                             qdr_link_first_attach_t first_attach,
+                             qdr_link_second_attach_t second_attach,
+                             qdr_link_detach_t detach)
+{
+    // The five stores are independent of one another.
+    core->detach_handler        = detach;
+    core->second_attach_handler = second_attach;
+    core->first_attach_handler  = first_attach;
+    core->activate_handler      = activate;
+    core->user_context          = context;
+}
+
+
//==================================================================================
// In-Thread Functions
//==================================================================================
+//
+// Append a work item to the tail of the connection's work list while
+// holding conn->work_lock.  If the list size is exactly one after the
+// insertion, the core's activate handler is invoked for the connection
+// once the lock has been released.
+//
+// NOTE(review): core->activate_handler is called without a null check;
+// presumably qdr_connection_handlers() has been called first - confirm.
+//
+static void qdr_connection_enqueue_work_CT(qdr_core_t *core,
+ qdr_connection_t *conn,
+ qdr_connection_work_t *work)
+{
+ sys_mutex_lock(conn->work_lock);
+ DEQ_INSERT_TAIL(conn->work_list, work);
+ // Sample the size under the lock; the handler call happens after unlock.
+ bool notify = DEQ_SIZE(conn->work_list) == 1;
+ sys_mutex_unlock(conn->work_lock);
+
+ if (notify)
+ core->activate_handler(core->user_context, conn);
+}
+
+
+//
+// Create a link on behalf of the router core and queue a first-attach
+// work item for it on the given connection.
+//
+// The source, target and flags values are stored in the work item
+// unchanged.  Returns the newly allocated link.
+//
+static qdr_link_t *qdr_create_link_CT(qdr_core_t *core, qdr_connection_t *conn,
+                                      qd_link_type_t link_type, qd_direction_t dir,
+                                      qdr_terminus_t *source, qdr_terminus_t *target,
+                                      uint32_t flags)
+{
+    qdr_link_t *new_link = new_qdr_link_t();
+    ZERO(new_link);
+
+    new_link->link_direction = dir;
+    new_link->link_type      = link_type;
+    new_link->conn           = conn;
+    new_link->user_context   = 0;
+    new_link->core           = core;
+
+    //
+    // Describe the outbound first-attach as a work item for the connection.
+    //
+    qdr_connection_work_t *attach = new_qdr_connection_work_t();
+    ZERO(attach);
+
+    attach->flags     = flags;
+    attach->target    = target;
+    attach->source    = source;
+    attach->link      = new_link;
+    attach->work_type = QDR_CONNECTION_WORK_FIRST_ATTACH;
+
+    qdr_connection_enqueue_work_CT(core, conn, attach);
+
+    return new_link;
+}
+
+
static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action,
bool discard)
{
if (discard)
@@ -166,17 +277,16 @@ static void qdr_connection_opened_CT(qdr_core_t *core,
qdr_action_t *action, boo
if (!conn->incoming) {
//
// The connector-side of inter-router connections is responsible
for setting up the
- // inter-router links: Two (in and out) for control, two for
routed-message transfer
+ // inter-router links: Two (in and out) for control, two for
routed-message transfer.
//
- //(void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL,
QD_INCOMING, ...);
+ (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL,
QD_INCOMING, 0, 0, QDR_FLAGS_CAPABILITY_ROUTER_CONTROL);
+ (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL,
QD_OUTGOING, 0, 0, QDR_FLAGS_CAPABILITY_ROUTER_CONTROL);
+ (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER,
QD_INCOMING, 0, 0, QDR_FLAGS_CAPABILITY_ROUTER_DATA);
+ (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER,
QD_OUTGOING, 0, 0, QDR_FLAGS_CAPABILITY_ROUTER_DATA);
}
}
- // If the role is INTER_ROUTER:
- // Assign this connection a neighbor mask-bit
- // If the connection is not incoming:
- // Establish a receiver and sender for router control
- // Establish a receiver and sender for inter-router message routing
+ //
// If the role is ON_DEMAND:
// Activate waypoints associated with this connection
// Activate link-route destinations associated with this connection
@@ -201,7 +311,12 @@ static void qdr_connection_closed_CT(qdr_core_t *core,
qdr_action_t *action, boo
// with the links.
//
+ //
+ // Discard items on the work list
+ //
+
DEQ_REMOVE(core->open_connections, conn);
+ sys_mutex_free(conn->work_lock);
free_qdr_connection_t(conn);
}
@@ -211,11 +326,11 @@ static void qdr_link_first_attach_CT(qdr_core_t *core,
qdr_action_t *action, boo
if (discard)
return;
- //qdr_connection_t *conn = action->args.connection.conn;
- //qdr_link_t *link = action->args.connection.link;
- //qd_direction_t dir = action->args.connection.dir;
- //pn_terminus_t *source = action->args.connection.source;
- //pn_terminus_t *target = action->args.connection.target;
+ //qdr_connection_t *conn = action->args.connection.conn;
+ //qdr_link_t *link = action->args.connection.link;
+ //qd_direction_t dir = action->args.connection.dir;
+ //qdr_terminus_t *source = action->args.connection.source;
+ //qdr_terminus_t *target = action->args.connection.target;
//
// Cases to be handled:
@@ -254,9 +369,9 @@ static void qdr_link_second_attach_CT(qdr_core_t *core,
qdr_action_t *action, bo
if (discard)
return;
- //qdr_link_t *link = action->args.connection.link;
- //pn_terminus_t *source = action->args.connection.source;
- //pn_terminus_t *target = action->args.connection.target;
+ //qdr_link_t *link = action->args.connection.link;
+ //qdr_terminus_t *source = action->args.connection.source;
+ //qdr_terminus_t *target = action->args.connection.target;
//
// Cases to be handled:
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b37dd4a3/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index 96ae528..97ddad4 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -154,7 +154,6 @@ void qdr_core_subscribe(qdr_core_t *core, const char
*address, char aclass, char
void qdr_route_table_setup_CT(qdr_core_t *core)
{
DEQ_INIT(core->addrs);
- DEQ_INIT(core->links);
DEQ_INIT(core->routers);
core->addr_hash = qd_hash(10, 32, 0);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b37dd4a3/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index a747572..4e86c9b 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -81,7 +81,7 @@ ALLOC_DEFINE(qdr_field_t);
qdr_field_t *qdr_field(const char *text)
{
- size_t length = strlen(text);
+ size_t length = text ? strlen(text) : 0;
size_t ilength = length;
if (length == 0)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b37dd4a3/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h
b/src/router_core/router_core_private.h
index 0347fe0..f70e97a 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -70,8 +70,8 @@ struct qdr_action_t {
qdr_delivery_t *delivery;
qd_message_t *msg;
qd_direction_t dir;
- pn_terminus_t *source;
- pn_terminus_t *target;
+ qdr_terminus_t *source;
+ qdr_terminus_t *target;
pn_condition_t *condition;
} connection;
@@ -160,20 +160,17 @@ struct qdr_link_t {
qdr_core_t *core;
void *user_context;
qdr_connection_t *conn; ///< [ref] Connection that owns
this link
- int mask_bit; ///< Unique mask bit if this is
an inter-router link
qd_link_type_t link_type;
qd_direction_t link_direction;
qdr_address_t *owning_addr; ///< [ref] Address record that
owns this link
//qd_waypoint_t *waypoint; ///< [ref] Waypoint that owns
this link
- qd_link_t *link; ///< [own] Link pointer
DEPRECATE
qdr_link_t *connected_link; ///< [ref] If this is a
link-route, reference the connected link
qdr_link_ref_t *ref; ///< Pointer to a containing
reference object
- char *target; ///< Target address for
incoming links
qd_routed_event_list_t event_fifo; ///< FIFO of outgoing
delivery/link events (no messages)
qd_routed_event_list_t msg_fifo; ///< FIFO of incoming or
outgoing message deliveries
qd_router_delivery_list_t deliveries; ///< [own] outstanding
unsettled deliveries
- bool strip_inbound_annotations; ///<should the
dispatch specific inbound annotations be stripped at the ingress router
- bool strip_outbound_annotations; ///<should the
dispatch specific outbound annotations be stripped at the egress router
+ bool strip_inbound_annotations; ///<should the
dispatch specific inbound annotations be stripped at the ingress router
DEPRECATE/MOVE
+ bool strip_outbound_annotations; ///<should the
dispatch specific outbound annotations be stripped at the egress router
DEPRECATE/MOVE
};
ALLOC_DECLARE(qdr_link_t);
@@ -244,16 +241,36 @@ void qdr_add_node_ref(qdr_router_ref_list_t *ref_list,
qdr_node_t *rnode);
void qdr_del_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode);
+//
+// Kind of work carried by a qdr_connection_work_t.  qdr_connection_process()
+// switches on this value to choose the first-attach, second-attach or
+// detach handler.
+//
+typedef enum {
+ QDR_CONNECTION_WORK_FIRST_ATTACH,
+ QDR_CONNECTION_WORK_SECOND_ATTACH,
+ QDR_CONNECTION_WORK_DETACH
+} qdr_connection_work_type_t;
+
+//
+// One unit of work queued on a connection's work_list.  The fields other
+// than work_type are the arguments passed to the handler selected by
+// work_type; which of them are meaningful depends on that type.
+//
+typedef struct qdr_connection_work_t {
+ DEQ_LINKS(struct qdr_connection_work_t);
+ qdr_connection_work_type_t work_type;
+ qdr_link_t *link;
+ qdr_terminus_t *source;
+ qdr_terminus_t *target;
+ pn_condition_t *condition;
+ uint32_t flags;
+} qdr_connection_work_t;
+
+ALLOC_DECLARE(qdr_connection_work_t);
+DEQ_DECLARE(qdr_connection_work_t, qdr_connection_work_list_t);
+
struct qdr_connection_t {
DEQ_LINKS(qdr_connection_t);
- qdr_core_t *core;
- void *user_context;
- bool incoming;
- qdr_connection_role_t role;
- const char *label;
- int mask_bit;
-
- // TODO - Add direct linkage to waypoints, link-route destinations, and
links
+ qdr_core_t *core;
+ void *user_context;
+ bool incoming;
+ qdr_connection_role_t role;
+ const char *label;
+ int mask_bit;
+ qdr_link_list_t links;
+ qdr_connection_work_list_t work_list;
+ sys_mutex_t *work_lock;
};
ALLOC_DECLARE(qdr_connection_t);
@@ -287,6 +304,15 @@ struct qdr_core_t {
qdr_mobile_removed_t rt_mobile_removed;
qdr_link_lost_t rt_link_lost;
+ //
+ // Connection section
+ //
+ void *user_context;
+ qdr_connection_activate_t activate_handler;
+ qdr_link_first_attach_t first_attach_handler;
+ qdr_link_second_attach_t second_attach_handler;
+ qdr_link_detach_t detach_handler;
+
const char *router_area;
const char *router_id;
@@ -296,7 +322,6 @@ struct qdr_core_t {
qdr_address_t *routerma_addr;
qdr_address_t *hello_addr;
- qdr_link_list_t links;
qdr_node_list_t routers;
qd_bitmask_t *neighbor_free_mask;
qdr_node_t **routers_by_mask_bit;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b37dd4a3/src/router_core/terminus.c
----------------------------------------------------------------------
diff --git a/src/router_core/terminus.c b/src/router_core/terminus.c
new file mode 100644
index 0000000..4623e09
--- /dev/null
+++ b/src/router_core/terminus.c
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "router_core_private.h"
+
+//
+// Core-side copy of the contents of a proton terminus.  qdr_terminus()
+// fills each field from the corresponding pn_terminus accessor, and
+// qdr_terminus_copy() writes them back into a pn_terminus_t.
+//
+// The address field and the four pn_data_t objects are released by
+// qdr_terminus_free().
+//
+struct qdr_terminus_t {
+ qdr_field_t *address;
+ pn_durability_t durability;
+ pn_expiry_policy_t expiry_policy;
+ pn_seconds_t timeout;
+ bool dynamic;
+ pn_distribution_mode_t distribution_mode;
+ pn_data_t *properties;
+ pn_data_t *filter;
+ pn_data_t *outcomes;
+ pn_data_t *capabilities;
+};
+
+ALLOC_DECLARE(qdr_terminus_t);
+ALLOC_DEFINE(qdr_terminus_t);
+
+/**
+ * Build a qdr_terminus_t holding a copy of the content of a proton terminus.
+ *
+ * @param pn Proton terminus to copy from; may be null
+ * @return Newly allocated terminus, or a null pointer when 'pn' is null
+ */
+qdr_terminus_t *qdr_terminus(pn_terminus_t *pn)
+{
+    if (!pn)
+        return 0;
+
+    qdr_terminus_t *copy = new_qdr_terminus_t();
+    ZERO(copy);
+
+    //
+    // Scalar attributes are read straight from the proton accessors.
+    //
+    copy->address           = qdr_field(pn_terminus_get_address(pn));
+    copy->durability        = pn_terminus_get_durability(pn);
+    copy->expiry_policy     = pn_terminus_get_expiry_policy(pn);
+    copy->timeout           = pn_terminus_get_timeout(pn);
+    copy->dynamic           = pn_terminus_is_dynamic(pn);
+    copy->distribution_mode = pn_terminus_get_distribution_mode(pn);
+
+    //
+    // Each data section gets its own pn_data_t, filled from the source.
+    //
+    copy->properties = pn_data(0);
+    pn_data_copy(copy->properties, pn_terminus_properties(pn));
+
+    copy->filter = pn_data(0);
+    pn_data_copy(copy->filter, pn_terminus_filter(pn));
+
+    copy->outcomes = pn_data(0);
+    pn_data_copy(copy->outcomes, pn_terminus_outcomes(pn));
+
+    copy->capabilities = pn_data(0);
+    pn_data_copy(copy->capabilities, pn_terminus_capabilities(pn));
+
+    return copy;
+}
+
+
+/**
+ * Release a terminus and everything it holds: the address field and the
+ * four pn_data_t sections.  A null pointer is accepted and ignored.
+ *
+ * @param term Terminus to free, or null
+ */
+void qdr_terminus_free(qdr_terminus_t *term)
+{
+    if (term) {
+        qdr_field_free(term->address);
+
+        pn_data_free(term->properties);
+        pn_data_free(term->filter);
+        pn_data_free(term->outcomes);
+        pn_data_free(term->capabilities);
+
+        free_qdr_terminus_t(term);
+    }
+}
+
+
+/**
+ * Copy the contents of a qdr_terminus_t into a proton terminus.
+ *
+ * @param from Terminus to read from; if null, 'to' is left untouched
+ * @param to   Proton terminus to overwrite; if null, nothing is done
+ */
+void qdr_terminus_copy(qdr_terminus_t *from, pn_terminus_t *to)
+{
+    //
+    // qdr_terminus() returns a null pointer when given a null proton
+    // terminus, and core-initiated links are created with null source and
+    // target, so a null 'from' is a value that can reach this function.
+    // Treat it (and a null destination) as "nothing to copy" rather than
+    // dereferencing it.
+    //
+    if (from == 0 || to == 0)
+        return;
+
+    //
+    // The address is optional; it is only set when one was captured.
+    //
+    if (from->address) {
+        unsigned char *addr = qd_field_iterator_copy(from->address->iterator);
+        pn_terminus_set_address(to, (char*) addr);
+        free(addr);
+    }
+
+    pn_terminus_set_durability(to, from->durability);
+    pn_terminus_set_expiry_policy(to, from->expiry_policy);
+    pn_terminus_set_timeout(to, from->timeout);
+    pn_terminus_set_dynamic(to, from->dynamic);
+    pn_terminus_set_distribution_mode(to, from->distribution_mode);
+
+    pn_data_copy(pn_terminus_properties(to), from->properties);
+    pn_data_copy(pn_terminus_filter(to), from->filter);
+    pn_data_copy(pn_terminus_outcomes(to), from->outcomes);
+    pn_data_copy(pn_terminus_capabilities(to), from->capabilities);
+}
+
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b37dd4a3/src/router_private.h
----------------------------------------------------------------------
diff --git a/src/router_private.h b/src/router_private.h
index 6ecaaa9..50ab94e 100644
--- a/src/router_private.h
+++ b/src/router_private.h
@@ -51,15 +51,6 @@ typedef enum {
} qd_router_mode_t;
ENUM_DECLARE(qd_router_mode);
-typedef enum {
- QD_LINK_ENDPOINT, ///< A link to a connected endpoint
- QD_LINK_WAYPOINT, ///< A link to a configured waypoint
- QD_LINK_CONTROL, ///< A link to a peer router for control messages
- QD_LINK_ROUTER, ///< A link to a peer router for routed messages
- QD_LINK_AREA ///< A link to a peer router in a different area (area
boundary)
-} qd_link_type_t;
-ENUM_DECLARE(qd_link_type);
-
typedef struct qd_routed_event_t {
DEQ_LINKS(struct qd_routed_event_t);
qd_router_delivery_t *delivery;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]