Repository: qpid-dispatch
Updated Branches:
refs/heads/tross-DISPATCH-179-1 ca0987522 -> 77281d629
DISPATCH-179 - Added delivery linkage from router_node to router_core.
Removed large swaths of deprecated code from router_node.
Removed src/router_delivery.c and src/waypoint.c from build.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/0ba77e3d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/0ba77e3d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/0ba77e3d
Branch: refs/heads/tross-DISPATCH-179-1
Commit: 0ba77e3dfe91e4b74fcf959d3b2155e9d9f41247
Parents: ca09875
Author: Ted Ross <[email protected]>
Authored: Wed Jan 6 14:58:20 2016 -0500
Committer: Ted Ross <[email protected]>
Committed: Wed Jan 6 14:58:20 2016 -0500
----------------------------------------------------------------------
include/qpid/dispatch/iterator.h | 9 +
include/qpid/dispatch/router_core.h | 96 +++-
include/qpid/dispatch/server.h | 5 +
src/CMakeLists.txt | 2 -
src/connection_manager.c | 18 +-
src/iterator.c | 11 +
src/router_agent.c | 78 +--
src/router_core/connections.c | 50 +-
src/router_core/route_tables.c | 4 +
src/router_core/router_core_private.h | 11 +-
src/router_core/transfer.c | 24 +-
src/router_node.c | 821 +++++------------------------
src/router_private.h | 54 --
src/waypoint.c | 4 +-
14 files changed, 316 insertions(+), 871 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0ba77e3d/include/qpid/dispatch/iterator.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/iterator.h b/include/qpid/dispatch/iterator.h
index f82db1b..2c490e3 100644
--- a/include/qpid/dispatch/iterator.h
+++ b/include/qpid/dispatch/iterator.h
@@ -246,6 +246,15 @@ int qd_field_iterator_ncopy(qd_field_iterator_t *iter,
unsigned char* buffer, in
unsigned char *qd_field_iterator_copy(qd_field_iterator_t *iter);
/**
+ * Return a new iterator that is a duplicate of the original iterator,
referring
+ * to the same base data. If the input iterator pointer is NULL, the duplicate
+ * will also be NULL (i.e. no new iterator will be created).
+ * @param iter Input iterator
+ * @return Pointer to a new, identical iterator referring to the same data.
+ */
+qd_field_iterator_t *qd_field_iterator_dup(const qd_field_iterator_t *iter);
+
+/**
* Copy the iterator's view into buffer as a null terminated string,
* up to a maximum of n bytes. Cursor is advanced by the number of bytes
* copied. Useful for log messages.
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0ba77e3d/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h
b/include/qpid/dispatch/router_core.h
index bc4d03a..84c9274 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -33,12 +33,12 @@
*/
typedef struct qdr_core_t qdr_core_t;
+typedef struct qdr_subscription_t qdr_subscription_t;
typedef struct qdr_connection_t qdr_connection_t;
typedef struct qdr_link_t qdr_link_t;
typedef struct qdr_delivery_t qdr_delivery_t;
typedef struct qdr_terminus_t qdr_terminus_t;
typedef struct qdr_error_t qdr_error_t;
-typedef struct qdr_subscription_t qdr_subscription_t;
/**
* Allocate and start an instance of the router core module.
@@ -52,7 +52,7 @@ void qdr_core_free(qdr_core_t *core);
/**
******************************************************************************
- * Route table maintenance functions
+ * Route table maintenance functions (Router Control)
******************************************************************************
*/
void qdr_core_add_router(qdr_core_t *core, const char *address, int
router_maskbit);
@@ -92,6 +92,22 @@ qdr_subscription_t *qdr_core_subscribe(qdr_core_t
*core,
void qdr_core_unsubscribe(qdr_subscription_t *sub);
+/**
+ * qdr_send_to
+ *
+ * Send a message to a destination. This function is used only by in-process
components that
+ * create messages to be sent. For these messages, there is no inbound link
or delivery.
+ * Note also that deliveries sent through this function will be pre-settled.
+ *
+ * @param core Pointer to the core module
+ * @param msg Pointer to the message to be sent. The message will be copied
during the call
+ * and must be freed by the caller if the caller doesn't need to
hold it for later use.
+ * @param addr Null-terminated string containing the address to which the
message should be delivered.
+ * @param exclude_inprocess If true, the message will not be sent to
in-process subscribers.
+ * @param control If true, this message is to be treated as control traffic
and flow on a control link.
+ */
+void qdr_send_to(qdr_core_t *core, qd_message_t *msg, const char *addr, bool
exclude_inprocess, bool control);
+
/**
******************************************************************************
@@ -125,9 +141,16 @@ typedef enum {
* @param role The configured role of this connection
* @param label Optional label provided in the connection's configuration.
This is used to
* correlate the connection with waypoints and link-route destinations
that use the connection.
+ * @param strip_annotations_in True if configured to remove annotations on
inbound messages.
+ * @param strip_annotations_out True if configured to remove annotations on
outbound messages.
* @return Pointer to a connection object that can be used to refer to this
connection over its lifetime.
*/
-qdr_connection_t *qdr_connection_opened(qdr_core_t *core, bool incoming,
qdr_connection_role_t role, const char *label);
+qdr_connection_t *qdr_connection_opened(qdr_core_t *core,
+ bool incoming,
+ qdr_connection_role_t role,
+ const char *label,
+ bool
strip_annotations_in,
+ bool
strip_annotations_out);
/**
* qdr_connection_closed
@@ -347,6 +370,44 @@ qd_link_type_t qdr_link_type(const qdr_link_t *link);
qd_direction_t qdr_link_direction(const qdr_link_t *link);
/**
+ * qdr_link_is_anonymous
+ *
+ * Indicate whether the link is anonymous. Note that this is determined inside
+ * the core thread. In the time between first creating the link and when the
+ * core thread determines its status, a link will indicate "true" for being
anonymous.
+ * The reason for this is to be conservative. The anonymous check is an
optimization
+ * used by the caller to skip parsing the "to" field for messages on
non-anonymous links.
+ *
+ * @param link Link object
+ * @return True if the link is anonymous or the link hasn't been processed yet.
+ */
+bool qdr_link_is_anonymous(const qdr_link_t *link);
+
+/**
+ * qdr_link_is_routed
+ *
+ * Indicate whether the link is link-routed.
+ *
+ * @param link Link object
+ * @return True if the link is link-routed.
+ */
+bool qdr_link_is_routed(const qdr_link_t *link);
+
+/**
+ * qdr_link_strip_annotations_in
+ *
+ * Indicate whether the link's connection is configured to strip message
annotations on inbound messages.
+ */
+bool qdr_link_strip_annotations_in(const qdr_link_t *link);
+
+/**
+ * qdr_link_strip_annotations_oout
+ *
+ * Indicate whether the link's connection is configured to strip message
annotations on outbound messages.
+ */
+bool qdr_link_strip_annotations_out(const qdr_link_t *link);
+
+/**
* qdr_link_name
*
* Retrieve the name of the link.
@@ -397,26 +458,27 @@ void qdr_link_second_attach(qdr_link_t *link,
qdr_terminus_t *source, qdr_termin
*/
void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t
*error);
-qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, pn_delivery_t *delivery,
qd_message_t *msg);
-qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, pn_delivery_t *delivery,
qd_message_t *msg, qd_field_iterator_t *addr);
-
/**
- * qdr_send_to
+ * qdr_link_deliver
*
- * Send a message to a destination. This function is used only by in-process
components that
- * create messages to be sent. For these messages, there is no inbound link
or delivery.
+ * Deliver a message to the router core for forwarding. This function is used
in cases where
+ * the link contains all the information needed for proper message routing
(i.e. non-anonymous
+ * inbound links).
*
- * @param core Pointer to the core module
- * @param msg Pointer to the message to be sent. The message will be copied
during the call
- * can must be freed by the caller if the caller doesn't need to
hold it for later use.
- * @param exclude_inprocess If true, the message will not be sent to
in-process subscribers.
- * @param control If true, this message is to be treated as control traffic
and flow on a control link.
+ * @param link Pointer to the link over which the message arrived.
+ * @param msg Pointer to the delivered message. The sender is giving this
reference to the router
+ * core. The sender _must not_ free or otherwise use the message
after invoking this function.
+ * @return Pointer to the qdr_delivery that will track the lifecycle of this
delivery on this link.
*/
-void qdr_send_to(qdr_core_t *core, qd_message_t *msg, const char *addr, bool
exclude_inprocess, bool control);
+qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg,
qd_field_iterator_t *ingress);
+qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
+ qd_field_iterator_t *ingress,
qd_field_iterator_t *addr);
+qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t
*msg);
typedef void (*qdr_link_first_attach_t) (void *context, qdr_connection_t
*conn, qdr_link_t *link,
qdr_terminus_t *source,
qdr_terminus_t *target);
-typedef void (*qdr_link_second_attach_t) (void *context, qdr_link_t *link,
qdr_terminus_t *source, qdr_terminus_t *target);
+typedef void (*qdr_link_second_attach_t) (void *context, qdr_link_t *link,
+ qdr_terminus_t *source,
qdr_terminus_t *target);
typedef void (*qdr_link_detach_t) (void *context, qdr_link_t *link,
qdr_error_t *error);
void qdr_connection_handlers(qdr_core_t *core,
@@ -431,6 +493,8 @@ void qdr_connection_handlers(qdr_core_t
*core,
* Delivery functions
******************************************************************************
*/
+void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context);
+void *qdr_delivery_get_context(qdr_delivery_t *delivery);
void qdr_delivery_update_disposition(qdr_delivery_t *delivery);
void qdr_delivery_update_flow(qdr_delivery_t *delivery);
void qdr_delivery_process(qdr_delivery_t *delivery);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0ba77e3d/include/qpid/dispatch/server.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h
index 2fb237d..be9d117 100644
--- a/include/qpid/dispatch/server.h
+++ b/include/qpid/dispatch/server.h
@@ -248,6 +248,11 @@ typedef struct qd_server_config_t {
char *port;
/**
+ * Connection label, used as a reference from other parts of the
configuration.
+ */
+ char *label;
+
+ /**
* Space-separated list of SASL mechanisms to be accepted for the
connection.
*/
char *sasl_mechanisms;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0ba77e3d/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index e82d805..e1d8097 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -78,13 +78,11 @@ set(qpid_dispatch_SOURCES
router_core/management_agent.c
router_core/terminus.c
router_core/transfer.c
- router_delivery.c
router_node.c
router_pynode.c
schema_enum.c
server.c
timer.c
- waypoint.c
)
if(USE_MEMORY_POOL)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0ba77e3d/src/connection_manager.c
----------------------------------------------------------------------
diff --git a/src/connection_manager.c b/src/connection_manager.c
index c9dc81c..d432f64 100644
--- a/src/connection_manager.c
+++ b/src/connection_manager.c
@@ -42,7 +42,6 @@ struct qd_config_connector_t {
bool is_connector;
DEQ_LINKS(qd_config_connector_t);
void *context;
- const char *connector_name;
qd_connector_t *connector;
qd_server_config_t configuration;
bool started;
@@ -80,6 +79,7 @@ static void qd_server_config_free(qd_server_config_t *cf)
if (!cf) return;
free(cf->host);
free(cf->port);
+ free(cf->label);
free(cf->role);
free(cf->sasl_mechanisms);
if (cf->ssl_enabled) {
@@ -101,8 +101,8 @@ static void qd_server_config_free(qd_server_config_t *cf)
*/
static void load_strip_annotations(qd_server_config_t *config, const char*
stripAnnotations)
{
- if(stripAnnotations) {
- if (strcmp(stripAnnotations, "both") == 0) {
+ if (stripAnnotations) {
+ if (strcmp(stripAnnotations, "both") == 0) {
config->strip_inbound_annotations = true;
config->strip_outbound_annotations = true;
}
@@ -141,6 +141,7 @@ static qd_error_t load_server_config(qd_dispatch_t *qd,
qd_server_config_t *conf
memset(config, 0, sizeof(*config));
config->host = qd_entity_get_string(entity, "addr"); CHECK();
config->port = qd_entity_get_string(entity, "port"); CHECK();
+ config->label = qd_entity_get_string(entity, "name"); CHECK();
config->role = qd_entity_get_string(entity, "role"); CHECK();
config->max_frame_size = qd_entity_get_long(entity, "maxFrameSize");
CHECK();
config->idle_timeout_seconds = qd_entity_get_long(entity,
"idleTimeoutSeconds"); CHECK();
@@ -207,10 +208,9 @@ qd_error_t qd_dispatch_configure_connector(qd_dispatch_t
*qd, qd_entity_t *entit
return qd_error_code();
DEQ_ITEM_INIT(cc);
if (strcmp(cc->configuration.role, "on-demand") == 0) {
- cc->connector_name = qd_entity_get_string(entity, "name");
QD_ERROR_RET();
DEQ_INSERT_TAIL(cm->on_demand_connectors, cc);
- qd_log(cm->log_source, QD_LOG_INFO, "Configured on-demand connector:
%s:%s name=%s",
- cc->configuration.host, cc->configuration.port,
cc->connector_name);
+ qd_log(cm->log_source, QD_LOG_INFO, "Configured on-demand connector:
%s:%s label=%s",
+ cc->configuration.host, cc->configuration.port,
cc->configuration.label);
} else {
DEQ_INSERT_TAIL(cm->config_connectors, cc);
qd_log(cm->log_source, QD_LOG_INFO, "Configured Connector: %s:%s
role=%s",
@@ -293,7 +293,7 @@ qd_config_connector_t
*qd_connection_manager_find_on_demand(qd_dispatch_t *qd, c
qd_config_connector_t *cc =
DEQ_HEAD(qd->connection_manager->on_demand_connectors);
while (cc) {
- if (strcmp(cc->connector_name, name) == 0)
+ if (strcmp(cc->configuration.label, name) == 0)
break;
cc = DEQ_NEXT(cc);
}
@@ -319,7 +319,7 @@ void qd_connection_manager_start_on_demand(qd_dispatch_t
*qd, qd_config_connecto
{
if (cc && cc->connector == 0) {
qd_log(qd->connection_manager->log_source, QD_LOG_INFO, "Starting
on-demand connector: %s",
- cc->connector_name);
+ cc->configuration.label);
cc->connector = qd_server_connect(qd, &cc->configuration, cc);
}
}
@@ -344,7 +344,7 @@ void qd_config_connector_set_context(qd_config_connector_t
*cc, void *context)
const char *qd_config_connector_name(qd_config_connector_t *cc)
{
- return cc ? cc->connector_name : 0;
+ return cc ? cc->configuration.label : 0;
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0ba77e3d/src/iterator.c
----------------------------------------------------------------------
diff --git a/src/iterator.c b/src/iterator.c
index 686447d..2fc49f6 100644
--- a/src/iterator.c
+++ b/src/iterator.c
@@ -602,6 +602,17 @@ unsigned char *qd_field_iterator_copy(qd_field_iterator_t
*iter)
}
+qd_field_iterator_t *qd_field_iterator_dup(const qd_field_iterator_t *iter)
+{
+ if (iter == 0)
+ return 0;
+
+ qd_field_iterator_t *dup = new_qd_field_iterator_t();
+ *dup = *iter;
+ return dup;
+}
+
+
qd_iovec_t *qd_field_iterator_iovec(const qd_field_iterator_t *iter)
{
assert(!iter->view_prefix); // Not supported for views with a prefix
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0ba77e3d/src/router_agent.c
----------------------------------------------------------------------
diff --git a/src/router_agent.c b/src/router_agent.c
index 2b006b9..4e02b77 100644
--- a/src/router_agent.c
+++ b/src/router_agent.c
@@ -43,83 +43,11 @@ qd_error_t qd_entity_refresh_router(qd_entity_t* entity,
void *impl) {
qd_router_t *router = qd->router;
if (qd_entity_set_string(entity, "area", router->router_area) == 0 &&
qd_entity_set_string(entity, "mode",
qd_router_mode_name(router->router_mode)) == 0 &&
- qd_entity_set_long(entity, "addrCount", DEQ_SIZE(router->addrs)) == 0
&&
- qd_entity_set_long(entity, "linkCount", DEQ_SIZE(router->links)) == 0
&&
- qd_entity_set_long(entity, "nodeCount", DEQ_SIZE(router->routers)) == 0
+ qd_entity_set_long(entity, "addrCount", 0) == 0 &&
+ qd_entity_set_long(entity, "linkCount", 0) == 0 &&
+ qd_entity_set_long(entity, "nodeCount", 0) == 0
)
return QD_ERROR_NONE;
return qd_error_code();
}
-static const char *address_key(qd_address_t *addr) {
- return addr && addr->hash_handle ? (const char*)
qd_hash_key_by_handle(addr->hash_handle) : NULL;
-}
-
-qd_error_t qd_entity_refresh_router_address(qd_entity_t* entity, void *impl) {
- qd_address_t *addr = (qd_address_t*) impl;
- uint32_t subCount = DEQ_SIZE(addr->rlinks);
- if (DEQ_SIZE(addr->lrps) > 0)
- subCount = DEQ_SIZE(addr->lrps);
- if (qd_entity_set_bool(entity, "inProcess", addr->on_message != 0) == 0 &&
- qd_entity_set_long(entity, "subscriberCount", subCount) == 0 &&
- qd_entity_set_long(entity, "remoteCount", DEQ_SIZE(addr->rnodes)) == 0
&&
- qd_entity_set_long(entity, "deliveriesIngress",
addr->deliveries_ingress) == 0 &&
- qd_entity_set_long(entity, "deliveriesEgress",
addr->deliveries_egress) == 0 &&
- qd_entity_set_long(entity, "deliveriesTransit",
addr->deliveries_transit) == 0 &&
- qd_entity_set_long(entity, "deliveriesToContainer",
addr->deliveries_to_container) == 0 &&
- qd_entity_set_long(entity, "deliveriesFromContainer",
addr->deliveries_from_container) == 0 &&
- qd_entity_set_string(entity, "key", address_key(addr))
- )
- return QD_ERROR_NONE;
- return qd_error_code();
-}
-
-/*
-static const char *qd_link_type_names[] = { "endpoint", "waypoint",
"inter-router", "inter-area" };
-ENUM_DEFINE(qd_link_type, qd_link_type_names);
-
-static const char* qd_router_link_remote_container(qd_router_link_t* link) {
- if (!link->link || !qd_link_pn(link->link))
- return "";
- return pn_connection_remote_container(
- pn_session_connection(qd_link_pn_session(link->link)));
-}
-
-static const char* qd_router_link_name(qd_router_link_t* link) {
- if (!link->link || !qd_link_pn(link->link))
- return "";
- return pn_link_name(qd_link_pn(link->link));
-}
-
-//TODO - Remove this function and the functions that it calls since this is
not used anymore.
-qd_error_t qd_entity_refresh_router_link(qd_entity_t* entity, void *impl)
-{
- qd_router_link_t *link = (qd_router_link_t*) impl;
- if (!qd_entity_set_string(entity, "linkType",
qd_link_type_name(link->link_type)) &&
- !qd_entity_set_string(entity, "linkDir", (link->link_direction ==
QD_INCOMING) ? "in": "out") &&
- !qd_entity_set_string(entity, "linkName", qd_router_link_name(link)) &&
- !qd_entity_set_string(entity, "owningAddr",
address_key(link->owning_addr)) &&
- !qd_entity_set_long(entity, "eventFifoDepth",
DEQ_SIZE(link->event_fifo)) &&
- !qd_entity_set_long(entity, "msgFifoDepth", DEQ_SIZE(link->msg_fifo))
&&
- !qd_entity_set_string(entity, "remoteContainer",
qd_router_link_remote_container(link))
- )
- return QD_ERROR_NONE;
- return qd_error_code();
-}
-*/
-void qd_router_build_node_list(qd_dispatch_t *qd, qd_composed_field_t *field)
-{
- qd_router_t *router = qd->router;
- char temp[1000];
-
- sys_mutex_lock(router->lock);
- qd_router_node_t *rnode = DEQ_HEAD(router->routers);
- while (rnode) {
- const unsigned char* addr =
qd_hash_key_by_handle(rnode->owning_addr->hash_handle);
- snprintf(temp, sizeof(temp), "amqp:/_topo/%s/%s/$management",
- router->router_area, &((char*) addr)[1]);
- qd_compose_insert_string(field, temp);
- rnode = DEQ_NEXT(rnode);
- }
- sys_mutex_unlock(router->lock);
-}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0ba77e3d/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 124f4d6..01b658b 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -60,21 +60,28 @@ qdr_terminus_t *qdr_terminus_router_data(void)
// Interface Functions
//==================================================================================
-qdr_connection_t *qdr_connection_opened(qdr_core_t *core, bool incoming,
qdr_connection_role_t role, const char *label)
+qdr_connection_t *qdr_connection_opened(qdr_core_t *core,
+ bool incoming,
+ qdr_connection_role_t role,
+ const char *label,
+ bool
strip_annotations_in,
+ bool
strip_annotations_out)
{
qdr_action_t *action = qdr_action(qdr_connection_opened_CT,
"connection_opened");
qdr_connection_t *conn = new_qdr_connection_t();
ZERO(conn);
- conn->core = core;
- conn->user_context = 0;
- conn->incoming = incoming;
- conn->role = role;
- conn->label = label;
- conn->mask_bit = -1;
+ conn->core = core;
+ conn->user_context = 0;
+ conn->incoming = incoming;
+ conn->role = role;
+ conn->label = label;
+ conn->strip_annotations_in = strip_annotations_in;
+ conn->strip_annotations_out = strip_annotations_out;
+ conn->mask_bit = -1;
DEQ_INIT(conn->links);
DEQ_INIT(conn->work_list);
- conn->work_lock = sys_mutex();
+ conn->work_lock = sys_mutex();
action->args.connection.conn = conn;
qdr_action_enqueue(core, action);
@@ -168,6 +175,30 @@ qd_direction_t qdr_link_direction(const qdr_link_t *link)
}
+bool qdr_link_is_anonymous(const qdr_link_t *link)
+{
+ return link->owning_addr == 0;
+}
+
+
+bool qdr_link_is_routed(const qdr_link_t *link)
+{
+ return link->connected_link != 0;
+}
+
+
+bool qdr_link_strip_annotations_in(const qdr_link_t *link)
+{
+ return link->strip_annotations_in;
+}
+
+
+bool qdr_link_strip_annotations_out(const qdr_link_t *link)
+{
+ return link->strip_annotations_out;
+}
+
+
const char *qdr_link_name(const qdr_link_t *link)
{
return link->name;
@@ -190,6 +221,9 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
link->name = (char*) malloc(strlen(name));
strcpy(link->name, name);
+ link->strip_annotations_in = conn->strip_annotations_in;
+ link->strip_annotations_out = conn->strip_annotations_out;
+
if (qdr_terminus_has_capability(local_terminus,
QD_CAPABILITY_ROUTER_CONTROL))
link->link_type = QD_LINK_CONTROL;
else if (qdr_terminus_has_capability(local_terminus,
QD_CAPABILITY_ROUTER_DATA))
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0ba77e3d/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index 2790b39..57d9427 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -559,6 +559,10 @@ static void qdr_subscribe_CT(qdr_core_t *core,
qdr_action_t *action, bool discar
char phase = action->args.io.address_phase;
qdr_address_t *addr = 0;
+ char *astring = (char*) qd_field_iterator_copy(address->iterator);
+ qd_log(core->log, QD_LOG_INFO, "Subscribed address=%s class=%c",
astring, aclass);
+ free(astring);
+
qd_address_iterator_override_prefix(address->iterator, aclass);
if (aclass == 'M')
qd_address_iterator_set_phase(address->iterator, phase);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0ba77e3d/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h
b/src/router_core/router_core_private.h
index 0fec699..217c333 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -165,6 +165,11 @@ DEQ_DECLARE(qdr_router_ref_t, qdr_router_ref_list_t);
struct qdr_delivery_t {
DEQ_LINKS(qdr_delivery_t);
+ void *context;
+ qdr_link_t *link;
+ qdr_delivery_t *peer;
+ uint64_t disposition;
+ bool settled;
};
ALLOC_DECLARE(qdr_delivery_t);
@@ -186,8 +191,8 @@ struct qdr_link_t {
qd_routed_event_list_t event_fifo; ///< FIFO of outgoing
delivery/link events (no messages)
qd_routed_event_list_t msg_fifo; ///< FIFO of incoming or
outgoing message deliveries
qd_router_delivery_list_t deliveries; ///< [own] outstanding
unsettled deliveries
- bool strip_inbound_annotations; ///<should the
dispatch specific inbound annotations be stripped at the ingress router
DEPRECATE/MOVE
- bool strip_outbound_annotations; ///<should the
dispatch specific outbound annotations be stripped at the egress router
DEPRECATE/MOVE
+ bool strip_annotations_in;
+ bool strip_annotations_out;
};
ALLOC_DECLARE(qdr_link_t);
@@ -324,6 +329,8 @@ struct qdr_connection_t {
bool incoming;
qdr_connection_role_t role;
const char *label;
+ bool strip_annotations_in;
+ bool strip_annotations_out;
int mask_bit;
qdr_link_list_t links;
qdr_connection_work_list_t work_list;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0ba77e3d/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 6f22603..ce7069f 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -37,7 +37,7 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t
*action, bool discard)
// Interface Functions
//==================================================================================
-qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, pn_delivery_t *delivery,
qd_message_t *msg)
+qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg,
qd_field_iterator_t *ingress)
{
qdr_action_t *action = qdr_action(qdr_link_deliver_CT, "link_deliver");
qdr_delivery_t *dlv = new_qdr_delivery_t();
@@ -47,7 +47,8 @@ qdr_delivery_t *qdr_link_deliver(qdr_link_t *link,
pn_delivery_t *delivery, qd_m
}
-qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, pn_delivery_t *delivery,
qd_message_t *msg, qd_field_iterator_t *addr)
+qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
+ qd_field_iterator_t *ingress,
qd_field_iterator_t *addr)
{
qdr_action_t *action = qdr_action(qdr_link_deliver_to_CT,
"link_deliver_to");
qdr_delivery_t *dlv = new_qdr_delivery_t();
@@ -57,6 +58,13 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link,
pn_delivery_t *delivery, q
}
+qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t
*msg)
+{
+ // TODO - Implement this. Bypass the CT?
+ return 0;
+}
+
+
void qdr_send_to(qdr_core_t *core, qd_message_t *msg, const char *addr, bool
exclude_inprocess, bool control)
{
qdr_action_t *action = qdr_action(qdr_send_to_CT, "send_to");
@@ -69,6 +77,18 @@ void qdr_send_to(qdr_core_t *core, qd_message_t *msg, const
char *addr, bool exc
}
+void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context)
+{
+ delivery->context = context;
+}
+
+
+void *qdr_delivery_get_context(qdr_delivery_t *delivery)
+{
+ return delivery->context;
+}
+
+
//==================================================================================
// In-Thread Functions
//==================================================================================
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0ba77e3d/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index cde02ef..ce4bc52 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -36,13 +36,10 @@ const char *CORE_AGENT_ADDRESS = "$management";
static char *router_role = "inter-router";
static char *on_demand_role = "on-demand";
-static char *local_prefix = "_local/";
static char *direct_prefix;
static char *node_id;
ALLOC_DEFINE(qd_routed_event_t);
-ALLOC_DEFINE(qd_router_link_t);
-ALLOC_DEFINE(qd_router_node_t);
ALLOC_DEFINE(qd_router_ref_t);
ALLOC_DEFINE(qd_router_link_ref_t);
ALLOC_DEFINE(qd_router_lrp_ref_t);
@@ -67,51 +64,6 @@ const char* qd_address_logstr(qd_address_t* address) {
return (char*)qd_hash_key_by_handle(address->hash_handle);
}
-void qd_router_add_link_ref_LH(qd_router_link_ref_list_t *ref_list,
qd_router_link_t *link)
-{
- qd_router_link_ref_t *ref = new_qd_router_link_ref_t();
- DEQ_ITEM_INIT(ref);
- ref->link = link;
- link->ref = ref;
- DEQ_INSERT_TAIL(*ref_list, ref);
-}
-
-
-void qd_router_del_link_ref_LH(qd_router_link_ref_list_t *ref_list,
qd_router_link_t *link)
-{
- if (link->ref) {
- DEQ_REMOVE(*ref_list, link->ref);
- free_qd_router_link_ref_t(link->ref);
- link->ref = 0;
- }
-}
-
-
-void qd_router_add_node_ref_LH(qd_router_ref_list_t *ref_list,
qd_router_node_t *rnode)
-{
- qd_router_ref_t *ref = new_qd_router_ref_t();
- DEQ_ITEM_INIT(ref);
- ref->router = rnode;
- rnode->ref_count++;
- DEQ_INSERT_TAIL(*ref_list, ref);
-}
-
-
-void qd_router_del_node_ref_LH(qd_router_ref_list_t *ref_list,
qd_router_node_t *rnode)
-{
- qd_router_ref_t *ref = DEQ_HEAD(*ref_list);
- while (ref) {
- if (ref->router == rnode) {
- DEQ_REMOVE(*ref_list, ref);
- free_qd_router_ref_t(ref);
- rnode->ref_count--;
- break;
- }
- ref = DEQ_NEXT(ref);
- }
-}
-
-
void qd_router_add_lrp_ref_LH(qd_router_lrp_ref_list_t *ref_list, qd_lrp_t
*lrp)
{
qd_router_lrp_ref_t *ref = new_qd_router_lrp_ref_t();
@@ -138,64 +90,26 @@ void qd_router_del_lrp_ref_LH(qd_router_lrp_ref_list_t
*ref_list, qd_lrp_t *lrp)
/**
* Determine the role of a connection
*/
-static qdr_connection_role_t qd_router_connection_role(const qd_connection_t
*conn)
+static void qd_router_connection_get_config(const qd_connection_t *conn,
+ qdr_connection_role_t *role,
+ const char **label,
+ bool
*strip_annotations_in,
+ bool
*strip_annotations_out)
{
if (conn) {
const qd_server_config_t *cf = qd_connection_config(conn);
- if (cf && strcmp(cf->role, router_role) == 0)
- return QDR_ROLE_INTER_ROUTER;
- if (cf && strcmp(cf->role, on_demand_role) == 0)
- return QDR_ROLE_ON_DEMAND;
- }
-
- return QDR_ROLE_NORMAL;
-}
-
-
-void qd_router_link_free_LH(qd_router_link_t *rlink)
-{
- qd_link_t *link = rlink->link;
- if (link) {
- qd_link_set_context(link, 0);
- qd_link_free_LH(link);
- rlink->link = 0;
- }
+ if (cf && strcmp(cf->role, router_role) == 0)
+ *role = QDR_ROLE_INTER_ROUTER;
+ else if (cf && strcmp(cf->role, on_demand_role) == 0)
+ *role = QDR_ROLE_ON_DEMAND;
+ else
+ *role = QDR_ROLE_NORMAL;
- if (rlink->target)
- free(rlink->target);
-
- assert(rlink->ref == 0);
-
- qd_routed_event_t *re;
-
- re = DEQ_HEAD(rlink->event_fifo);
- while (re) {
- DEQ_REMOVE_HEAD(rlink->event_fifo);
- if (re->delivery && qd_router_delivery_fifo_exit_LH(re->delivery)) {
- qd_router_delivery_unlink_LH(re->delivery);
- }
- free_qd_routed_event_t(re);
- re = DEQ_HEAD(rlink->event_fifo);
- }
-
- re = DEQ_HEAD(rlink->msg_fifo);
- while (re) {
- DEQ_REMOVE_HEAD(rlink->msg_fifo);
- if (re->delivery)
- qd_router_delivery_fifo_exit_LH(re->delivery);
- if (re->message)
- qd_message_free(re->message);
- free_qd_routed_event_t(re);
- re = DEQ_HEAD(rlink->msg_fifo);
- }
+ *label = cf->label;
- qd_router_delivery_t *delivery = DEQ_HEAD(rlink->deliveries);
- while (delivery) {
- // this unlinks the delivery from the rlink:
- qd_router_delivery_free_LH(delivery, PN_RELEASED);
- delivery = DEQ_HEAD(rlink->deliveries);
+ *strip_annotations_in = cf->strip_inbound_annotations;
+ *strip_annotations_out = cf->strip_outbound_annotations;
}
- free_qd_router_link_t(rlink);
}
@@ -209,175 +123,11 @@ static int router_writable_conn_handler(void
*type_context, qd_connection_t *con
}
-/**
- * Outgoing Link Writable Handler
- * DEPRECATE
- */
-/*static*/ int router_writable_link_handler(void* context, qd_link_t *link)
-{
- qd_router_t *router = (qd_router_t*) context;
- qd_router_delivery_t *delivery;
- qd_router_link_t *rlink = (qd_router_link_t*)
qd_link_get_context(link);
- pn_link_t *pn_link = qd_link_pn(link);
- uint64_t tag;
- int link_credit = pn_link_credit(pn_link);
- qd_routed_event_list_t to_send;
- qd_routed_event_list_t events;
- qd_routed_event_t *re;
- size_t offer;
- int event_count = 0;
-
- if (!rlink)
- return 0;
-
- bool drain_mode;
- bool drain_changed = qd_link_drain_changed(link, &drain_mode);
-
- DEQ_INIT(to_send);
- DEQ_INIT(events);
-
- sys_mutex_lock(router->lock);
-
- //
- // Pull the non-delivery events into a local list so they can be processed
without
- // the lock being held.
- //
- re = DEQ_HEAD(rlink->event_fifo);
- while (re) {
- DEQ_REMOVE_HEAD(rlink->event_fifo);
- DEQ_INSERT_TAIL(events, re);
- re = DEQ_HEAD(rlink->event_fifo);
- }
-
- //
- // Under lock, move available deliveries from the msg_fifo to the local
to_send
- // list. Don't move more than we have credit to send.
- //
- if (link_credit > 0) {
- tag = router->dtag;
- re = DEQ_HEAD(rlink->msg_fifo);
- while (re) {
- DEQ_REMOVE_HEAD(rlink->msg_fifo);
- DEQ_INSERT_TAIL(to_send, re);
- if (DEQ_SIZE(to_send) == link_credit)
- break;
- re = DEQ_HEAD(rlink->msg_fifo);
- }
- router->dtag += DEQ_SIZE(to_send);
- }
-
- offer = DEQ_SIZE(rlink->msg_fifo);
- sys_mutex_unlock(router->lock);
-
- //
- // Deliver all the to_send messages downrange
- //
- re = DEQ_HEAD(to_send);
- while (re) {
- DEQ_REMOVE_HEAD(to_send);
-
- //
- // Get a delivery for the send. This will be the current delivery on
the link.
- //
- tag++;
- delivery = qd_router_link_new_delivery(rlink, pn_dtag((char*) &tag,
8));
-
- //
- // Send the message
- //
- qd_message_send(re->message, link, rlink->strip_outbound_annotations);
-
- //
- // Check the delivery associated with the queued message. If it is not
- // settled, link it to the outgoing delivery for disposition/settlement
- // tracking. If it is (pre-)settled, put it on the incoming link's
event
- // queue to be locally settled. This is done to hold session credit
during
- // the time the message is in the outgoing message fifo.
- //
- sys_mutex_lock(router->lock);
- if (re->delivery) {
- if (qd_router_delivery_fifo_exit_LH(re->delivery)) {
- if (qd_router_delivery_settled(re->delivery)) {
- qd_router_link_t *peer_rlink =
qd_router_delivery_link(re->delivery);
- qd_routed_event_t *return_re = new_qd_routed_event_t();
- DEQ_ITEM_INIT(return_re);
- return_re->delivery = re->delivery;
- return_re->message = 0;
- return_re->settle = true;
- return_re->disposition = 0;
- qd_router_delivery_fifo_enter_LH(re->delivery);
- DEQ_INSERT_TAIL(peer_rlink->event_fifo, return_re);
- qd_link_activate(peer_rlink->link);
- } else
- qd_router_delivery_link_peers_LH(re->delivery, delivery);
- }
- } else
- qd_router_delivery_free_LH(delivery, 0); // settle and free
- sys_mutex_unlock(router->lock);
-
- pn_link_advance(pn_link);
- event_count++;
-
- qd_message_free(re->message);
- free_qd_routed_event_t(re);
- re = DEQ_HEAD(to_send);
- }
-
- //
- // Process the non-delivery events.
- //
- re = DEQ_HEAD(events);
- while (re) {
- DEQ_REMOVE_HEAD(events);
-
- if (re->delivery) {
- if (re->disposition) {
- pn_delivery_update(qd_router_delivery_pn(re->delivery),
re->disposition);
- event_count++;
- }
-
- sys_mutex_lock(router->lock);
-
- bool ok = qd_router_delivery_fifo_exit_LH(re->delivery);
- if (ok && re->settle) {
- qd_router_delivery_unlink_LH(re->delivery);
- qd_router_delivery_free_LH(re->delivery, re->disposition);
- event_count++;
- }
-
- sys_mutex_unlock(router->lock);
- }
-
- free_qd_routed_event_t(re);
- re = DEQ_HEAD(events);
- }
-
- //
- // Set the offer to the number of messages remaining to be sent.
- //
- if (offer > 0)
- pn_link_offered(pn_link, offer);
- else {
- pn_link_drained(pn_link);
-
- //
- // If this link is in drain mode and it wasn't last time we came
through here, we need to
- // count this operation as a work event. This will allow the
container to process the
- // connector and send out the flow(drain=true) response to the
receiver.
- //
- if (drain_changed && drain_mode)
- event_count++;
- }
-
- return event_count;
-}
-
static qd_field_iterator_t *router_annotate_message(qd_router_t *router,
qd_parsed_field_t *in_ma,
qd_message_t *msg,
- int *drop,
- const char
*to_override,
- bool
strip_inbound_annotations)
+ bool *drop,
+ bool
strip_inbound_annotations)
{
qd_field_iterator_t *ingress_iter = 0;
@@ -391,15 +141,17 @@ static qd_field_iterator_t
*router_annotate_message(qd_router_t *router,
for (uint32_t idx = 0; idx < count && !done; idx++) {
qd_parsed_field_t *sub = qd_parse_sub_key(in_ma, idx);
- if (!sub) continue;
+ if (!sub)
+ continue;
qd_field_iterator_t *iter = qd_parse_raw(sub);
- if (!iter) continue;
+ if (!iter)
+ continue;
- if (qd_field_iterator_equal(iter, (unsigned char *)QD_MA_TRACE)) {
+ if (qd_field_iterator_equal(iter, (unsigned char*)
QD_MA_TRACE)) {
trace = qd_parse_sub_value(in_ma, idx);
- } else if (qd_field_iterator_equal(iter, (unsigned char
*)QD_MA_INGRESS)) {
+ } else if (qd_field_iterator_equal(iter, (unsigned char*)
QD_MA_INGRESS)) {
ingress = qd_parse_sub_value(in_ma, idx);
- } else if (qd_field_iterator_equal(iter, (unsigned char
*)QD_MA_TO)) {
+ } else if (qd_field_iterator_equal(iter, (unsigned char*)
QD_MA_TO)) {
to = qd_parse_sub_value(in_ma, idx);
}
done = trace && ingress && to;
@@ -436,14 +188,9 @@ static qd_field_iterator_t
*router_annotate_message(qd_router_t *router,
//
// QD_MA_TO:
- // The supplied to override takes precedence over any existing
- // value.
+ // Preserve the existing value.
//
- if (to_override) { // takes precedence over existing value
- qd_composed_field_t *to_field = qd_compose_subfield(0);
- qd_compose_insert_string(to_field, to_override);
- qd_message_set_to_override_annotation(msg, to_field);
- } else if (to) {
+ if (to) {
qd_composed_field_t *to_field = qd_compose_subfield(0);
qd_compose_insert_string_iterator(to_field, qd_parse_raw(to));
qd_message_set_to_override_annotation(msg, to_field);
@@ -475,11 +222,10 @@ static qd_field_iterator_t
*router_annotate_message(qd_router_t *router,
*/
static void router_rx_handler(void* context, qd_link_t *link, pn_delivery_t
*pnd)
{
- qd_router_t *router = (qd_router_t*) context;
- pn_link_t *pn_link = qd_link_pn(link);
- qd_router_link_t *rlink = (qd_router_link_t*) qd_link_get_context(link);
- qd_message_t *msg;
- int valid_message = 0;
+ qd_router_t *router = (qd_router_t*) context;
+ pn_link_t *pn_link = qd_link_pn(link);
+ qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link);
+ qd_message_t *msg;
//
// Receive the message into a local representation. If the returned
message
@@ -508,169 +254,97 @@ static void router_rx_handler(void* context, qd_link_t
*link, pn_delivery_t *pnd
}
//
- // Handle the Link-Routing case.
+ // Handle the link-routed case
//
- sys_mutex_lock(router->lock);
- qd_router_link_t *clink = rlink->connected_link;
- if (clink) {
- //router_link_route_delivery_LH(clink, qd_router_delivery(rlink, pnd),
msg);
- sys_mutex_unlock(router->lock);
+ if (qdr_link_is_routed(rlink)) {
+ // TODO - Add Link-Route Forwarding here
return;
}
//
- // Handle the Message-Routing case. Start by issuing a replacement credit.
+ // Determine if the incoming link is anonymous. If the link is addressed,
+ // there are some optimizations we can take advantage of.
//
- pn_link_flow(pn_link, 1);
+ bool anonymous_link = qdr_link_is_anonymous(rlink);
//
- // Validate the message through the Properties section so we can access
the TO field.
+ // Validate the content of the delivery as an AMQP message. This is done
partially, only
+ // to validate that we can find the fields we need to route the message.
//
- qd_message_t *in_process_copy = 0;
- qd_router_message_cb_t on_message = 0;
- void *on_message_context = 0;
-
- valid_message = qd_message_check(msg, QD_DEPTH_PROPERTIES);
+ // If the link is anonymous, we must validate through the message
properties to find the
+ // 'to' field. If the link is not anonymous, we don't need the 'to' field
as we will be
+ // using the address from the link target.
+ //
+ qd_message_depth_t validation_depth = anonymous_link ?
QD_DEPTH_PROPERTIES : QD_DEPTH_MESSAGE_ANNOTATIONS;
+ bool valid_message = qd_message_check(msg,
validation_depth);
+ qdr_delivery_t *delivery;
if (valid_message) {
- qd_parsed_field_t *in_ma = 0;
- qd_field_iterator_t *iter = 0;
- bool free_iter = true;
- char *to_override = 0;
- bool forwarded = false;
- qd_router_delivery_t *delivery = qd_router_delivery(rlink, pnd);
-
- //
- // Only respect the delivery annotations if the message came from
another router.
- //
- if (rlink->link_type != QD_LINK_WAYPOINT)
- in_ma = qd_message_message_annotations(msg);
+ qd_parsed_field_t *in_ma =
qd_message_message_annotations(msg);
+ bool drop = false;
+ bool strip =
qdr_link_strip_annotations_in(rlink);
+ qd_field_iterator_t *ingress_iter = router_annotate_message(router,
in_ma, msg, &drop, strip);
+
+ if (drop) {
+ qd_message_free(msg);
+ return;
+ }
- //
- // If the message has delivery annotations, get the to-override field
from the annotations.
- //
- if (in_ma) {
- qd_parsed_field_t *ma_to = qd_parse_value_by_key(in_ma, QD_MA_TO);
- if (ma_to) {
- iter = qd_parse_raw(ma_to);
- free_iter = false;
+ if (anonymous_link) {
+ qd_field_iterator_t *addr_iter = 0;
+
+ //
+ // If the message has delivery annotations, get the to-override
field from the annotations.
+ //
+ if (in_ma) {
+ qd_parsed_field_t *ma_to = qd_parse_value_by_key(in_ma,
QD_MA_TO);
+ if (ma_to)
+ addr_iter = qd_field_iterator_dup(qd_parse_raw(ma_to));
}
- }
- //
- // If this is a waypoint link, set the address (and to_override) to
the phased
- // address for the link.
- //
- if (!iter && rlink->waypoint) {
- iter = qd_address_iterator_string(rlink->waypoint->address,
ITER_VIEW_ADDRESS_HASH);
- qd_address_iterator_set_phase(iter, rlink->waypoint->out_phase);
+ //
+ // Still no destination address? Use the TO field from the
message properties.
+ //
+ if (!addr_iter)
+ addr_iter = qd_message_field_iterator(msg, QD_FIELD_TO);
+
+ if (addr_iter)
+ delivery = qdr_link_deliver_to(rlink, msg, ingress_iter,
addr_iter);
+ } else
+ delivery = qdr_link_deliver(rlink, msg, ingress_iter);
+
+ if (delivery) {
+ pn_delivery_set_context(pnd, delivery);
+ qdr_delivery_set_context(delivery, pnd);
+ } else {
+ //
+ // The message is now and will always be unroutable because there
is no address.
+ //
+ pn_delivery_update(pnd, PN_REJECTED);
+ pn_delivery_settle(pnd);
}
//
- // Still no destination address? Use the TO field from the message
properties.
- //
- if (!iter)
- iter = qd_message_field_iterator(msg, QD_FIELD_TO);
-
+ // Rules for delivering messages:
//
- // Handle the case where the TO field is absent and the incoming link
has a target
- // address. Use the target address in the lookup in lieu of a TO
address.
- // Note also that the message must then be annotated with a
TO-OVERRIDE field in
- // the delivery annotations.
+ // For addressed (non-anonymous) links:
+ // to-override must be set (done in the core?)
+ // uses qdr_link_deliver to hand over to the core
//
- // ref: https://issues.apache.org/jira/browse/DISPATCH-1
+ // For anonymous links:
+ // If there's a to-override in the annotations, use that address
+ // Or, use the 'to' field in the message properties
//
- if (!iter && rlink->target) {
- iter = qd_address_iterator_string(rlink->target, ITER_VIEW_ALL);
- to_override = rlink->target;
- }
- if (iter) {
- //
- // Note: This function is going to need to be refactored so we can
put an
- // asynchronous address lookup here. In the event there is
a translation
- // of the address (via namespace), it will have to be done
here after
- // obtaining the iterator and before doing the hash lookup.
- //
- // Note that this lookup is only done for global/mobile
class addresses.
- //
- bool is_local;
- bool is_direct;
- qd_address_t *addr = qd_router_address_lookup_LH(router, iter,
&is_local, &is_direct);
- if (free_iter)
- qd_field_iterator_free(iter);
-
- if (addr) {
- //
- // If the incoming link is an endpoint link, count this as an
ingress delivery.
- //
- if (rlink->link_type == QD_LINK_ENDPOINT)
- addr->deliveries_ingress++;
-
- //
- // TO field is valid and contains a known destination. Handle
the various
- // cases for forwarding.
- //
- // Interpret and update the delivery annotations of the
message. As a convenience,
- // this function returns the iterator to the ingress field (if
it exists). It also
- // returns a 'drop' indication if it detects that the message
will loop.
- //
- int drop = 0;
- qd_field_iterator_t *ingress_iter =
router_annotate_message(router, in_ma, msg, &drop, to_override,
rlink->strip_inbound_annotations);
-
- if (!drop) {
- //
- // Forward a copy of the message to the in-process
endpoint for
- // this address if there is one. The actual invocation of
the
- // handler will occur later after we've released the lock.
- //
- if (addr->on_message) {
- in_process_copy = qd_message_copy(msg);
- on_message = addr->on_message;
- on_message_context = addr->on_message_context;
- addr->deliveries_to_container++;
- }
-
- //
- // If the address form is local (i.e. is prefixed by
_local), don't forward
- // outside of the router process.
- //
- if (!is_local && router->router_mode !=
QD_ROUTER_MODE_ENDPOINT) {
- qd_router_forwarder_t *f = addr->forwarder;
- forwarded = f->forward(f, router, msg, delivery, addr,
ingress_iter, is_direct);
- }
- }
- }
- }
- if (!forwarded) {
- if (on_message)
- // our local in-process handler will accept it:
- qd_router_delivery_free_LH(delivery, PN_ACCEPTED);
- else {
- // no one has accepted it, so inform sender
- qd_router_delivery_set_undeliverable_LH(delivery);
- qd_router_delivery_free_LH(delivery, PN_MODIFIED);
- }
- }
+
} else {
//
- // Message is invalid. Reject the message.
+ // Message is invalid. Reject the message and don't involve the
router core.
//
pn_delivery_update(pnd, PN_REJECTED);
pn_delivery_settle(pnd);
}
-
- sys_mutex_unlock(router->lock);
- qd_message_free(msg);
-
- //
- // Invoke the in-process handler now that the lock is released.
- //
- if (on_message) {
- on_message(on_message_context, in_process_copy, rlink->mask_bit);
- qd_message_free(in_process_copy);
- }
}
@@ -679,150 +353,16 @@ static void router_rx_handler(void* context, qd_link_t
*link, pn_delivery_t *pnd
*/
static void router_disposition_handler(void* context, qd_link_t *link,
pn_delivery_t *pnd)
{
- qd_router_t *router = (qd_router_t*) context;
- qd_router_delivery_t *delivery = (qd_router_delivery_t
*)pn_delivery_get_context(pnd);
- if (!delivery) return;
-
- bool changed = qd_router_delivery_disp_changed(delivery);
- uint64_t disp = qd_router_delivery_disp(delivery);
- bool settled = qd_router_delivery_settled(delivery);
-
- sys_mutex_lock(router->lock);
- qd_router_delivery_t *peer = qd_router_delivery_peer(delivery);
- if (peer) {
- //
- // The case where this delivery has a peer.
- //
- if (changed || settled) {
- qd_router_link_t *peer_link = qd_router_delivery_link(peer);
- qd_routed_event_t *re = new_qd_routed_event_t();
- DEQ_ITEM_INIT(re);
- re->delivery = peer;
- re->message = 0;
- re->settle = settled;
- re->disposition = changed ? disp : 0;
-
- qd_router_delivery_fifo_enter_LH(peer);
- DEQ_INSERT_TAIL(peer_link->event_fifo, re);
- if (settled) {
- qd_router_delivery_unlink_LH(delivery);
- qd_router_delivery_free_LH(delivery, 0);
- }
-
- qd_link_activate(peer_link->link);
- }
- } else if (settled)
- qd_router_delivery_free_LH(delivery, 0);
-
- sys_mutex_unlock(router->lock);
-}
-
-
-typedef struct link_attach_t {
- qd_router_t *router;
- qd_router_link_t *peer_link;
- qd_link_t *peer_qd_link;
- char *link_name;
- qd_direction_t dir;
- qd_connection_t *conn;
- int credit;
-} link_attach_t;
-
-ALLOC_DECLARE(link_attach_t);
-ALLOC_DEFINE(link_attach_t);
+ //qd_router_t *router = (qd_router_t*) context;
+ qdr_delivery_t *delivery = (qdr_delivery_t*) pn_delivery_get_context(pnd);
+ if (!delivery)
+ return;
-#define COND_NAME_LEN 127
-#define COND_DESCRIPTION_LEN 511
-
-typedef struct link_detach_t {
- qd_router_t *router;
- qd_router_link_t *rlink;
- char condition_name[COND_NAME_LEN + 1];
- char condition_description[COND_DESCRIPTION_LEN + 1];
- pn_data_t *condition_info;
-} link_detach_t;
-
-ALLOC_DECLARE(link_detach_t);
-ALLOC_DEFINE(link_detach_t);
-
-
-typedef struct link_event_t {
- qd_router_t *router;
- qd_router_link_t *rlink;
- int credit;
- bool drain;
-} link_event_t;
-
-ALLOC_DECLARE(link_event_t);
-ALLOC_DEFINE(link_event_t);
-
-
-typedef enum {
- LINK_ATTACH_FORWARDED = 1, ///< The attach was forwarded
- LINK_ATTACH_NO_MATCH = 2, ///< No link-route address was found
- LINK_ATTACH_NO_PATH = 3 ///< Link-route exists but there's no
reachable destination
-} link_attach_result_t;
-
-
-static void qd_router_flow(void *context, bool discard)
-{
- link_event_t *le = (link_event_t*) context;
-
- if (!discard) {
- qd_link_t *link = le->rlink->link;
- pn_link_t *pn_link = qd_link_pn(link);
- int delta = le->credit - pn_link_credit(pn_link);
- if (delta > 0) {
- pn_link_flow(pn_link, delta);
- qd_link_activate(link);
- }
- }
-
- free_link_event_t(le);
+ // TODO - hook into the core
}
-qd_router_link_t* qd_router_link(qd_link_t *link, qd_link_type_t link_type,
qd_direction_t direction, qd_address_t *owning_addr, qd_waypoint_t *wp, int
mask_bit)
-{
- qd_router_link_t *rlink = new_qd_router_link_t();
- DEQ_ITEM_INIT(rlink);
- rlink->mask_bit = mask_bit;
- rlink->link_type = link_type;
- rlink->link_direction = direction;
- rlink->owning_addr = owning_addr;
- rlink->waypoint = wp;
- rlink->link = link;
- rlink->connected_link = 0;
- rlink->ref = 0;
- rlink->target = 0;
- rlink->strip_inbound_annotations = false;
- rlink->strip_outbound_annotations = false;
- DEQ_INIT(rlink->event_fifo);
- DEQ_INIT(rlink->msg_fifo);
- DEQ_INIT(rlink->deliveries);
-
- //Get the configuration via the connection's listener.
- qd_connection_t *connection = qd_link_connection(link);
- if (connection) {
- const qd_server_config_t *config = connection->listener ?
- connection->listener->config : connection->connector->config;
-
- if (config) {
- //strip_inbound_annotations and strip_outbound_annotations don't
apply to inter router links.
- if (rlink->link_type != QD_LINK_ROUTER) {
- if (rlink->link_direction == QD_INCOMING) {
- rlink->strip_inbound_annotations =
config->strip_inbound_annotations;
- } else {
- rlink->strip_outbound_annotations =
config->strip_outbound_annotations;
- }
- }
- }
- }
-
- return rlink;
-}
-
/**
* New Incoming Link Handler
*/
@@ -865,7 +405,9 @@ static int router_outgoing_link_handler(void* context,
qd_link_t *link)
static int router_link_attach_handler(void* context, qd_link_t *link)
{
qdr_link_t *qlink = (qdr_link_t*) qd_link_get_context(link);
- qdr_link_second_attach(qlink, qdr_terminus(qd_link_remote_source(link)),
qdr_terminus(qd_link_remote_target(link)));
+ qdr_link_second_attach(qlink,
+ qdr_terminus(qd_link_remote_source(link)),
+ qdr_terminus(qd_link_remote_target(link)));
return 0;
}
@@ -876,42 +418,12 @@ static int router_link_attach_handler(void* context,
qd_link_t *link)
*/
static int router_link_flow_handler(void* context, qd_link_t *link)
{
- qd_router_t *router = (qd_router_t*) context;
- qd_router_link_t *rlink = (qd_router_link_t*)
qd_link_get_context(link);
- pn_link_t *pn_link = qd_link_pn(link);
+ //qd_router_t *router = (qd_router_t*) context;
+ qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link);
if (!rlink)
return 0;
- sys_mutex_lock(router->lock);
- qd_router_link_t *peer_rlink = rlink->connected_link;
-
- if (peer_rlink) {
- qd_connection_t *out_conn = qd_link_connection(peer_rlink->link);
- if (out_conn) {
- if (rlink->link_direction == QD_OUTGOING) {
- //
- // Outgoing link handling
- //
- int credit = pn_link_remote_credit(pn_link) -
DEQ_SIZE(rlink->msg_fifo);
- if (credit > 0) {
- link_event_t *le = new_link_event_t();
- memset(le, 0, sizeof(link_event_t));
- le->router = router;
- le->rlink = peer_rlink;
- le->credit = credit;
- le->drain = false;
- qd_connection_invoke_deferred(out_conn, qd_router_flow,
le);
- }
- } else {
- //
- // Incoming link handling
- //
- }
- }
- }
-
- sys_mutex_unlock(router->lock);
return 0;
}
@@ -935,28 +447,35 @@ static int router_link_detach_handler(void* context,
qd_link_t *link, qd_detach_
}
-static int router_inbound_opened_handler(void *type_context, qd_connection_t
*conn, void *context)
+static void router_opened_handler(qd_router_t *router, qd_connection_t *conn,
bool inbound)
{
- qd_router_t *router = (qd_router_t*) type_context;
- qdr_connection_role_t role = qd_router_connection_role(conn);
- qdr_connection_t *qdrc = qdr_connection_opened(router->router_core,
true, role, 0); // TODO - get label
+ qdr_connection_role_t role;
+ bool strip_annotations_in;
+ bool strip_annotations_out;
+ const char *label;
+
+ qd_router_connection_get_config(conn, &role, &label,
&strip_annotations_in, &strip_annotations_out);
+
+ qdr_connection_t *qdrc = qdr_connection_opened(router->router_core,
inbound, role, label,
+ strip_annotations_in,
strip_annotations_out);
qd_connection_set_context(conn, qdrc);
qdr_connection_set_context(qdrc, conn);
+}
+
+static int router_inbound_opened_handler(void *type_context, qd_connection_t
*conn, void *context)
+{
+ qd_router_t *router = (qd_router_t*) type_context;
+ router_opened_handler(router, conn, true);
return 0;
}
static int router_outbound_opened_handler(void *type_context, qd_connection_t
*conn, void *context)
{
- qd_router_t *router = (qd_router_t*) type_context;
- qdr_connection_role_t role = qd_router_connection_role(conn);
- qdr_connection_t *qdrc = qdr_connection_opened(router->router_core,
false, role, 0); // TODO - get label
-
- qd_connection_set_context(conn, qdrc);
- qdr_connection_set_context(qdrc, conn);
-
+ qd_router_t *router = (qd_router_t*) type_context;
+ router_opened_handler(router, conn, false);
return 0;
}
@@ -1035,8 +554,6 @@ qd_router_t *qd_router(qd_dispatch_t *qd, qd_router_mode_t
mode, const char *are
DEQ_INIT(router->addrs);
router->addr_hash = qd_hash(10, 32, 0);
- DEQ_INIT(router->links);
- DEQ_INIT(router->routers);
DEQ_INIT(router->lrp_containers);
router->out_links_by_mask_bit = NEW_PTR_ARRAY(qd_router_link_t,
qd_bitmask_width());
@@ -1200,101 +717,3 @@ qdr_core_t *qd_router_core(qd_dispatch_t *qd)
return qd->router->router_core;
}
-
-qd_address_t *qd_router_address_lookup_LH(qd_router_t *router,
- qd_field_iterator_t *addr_iter,
- bool *is_local, bool *is_direct)
-{
- qd_address_t *addr = 0;
- qd_address_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_HASH);
- qd_hash_retrieve(router->addr_hash, addr_iter, (void*) &addr);
- qd_address_iterator_reset_view(addr_iter, ITER_VIEW_NO_HOST);
- *is_local = (bool) qd_field_iterator_prefix(addr_iter, local_prefix);
- *is_direct = (bool) qd_field_iterator_prefix(addr_iter, direct_prefix);
- return addr;
-}
-
-
-void qd_router_send(qd_dispatch_t *qd,
- qd_field_iterator_t *address,
- qd_message_t *msg)
-{
-
- qd_router_t *router = qd->router;
- qd_address_t *addr;
-
- qd_address_iterator_reset_view(address, ITER_VIEW_ADDRESS_HASH);
- sys_mutex_lock(router->lock);
- qd_hash_retrieve(router->addr_hash, address, (void*) &addr);
-
- if (addr) {
- //
- // Forward to all of the local links receiving this address.
- //
- addr->deliveries_from_container++;
- qd_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks);
- while (dest_link_ref) {
- qd_routed_event_t *re = new_qd_routed_event_t();
- DEQ_ITEM_INIT(re);
- re->delivery = 0;
- re->message = qd_message_copy(msg);
- re->settle = 0;
- re->disposition = 0;
- DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re);
-
- qd_link_activate(dest_link_ref->link->link);
- addr->deliveries_egress++;
-
- dest_link_ref = DEQ_NEXT(dest_link_ref);
- }
-
- //
- // Forward to the next-hops for remote destinations.
- //
- qd_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes);
- qd_router_link_t *dest_link;
- qd_bitmask_t *link_set = qd_bitmask(0);
-
- while (dest_node_ref) {
- if (dest_node_ref->router->next_hop)
- dest_link = dest_node_ref->router->next_hop->peer_link;
- else
- dest_link = dest_node_ref->router->peer_link;
- if (dest_link)
- qd_bitmask_set_bit(link_set, dest_link->mask_bit);
- dest_node_ref = DEQ_NEXT(dest_node_ref);
- }
-
- int link_bit;
- while (qd_bitmask_first_set(link_set, &link_bit)) {
- qd_bitmask_clear_bit(link_set, link_bit);
- dest_link = router->out_links_by_mask_bit[link_bit];
- if (dest_link) {
- qd_routed_event_t *re = new_qd_routed_event_t();
- DEQ_ITEM_INIT(re);
- re->delivery = 0;
- re->message = qd_message_copy(msg);
- re->settle = 0;
- re->disposition = 0;
- DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
- qd_link_activate(dest_link->link);
- addr->deliveries_transit++;
- }
- }
-
- qd_bitmask_free(link_set);
- }
- sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher?
-}
-
-
-void qd_router_send2(qd_dispatch_t *qd,
- const char *address,
- qd_message_t *msg)
-{
- if (address && msg) {
- qd_field_iterator_t *iter = qd_address_iterator_string(address,
ITER_VIEW_ADDRESS_HASH);
- qd_router_send(qd, iter, msg);
- qd_field_iterator_free(iter);
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0ba77e3d/src/router_private.h
----------------------------------------------------------------------
diff --git a/src/router_private.h b/src/router_private.h
index 4bd0467..f1e9749 100644
--- a/src/router_private.h
+++ b/src/router_private.h
@@ -68,27 +68,6 @@ DEQ_DECLARE(qd_routed_event_t, qd_routed_event_list_t);
DEQ_DECLARE(qd_router_delivery_t, qd_router_delivery_list_t);
-struct qd_router_link_t {
- DEQ_LINKS(qd_router_link_t);
- int mask_bit; ///< Unique mask bit if this is
an inter-router link
- qd_link_type_t link_type;
- qd_direction_t link_direction;
- qd_address_t *owning_addr; ///< [ref] Address record that
owns this link
- qd_waypoint_t *waypoint; ///< [ref] Waypoint that owns
this link
- qd_link_t *link; ///< [own] Link pointer
- qd_router_link_t *connected_link; ///< [ref] If this is a
link-route, reference the connected link
- qd_router_link_ref_t *ref; ///< Pointer to a containing
reference object
- char *target; ///< Target address for
incoming links
- qd_routed_event_list_t event_fifo; ///< FIFO of outgoing
delivery/link events (no messages)
- qd_routed_event_list_t msg_fifo; ///< FIFO of incoming or
outgoing message deliveries
- qd_router_delivery_list_t deliveries; ///< [own] outstanding
unsettled deliveries
- bool strip_inbound_annotations; ///<should the
dispatch specific inbound annotations be stripped at the ingress router
- bool strip_outbound_annotations; ///<should the
dispatch specific outbound annotations be stripped at the egress router
-};
-
-ALLOC_DECLARE(qd_router_link_t);
-DEQ_DECLARE(qd_router_link_t, qd_router_link_list_t);
-void qd_router_link_free_LH(qd_router_link_t *);
struct qd_router_node_t {
DEQ_LINKS(qd_router_node_t);
@@ -252,8 +231,6 @@ struct qd_router_t {
qd_address_t *routerma_addr;
qd_address_t *hello_addr;
- qd_router_link_list_t links;
- qd_router_node_list_t routers;
qd_lrp_container_list_t lrp_containers;
qd_router_link_t **out_links_by_mask_bit;
qd_router_node_t **routers_by_mask_bit;
@@ -283,35 +260,4 @@ qd_address_t *qd_router_address_lookup_LH(qd_router_t
*router,
qd_field_iterator_t *addr_iter,
bool *is_local, bool *is_direct);
-/**
- * Important: qd_router_link_new_delivery must never be called twice in a row
- * without an intervening pn_link_advance. The Disatch architecture
- * provides a hook for discovering when an outgoing link is writable
- * and has credit. When a link is writable, a delivery is
- * allocated, written, and advanced in one operation. If a backlog
- * of pending deliveries is created, an assertion will be thrown.
- */
-qd_router_delivery_t *qd_router_link_new_delivery(qd_router_link_t *link,
pn_delivery_tag_t tag);
-qd_router_delivery_t *qd_router_delivery(qd_router_link_t *link, pn_delivery_t
*pnd);
-void qd_router_delivery_set_undeliverable_LH(qd_router_delivery_t *delivery);
-void qd_router_delivery_free_LH(qd_router_delivery_t *delivery, uint64_t
final_disposition);
-void qd_router_delivery_link_peers_LH(qd_router_delivery_t *left,
qd_router_delivery_t *right);
-void qd_router_delivery_unlink_LH(qd_router_delivery_t *delivery);
-void qd_router_delivery_fifo_enter_LH(qd_router_delivery_t *delivery);
-bool qd_router_delivery_fifo_exit_LH(qd_router_delivery_t *delivery);
-qd_router_delivery_t *qd_router_delivery_peer(qd_router_delivery_t *delivery);
-void qd_router_delivery_set_context(qd_router_delivery_t *delivery, void
*context);
-void *qd_router_delivery_context(qd_router_delivery_t *delivery);
-pn_delivery_t *qd_router_delivery_pn(qd_router_delivery_t *delivery);
-void qd_router_delivery_settle(qd_router_delivery_t *delivery);
-bool qd_router_delivery_settled(qd_router_delivery_t *delivery);
-bool qd_router_delivery_disp_changed(qd_router_delivery_t *delivery);
-uint64_t qd_router_delivery_disp(qd_router_delivery_t *delivery);
-qd_router_link_t *qd_router_delivery_link(qd_router_delivery_t *delivery);
-
-/**
- * Instanciates, initializes and returns a pointer to qd_router_link_t.
- */
-qd_router_link_t* qd_router_link(qd_link_t *link, qd_link_type_t link_type,
qd_direction_t direction, qd_address_t *owning_addr, qd_waypoint_t *wp, int
mask_bit);
-
#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0ba77e3d/src/waypoint.c
----------------------------------------------------------------------
diff --git a/src/waypoint.c b/src/waypoint.c
index fb97311..bfff845 100644
--- a/src/waypoint.c
+++ b/src/waypoint.c
@@ -85,7 +85,7 @@ static void qd_waypoint_visit_sink_LH(qd_dispatch_t *qd,
qd_waypoint_t *wp)
wp->out_link = qd_link(router->node, wp->connection, QD_OUTGOING,
wp->address);
pn_terminus_set_address(qd_link_target(wp->out_link), wp->address);
- qd_router_link_t *rlink = qd_router_link(wp->out_link,
QD_LINK_WAYPOINT, QD_OUTGOING, addr, wp, 0);
+ qd_router_link_t *rlink = 0; //qd_router_link(wp->out_link,
QD_LINK_WAYPOINT, QD_OUTGOING, addr, wp, 0);
qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
DEQ_INSERT_TAIL(router->links, rlink);
@@ -148,7 +148,7 @@ static void qd_waypoint_visit_source_LH(qd_dispatch_t *qd,
qd_waypoint_t *wp)
wp->in_link = qd_link(router->node, wp->connection, QD_INCOMING,
wp->address);
pn_terminus_set_address(qd_link_source(wp->in_link), wp->address);
- qd_router_link_t *rlink = qd_router_link(wp->in_link,
QD_LINK_WAYPOINT, QD_INCOMING, addr, wp, 0);
+ qd_router_link_t *rlink = 0; //qd_router_link(wp->in_link,
QD_LINK_WAYPOINT, QD_INCOMING, addr, wp, 0);
qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
DEQ_INSERT_TAIL(router->links, rlink);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]