Author: tross
Date: Fri Apr 4 20:21:18 2014
New Revision: 1584887
URL: http://svn.apache.org/r1584887
Log:
DISPATCH-34 - Added configurable waypoints, integrated them with on-demand
connectors, and validated broker integration.
Added:
qpid/dispatch/trunk/src/waypoint.c (with props)
qpid/dispatch/trunk/src/waypoint_private.h (with props)
Modified:
qpid/dispatch/trunk/CMakeLists.txt
qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h
qpid/dispatch/trunk/include/qpid/dispatch/container.h
qpid/dispatch/trunk/python/qpid_dispatch_internal/config/schema.py
qpid/dispatch/trunk/router/src/main.c
qpid/dispatch/trunk/src/connection_manager.c
qpid/dispatch/trunk/src/container.c
qpid/dispatch/trunk/src/dispatch.c
qpid/dispatch/trunk/src/router_agent.c
qpid/dispatch/trunk/src/router_config.c
qpid/dispatch/trunk/src/router_node.c
qpid/dispatch/trunk/src/router_private.h
qpid/dispatch/trunk/src/router_pynode.c
qpid/dispatch/trunk/tools/qdstat.in
Modified: qpid/dispatch/trunk/CMakeLists.txt
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/CMakeLists.txt?rev=1584887&r1=1584886&r2=1584887&view=diff
==============================================================================
--- qpid/dispatch/trunk/CMakeLists.txt (original)
+++ qpid/dispatch/trunk/CMakeLists.txt Fri Apr 4 20:21:18 2014
@@ -131,6 +131,7 @@ set(server_SOURCES
src/router_pynode.c
src/server.c
src/timer.c
+ src/waypoint.c
src/work_queue.c
)
Modified: qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h?rev=1584887&r1=1584886&r2=1584887&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h Fri Apr 4
20:21:18 2014
@@ -60,13 +60,13 @@ void qd_connection_manager_start(qd_disp
/**
- * \brief Given a connector-tag, find and return a pointer to the on-demand
connector.
+ * \brief Given a connector-name, find and return a pointer to the on-demand
connector.
*
* @param qd The dispatch handle returned by qd_dispatch.
- * @param tag The tag that uniquely identifies the on-demand connector.
- * @return The matching on-demand connector or NULL if the tag is not found.
+ * @param name The name that uniquely identifies the on-demand connector.
+ * @return The matching on-demand connector or NULL if the name is not found.
*/
-qd_config_connector_t *qd_connection_manager_find_on_demand(qd_dispatch_t *qd,
const char *tag);
+qd_config_connector_t *qd_connection_manager_find_on_demand(qd_dispatch_t *qd,
const char *name);
/**
@@ -74,7 +74,7 @@ qd_config_connector_t *qd_connection_man
*
* @param od The pointer to an on-demand connector returned by
qd_connections_find_on_demand.
*/
-void qd_connection_manager_start_on_demand(qd_config_connector_t *od);
+void qd_connection_manager_start_on_demand(qd_dispatch_t *qd,
qd_config_connector_t *cc);
/**
@@ -82,6 +82,27 @@ void qd_connection_manager_start_on_dema
*
* @param od The pointer to an on-demand connector returned by
qd_connections_find_on_demand.
*/
-void qd_connection_manager_stop_on_demand(qd_config_connector_t *od);
+void qd_connection_manager_stop_on_demand(qd_dispatch_t *qd,
qd_config_connector_t *cc);
+
+
+/**
+ * \brief Get the user context for a configured connector.
+ *
+ * @param cc Connector handle returned by qd_connection_manager_find_on_demand
+ * @return User context for the configured connector.
+ */
+void *qd_config_connector_context(qd_config_connector_t *cc);
+
+
+/**
+ * \brief Set the user context for a configured connector.
+ *
+ * @param cc Connector handle returned by qd_connection_manager_find_on_demand
+ * @param context User context to be stored with the configured connector
+ */
+void qd_config_connector_set_context(qd_config_connector_t *cc, void *context);
+
+
+const char *qd_config_connector_name(qd_config_connector_t *cc);
#endif
Modified: qpid/dispatch/trunk/include/qpid/dispatch/container.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/container.h?rev=1584887&r1=1584886&r2=1584887&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/container.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/container.h Fri Apr 4 20:21:18
2014
@@ -59,7 +59,7 @@ typedef void (*qd_container_delivery_han
typedef int (*qd_container_link_handler_t) (void *node_context,
qd_link_t *link);
typedef int (*qd_container_link_detach_handler_t) (void *node_context,
qd_link_t *link, int closed);
typedef void (*qd_container_node_handler_t) (void *type_context,
qd_node_t *node);
-typedef void (*qd_container_conn_handler_t) (void *type_context,
qd_connection_t *conn);
+typedef void (*qd_container_conn_handler_t) (void *type_context,
qd_connection_t *conn, void *context);
typedef struct {
char *type_name;
Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/config/schema.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/config/schema.py?rev=1584887&r1=1584886&r2=1584887&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/config/schema.py
(original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/config/schema.py Fri Apr
4 20:21:18 2014
@@ -58,7 +58,7 @@ config_schema = {
'addr' : (str, 0, 'M', None, None),
'port' : (str, 1, 'M', None, None),
'label' : (str, None, '', None, None),
- 'role' : (str, None, '', 'normal', ['normal',
'inter-router', 'on-demand']),
+ 'role' : (str, None, '', 'normal', ['normal',
'inter-router']),
'sasl-mechanisms' : (str, None, 'M', None, None),
'ssl-profile' : (str, None, 'E', None, None),
'require-peer-auth' : (bool, None, '', True, None),
@@ -69,8 +69,9 @@ config_schema = {
'connector' : (False, {
'addr' : (str, 0, 'M', None, None),
'port' : (str, 1, 'M', None, None),
+ 'name' : (str, None, '', None, None),
'label' : (str, None, '', None, None),
- 'role' : (str, None, '', 'normal', ['normal',
'inter-router']),
+ 'role' : (str, None, '', 'normal', ['normal',
'inter-router', 'on-demand']),
'sasl-mechanisms' : (str, None, 'M', None, None),
'ssl-profile' : (str, None, 'E', None, None),
'allow-redirect' : (bool, None, '', True, None),
@@ -101,7 +102,7 @@ def validate_roles(config):
count = config.item_count(item)
for idx in range(count):
role = config.value_string(item, idx, 'role')
- if role != 'normal':
+ if role == 'inter-router':
addr = config.value_string(item, idx, 'addr')
port = config.value_string(item, idx, 'port')
raise Exception("Role '%s' for %s %s:%s only permitted with
'interior' mode" %
Modified: qpid/dispatch/trunk/router/src/main.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/router/src/main.c?rev=1584887&r1=1584886&r2=1584887&view=diff
==============================================================================
--- qpid/dispatch/trunk/router/src/main.c (original)
+++ qpid/dispatch/trunk/router/src/main.c Fri Apr 4 20:21:18 2014
@@ -37,8 +37,8 @@ static const char *app_config =
" 'bias' : (str, None, '', 'closest', ['closest', 'spread'])})\n"
"config_schema['waypoint'] = (False, {\n"
" 'name' : (str, 0, 'M', None, None),\n"
- " 'in-phase' : (int, None, 'M', None, None),\n"
- " 'out-phase' : (int, None, 'M', None, None),\n"
+ " 'in-phase' : (int, None, '', -1, None),\n"
+ " 'out-phase' : (int, None, '', -1, None),\n"
" 'connector' : (str, None, 'M', None, None)})\n";
Modified: qpid/dispatch/trunk/src/connection_manager.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/connection_manager.c?rev=1584887&r1=1584886&r2=1584887&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/connection_manager.c (original)
+++ qpid/dispatch/trunk/src/connection_manager.c Fri Apr 4 20:21:18 2014
@@ -39,9 +39,10 @@ DEQ_DECLARE(qd_config_listener_t, qd_con
struct qd_config_connector_t {
DEQ_LINKS(qd_config_connector_t);
+ void *context;
+ const char *connector_name;
qd_connector_t *connector;
qd_server_config_t configuration;
- const char *tag;
bool started;
};
@@ -123,17 +124,23 @@ static void configure_connectors(qd_disp
count = qd_config_item_count(qd, CONF_CONNECTOR);
for (int i = 0; i < count; i++) {
qd_config_connector_t *cc = NEW(qd_config_connector_t);
- cc->connector = 0;
- cc->tag = 0;
- cc->started = false;
+ cc->context = 0;
+ cc->connector = 0;
+ cc->connector_name = 0;
+ cc->started = false;
load_server_config(qd, &cc->configuration, CONF_CONNECTOR, i);
DEQ_ITEM_INIT(cc);
if (strcmp(cc->configuration.role, "on-demand") == 0) {
+ cc->connector_name =
+ qd_config_item_value_string(qd, CONF_CONNECTOR, i, "name");
DEQ_INSERT_TAIL(cm->on_demand_connectors, cc);
- } else
+ qd_log(cm->log_source, QD_LOG_INFO, "Configured on-demand
connector: %s:%s name=%s",
+ cc->configuration.host, cc->configuration.port,
cc->connector_name);
+ } else {
DEQ_INSERT_TAIL(cm->config_connectors, cc);
- qd_log(cm->log_source, QD_LOG_INFO, "Configured Connector: %s:%s
role=%s",
- cc->configuration.host, cc->configuration.port,
cc->configuration.role);
+ qd_log(cm->log_source, QD_LOG_INFO, "Configured Connector: %s:%s
role=%s",
+ cc->configuration.host, cc->configuration.port,
cc->configuration.role);
+ }
}
}
@@ -185,21 +192,32 @@ void qd_connection_manager_start(qd_disp
}
-qd_config_connector_t *qd_connection_manager_find_on_demand(qd_dispatch_t *qd,
const char *tag)
+qd_config_connector_t *qd_connection_manager_find_on_demand(qd_dispatch_t *qd,
const char *name)
{
- return 0;
+ qd_config_connector_t *cc =
DEQ_HEAD(qd->connection_manager->on_demand_connectors);
+
+ while (cc) {
+ if (strcmp(cc->connector_name, name) == 0)
+ break;
+ cc = DEQ_NEXT(cc);
+ }
+
+ return cc;
}
-void qd_connection_manager_start_on_demand(qd_config_connector_t *od)
+void qd_connection_manager_start_on_demand(qd_dispatch_t *qd,
qd_config_connector_t *cc)
{
+ if (cc && cc->connector == 0)
+ cc->connector = qd_server_connect(qd, &cc->configuration, cc);
}
-void qd_connection_manager_stop_on_demand(qd_config_connector_t *od)
+void qd_connection_manager_stop_on_demand(qd_dispatch_t *qd,
qd_config_connector_t *cc)
{
}
+
static void server_schema_handler(void *context, void *cor)
{
qd_agent_value_string(cor, 0, "state");
@@ -269,3 +287,23 @@ void qd_connection_manager_setup_agent(q
{
qd_agent_register_class(qd, "org.apache.qpid.dispatch.connection", qd,
server_schema_handler, server_query_handler);
}
+
+
+void *qd_config_connector_context(qd_config_connector_t *cc)
+{
+ return cc ? cc->context : 0;
+}
+
+
+void qd_config_connector_set_context(qd_config_connector_t *cc, void *context)
+{
+ if (cc)
+ cc->context = context;
+}
+
+
+const char *qd_config_connector_name(qd_config_connector_t *cc)
+{
+ return cc ? cc->connector_name : 0;
+}
+
Modified: qpid/dispatch/trunk/src/container.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/container.c?rev=1584887&r1=1584886&r2=1584887&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/container.c (original)
+++ qpid/dispatch/trunk/src/container.c Fri Apr 4 20:21:18 2014
@@ -379,7 +379,7 @@ static int process_handler(qd_container_
}
-static void open_handler(qd_container_t *container, qd_connection_t *conn,
qd_direction_t dir)
+static void open_handler(qd_container_t *container, qd_connection_t *conn,
qd_direction_t dir, void *context)
{
const qd_node_type_t *nt;
@@ -398,10 +398,10 @@ static void open_handler(qd_container_t
nt = nt_item->ntype;
if (dir == QD_INCOMING) {
if (nt->inbound_conn_open_handler)
- nt->inbound_conn_open_handler(nt->type_context, conn);
+ nt->inbound_conn_open_handler(nt->type_context, conn, context);
} else {
if (nt->outbound_conn_open_handler)
- nt->outbound_conn_open_handler(nt->type_context, conn);
+ nt->outbound_conn_open_handler(nt->type_context, conn,
context);
}
sys_mutex_lock(container->lock);
@@ -417,8 +417,8 @@ static int handler(void *handler_context
pn_connection_t *conn = qd_connection_pn(qd_conn);
switch (event) {
- case QD_CONN_EVENT_LISTENER_OPEN: open_handler(container, qd_conn,
QD_INCOMING); break;
- case QD_CONN_EVENT_CONNECTOR_OPEN: open_handler(container, qd_conn,
QD_OUTGOING); break;
+ case QD_CONN_EVENT_LISTENER_OPEN: open_handler(container, qd_conn,
QD_INCOMING, conn_context); break;
+ case QD_CONN_EVENT_CONNECTOR_OPEN: open_handler(container, qd_conn,
QD_OUTGOING, conn_context); break;
case QD_CONN_EVENT_CLOSE: return close_handler(conn_context,
conn);
case QD_CONN_EVENT_PROCESS: return process_handler(container,
conn_context, conn);
}
Modified: qpid/dispatch/trunk/src/dispatch.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/dispatch.c?rev=1584887&r1=1584886&r2=1584887&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/dispatch.c (original)
+++ qpid/dispatch/trunk/src/dispatch.c Fri Apr 4 20:21:18 2014
@@ -25,6 +25,7 @@
#include "alloc_private.h"
#include "log_private.h"
#include "router_private.h"
+#include "waypoint_private.h"
/**
* Private Function Prototypes
@@ -157,5 +158,6 @@ void qd_dispatch_post_configure_connecti
{
qd_connection_manager_configure(qd);
qd_connection_manager_start(qd);
+ qd_waypoint_activate_all(qd);
}
Modified: qpid/dispatch/trunk/src/router_agent.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_agent.c?rev=1584887&r1=1584886&r2=1584887&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_agent.c (original)
+++ qpid/dispatch/trunk/src/router_agent.c Fri Apr 4 20:21:18 2014
@@ -127,6 +127,7 @@ static void qd_router_query_link(qd_rout
qd_agent_value_uint(cor, "index", link->mask_bit);
switch (link->link_type) {
case QD_LINK_ENDPOINT: link_type = "endpoint"; break;
+ case QD_LINK_WAYPOINT: link_type = "waypoint"; break;
case QD_LINK_ROUTER: link_type = "inter-router"; break;
case QD_LINK_AREA: link_type = "inter-area"; break;
}
Modified: qpid/dispatch/trunk/src/router_config.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_config.c?rev=1584887&r1=1584886&r2=1584887&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_config.c (original)
+++ qpid/dispatch/trunk/src/router_config.c Fri Apr 4 20:21:18 2014
@@ -110,28 +110,26 @@ static void qd_router_configure_waypoint
const char *name = qd_config_item_value_string(router->qd,
CONF_WAYPOINT, idx, "name");
int in_phase = qd_config_item_value_int(router->qd,
CONF_WAYPOINT, idx, "in-phase");
int out_phase = qd_config_item_value_int(router->qd,
CONF_WAYPOINT, idx, "out-phase");
- //const char *connector = qd_config_item_value_string(router->qd,
CONF_WAYPOINT, idx, "connector");
+ const char *connector = qd_config_item_value_string(router->qd,
CONF_WAYPOINT, idx, "connector");
- if (in_phase < 0 || in_phase > 9 || out_phase < 0 || out_phase > 9) {
+ if (in_phase > 9 || out_phase > 9) {
qd_log(router->log_source, QD_LOG_ERROR, "Phases for waypoint '%s'
must be between 0 and 9. Ignoring", name);
continue;
}
- qd_config_waypoint_t *waypoint = NEW(qd_config_waypoint_t);
+ qd_waypoint_t *waypoint = NEW(qd_waypoint_t);
+ memset(waypoint, 0, sizeof(qd_waypoint_t));
DEQ_ITEM_INIT(waypoint);
- waypoint->name = (char*) malloc(strlen(name) + 1);
- strcpy(waypoint->name, name);
- waypoint->in_phase = (char) in_phase + '0';
- waypoint->out_phase = (char) out_phase + '0';
- waypoint->connector = 0;
- waypoint->in_link = 0;
- waypoint->out_link = 0;
+ waypoint->name = name;
+ waypoint->in_phase = in_phase >= 0 ? (char) in_phase + '0' :
'\0';
+ waypoint->out_phase = out_phase >= 0 ? (char) out_phase + '0' :
'\0';
+ waypoint->connector_name = connector;
//
// TODO - Look up connector
//
- DEQ_INSERT_TAIL(router->config_waypoints, waypoint);
+ DEQ_INSERT_TAIL(router->waypoints, waypoint);
qd_log(router->log_source, QD_LOG_INFO, "Configured Waypoint: name=%s
in_phase=%d out_phase=%d",
name, in_phase, out_phase);
Modified: qpid/dispatch/trunk/src/router_node.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1584887&r1=1584886&r2=1584887&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Fri Apr 4 20:21:18 2014
@@ -25,6 +25,7 @@
#include <qpid/dispatch.h>
#include "dispatch_private.h"
#include "router_private.h"
+#include "waypoint_private.h"
static char *router_role = "inter-router";
static char *on_demand_role = "on-demand";
@@ -122,7 +123,9 @@ void qd_router_check_addr(qd_router_t *r
//
// If the address has no handlers or destinations, it should be deleted.
//
- if (addr->handler == 0 && DEQ_SIZE(addr->rlinks) == 0 &&
DEQ_SIZE(addr->rnodes) == 0)
+ if (addr->handler == 0 &&
+ DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->rnodes) == 0 &&
+ !addr->waypoint)
to_delete = 1;
//
@@ -435,7 +438,7 @@ static qd_field_iterator_t *router_annot
qd_parsed_field_t *ingress = 0;
if (in_da) {
- trace = qd_parse_value_by_key(in_da, QD_DA_TRACE);
+ trace = qd_parse_value_by_key(in_da, QD_DA_TRACE);
ingress = qd_parse_value_by_key(in_da, QD_DA_INGRESS);
}
@@ -704,7 +707,7 @@ static void router_rx_handler(void* cont
valid_message = qd_message_check(msg, QD_DEPTH_PROPERTIES);
if (valid_message) {
- qd_parsed_field_t *in_da = qd_message_delivery_annotations(msg);
+ qd_parsed_field_t *in_da = 0;
qd_field_iterator_t *iter = 0;
bool free_iter = true;
qd_address_t *addr;
@@ -712,6 +715,12 @@ static void router_rx_handler(void* cont
char *to_override = 0;
//
+ // Only respect the delivery annotations if the message came from
another router.
+ //
+ if (rlink->link_type != QD_LINK_WAYPOINT)
+ in_da = qd_message_delivery_annotations(msg);
+
+ //
// If the message has delivery annotations, get the to-override field
from the annotations.
//
if (in_da) {
@@ -723,8 +732,16 @@ static void router_rx_handler(void* cont
}
//
- // If there was no to-override field, use the TO field from the
- // message properties.
+ // If this is a waypoint link, set the address (and to_override) to
the phased
+ // address for the link.
+ //
+ if (!iter && rlink->waypoint) {
+ iter = qd_field_iterator_string(rlink->waypoint->name,
ITER_VIEW_ADDRESS_HASH);
+ qd_field_iterator_set_phase(iter, rlink->waypoint->out_phase);
+ }
+
+ //
+ // Still no destination address? Use the TO field from the message
properties.
//
if (!iter)
iter = qd_message_field_iterator(msg, QD_FIELD_TO);
@@ -938,6 +955,7 @@ static int router_incoming_link_handler(
rlink->link_type = is_router ? QD_LINK_ROUTER : QD_LINK_ENDPOINT;
rlink->link_direction = QD_INCOMING;
rlink->owning_addr = 0;
+ rlink->waypoint = 0;
rlink->link = link;
rlink->connected_link = 0;
rlink->peer_link = 0;
@@ -983,9 +1001,10 @@ static int router_outgoing_link_handler(
int is_dynamic = pn_terminus_is_dynamic(qd_link_remote_source(link));
int is_router =
qd_router_terminus_is_router(qd_link_remote_target(link));
int propagate = 0;
- qd_field_iterator_t *iter = 0;
+ qd_field_iterator_t *iter = 0;
char phase = '0';
qd_address_semantics_t semantics;
+ qd_address_t *addr = 0;
if (is_router &&
!qd_router_connection_is_inter_router(qd_link_connection(link))) {
qd_log(router->log_source, QD_LOG_WARNING, "Outgoing link claims
router capability but is not on an inter-router connection");
@@ -1029,6 +1048,7 @@ static int router_outgoing_link_handler(
rlink->link_type = is_router ? QD_LINK_ROUTER : QD_LINK_ENDPOINT;
rlink->link_direction = QD_OUTGOING;
rlink->owning_addr = 0;
+ rlink->waypoint = 0;
rlink->link = link;
rlink->connected_link = 0;
rlink->peer_link = 0;
@@ -1069,8 +1089,7 @@ static int router_outgoing_link_handler(
// assign it an ephemeral and routable address. If it has a
non-dynamic
// address, that address needs to be set up in the address list.
//
- char temp_addr[1000]; // FIXME
- qd_address_t *addr;
+ char temp_addr[1000]; // FIXME
if (is_dynamic) {
qd_router_generate_temp_addr(router, temp_addr, 1000);
@@ -1104,6 +1123,14 @@ static int router_outgoing_link_handler(
}
DEQ_INSERT_TAIL(router->links, rlink);
+
+ //
+ // If an interesting change has occurred with this address and it has an
associated waypoint,
+ // notify the waypoint module so it can react appropriately.
+ //
+ if (propagate && addr->waypoint)
+ qd_waypoint_address_updated_LH(router->qd, addr);
+
sys_mutex_unlock(router->lock);
if (propagate)
@@ -1191,12 +1218,12 @@ static int router_link_detach_handler(vo
}
-static void router_inbound_open_handler(void *type_context, qd_connection_t
*conn)
+static void router_inbound_open_handler(void *type_context, qd_connection_t
*conn, void *context)
{
}
-static void router_outbound_open_handler(void *type_context, qd_connection_t
*conn)
+static void router_outbound_open_handler(void *type_context, qd_connection_t
*conn, void *context)
{
qd_router_t *router = (qd_router_t*) type_context;
@@ -1205,6 +1232,7 @@ static void router_outbound_open_handler
// connection to arrive.
//
if (qd_router_connection_is_on_demand(conn)) {
+ qd_waypoint_connection_opened(router->qd, (qd_config_connector_t*)
context, conn);
return;
}
@@ -1246,6 +1274,7 @@ static void router_outbound_open_handler
rlink->link_type = QD_LINK_ROUTER;
rlink->link_direction = QD_INCOMING;
rlink->owning_addr = 0;
+ rlink->waypoint = 0;
rlink->link = receiver;
rlink->connected_link = 0;
rlink->peer_link = 0;
@@ -1271,6 +1300,7 @@ static void router_outbound_open_handler
rlink->link_type = QD_LINK_ROUTER;
rlink->link_direction = QD_OUTGOING;
rlink->owning_addr = router->hello_addr;
+ rlink->waypoint = 0;
rlink->link = sender;
rlink->connected_link = 0;
rlink->peer_link = 0;
@@ -1375,7 +1405,7 @@ qd_router_t *qd_router(qd_dispatch_t *qd
router->timer = qd_timer(qd, qd_router_timer_handler, (void*)
router);
router->dtag = 1;
DEQ_INIT(router->config_addrs);
- DEQ_INIT(router->config_waypoints);
+ DEQ_INIT(router->waypoints);
//
// Configure the router from the configuration file
Modified: qpid/dispatch/trunk/src/router_private.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_private.h?rev=1584887&r1=1584886&r2=1584887&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_private.h (original)
+++ qpid/dispatch/trunk/src/router_private.h Fri Apr 4 20:21:18 2014
@@ -31,6 +31,10 @@ typedef struct qd_router_node_t qd_r
typedef struct qd_router_ref_t qd_router_ref_t;
typedef struct qd_router_link_ref_t qd_router_link_ref_t;
typedef struct qd_router_conn_t qd_router_conn_t;
+typedef struct qd_config_phase_t qd_config_phase_t;
+typedef struct qd_config_address_t qd_config_address_t;
+typedef struct qd_waypoint_t qd_waypoint_t;
+
void qd_router_python_setup(qd_router_t *router);
void qd_pyrouter_tick(qd_router_t *router);
@@ -46,6 +50,7 @@ typedef enum {
typedef enum {
QD_LINK_ENDPOINT, // A link to a connected endpoint
+ QD_LINK_WAYPOINT, // A link to a configured waypoint
QD_LINK_ROUTER, // A link to a peer router in the same area
QD_LINK_AREA // A link to a peer router in a different area (area
boundary)
} qd_link_type_t;
@@ -69,6 +74,7 @@ struct qd_router_link_t {
qd_link_type_t link_type;
qd_direction_t link_direction;
qd_address_t *owning_addr; // [ref] Address record that owns
this link
+ qd_waypoint_t *waypoint; // [ref] Waypoint that owns this
link
qd_link_t *link; // [own] Link pointer
qd_router_link_t *connected_link; // [ref] If this is a link-route,
reference the connected link
qd_router_link_t *peer_link; // [ref] If this is a
bidirectional link-route, reference the peer link
@@ -131,6 +137,7 @@ struct qd_address_t {
qd_address_t *static_cc;
qd_address_t *dynamic_cc;
bool toggle;
+ bool waypoint;
//
// TODO - Add support for asynchronous address lookup:
@@ -152,10 +159,6 @@ ALLOC_DECLARE(qd_address_t);
DEQ_DECLARE(qd_address_t, qd_address_list_t);
-typedef struct qd_config_phase_t qd_config_phase_t;
-typedef struct qd_config_address_t qd_config_address_t;
-typedef struct qd_config_waypoint_t qd_config_waypoint_t;
-
struct qd_config_phase_t {
DEQ_LINKS(qd_config_phase_t);
char phase;
@@ -173,17 +176,22 @@ struct qd_config_address_t {
DEQ_DECLARE(qd_config_address_t, qd_config_address_list_t);
-struct qd_config_waypoint_t {
- DEQ_LINKS(qd_config_waypoint_t);
- char *name;
- char in_phase;
- char out_phase;
- qd_connector_t *connector;
- qd_link_t *in_link;
- qd_link_t *out_link;
+struct qd_waypoint_t {
+ DEQ_LINKS(qd_waypoint_t);
+ const char *name;
+ char in_phase;
+ char out_phase;
+ const char *connector_name;
+ qd_config_connector_t *connector;
+ qd_connection_t *connection;
+ qd_link_t *in_link;
+ qd_link_t *out_link;
+ qd_address_t *in_address;
+ qd_address_t *out_address;
+ bool connected;
};
-DEQ_DECLARE(qd_config_waypoint_t, qd_config_waypoint_list_t);
+DEQ_DECLARE(qd_waypoint_t, qd_waypoint_list_t);
struct qd_router_t {
@@ -210,7 +218,7 @@ struct qd_router_t {
uint64_t dtag;
qd_config_address_list_t config_addrs;
- qd_config_waypoint_list_t config_waypoints;
+ qd_waypoint_list_t waypoints;
qd_agent_class_t *class_router;
qd_agent_class_t *class_link;
Modified: qpid/dispatch/trunk/src/router_pynode.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_pynode.c?rev=1584887&r1=1584886&r2=1584887&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_pynode.c (original)
+++ qpid/dispatch/trunk/src/router_pynode.c Fri Apr 4 20:21:18 2014
@@ -25,6 +25,7 @@
#include <qpid/dispatch.h>
#include "dispatch_private.h"
#include "router_private.h"
+#include "waypoint_private.h"
static qd_address_semantics_t router_addr_semantics = QD_FANOUT_SINGLE |
QD_BIAS_CLOSEST | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS |
QD_BYPASS_VALID_ORIGINS;
@@ -400,6 +401,12 @@ static PyObject* qd_map_destination(PyOb
qd_router_node_t *rnode = router->routers_by_mask_bit[maskbit];
qd_router_add_node_ref_LH(&addr->rnodes, rnode);
+ //
+ // If the address has an associated waypoint, notify the waypoint module
of the changes.
+ //
+ if (addr->waypoint)
+ qd_waypoint_address_updated_LH(router->qd, addr);
+
sys_mutex_unlock(router->lock);
qd_log(log_source, QD_LOG_DEBUG, "Remote Destination '%s' Mapped to router
%d", addr_string, maskbit);
@@ -446,6 +453,13 @@ static PyObject* qd_unmap_destination(Py
}
qd_router_del_node_ref_LH(&addr->rnodes, rnode);
+
+ //
+ // If the address has an associated waypoint, notify the waypoint module
of the changes.
+ //
+ if (addr->waypoint)
+ qd_waypoint_address_updated_LH(router->qd, addr);
+
sys_mutex_unlock(router->lock);
qd_router_check_addr(router, addr, 0);
Added: qpid/dispatch/trunk/src/waypoint.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/waypoint.c?rev=1584887&view=auto
==============================================================================
--- qpid/dispatch/trunk/src/waypoint.c (added)
+++ qpid/dispatch/trunk/src/waypoint.c Fri Apr 4 20:21:18 2014
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "waypoint_private.h"
+#include "dispatch_private.h"
+#include "router_private.h"
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/connection_manager.h>
+#include <memory.h>
+#include <stdio.h>
+
+typedef struct qd_waypoint_ref_t     qd_waypoint_ref_t;
+typedef struct qd_waypoint_context_t qd_waypoint_context_t;
+
+//
+// List node that refers to a single waypoint.
+//
+struct qd_waypoint_ref_t {
+    DEQ_LINKS(qd_waypoint_ref_t);
+    qd_waypoint_t *wp;
+};
+
+DEQ_DECLARE(qd_waypoint_ref_t, qd_waypoint_ref_list_t);
+
+//
+// Context attached to an on-demand connector: the list of waypoints that
+// were associated with that connector by qd_waypoint_activate_all.
+//
+struct qd_waypoint_context_t {
+    qd_waypoint_ref_list_t refs;
+};
+
+
+/**
+ * Bring the "sink" side of a waypoint up to date.
+ *
+ * Resolves (or creates) the routing-table address for the waypoint's in-phase
+ * and caches it in wp->in_address.  Then, if the waypoint is not yet connected,
+ * asks the connection manager to start its on-demand connector; otherwise, if
+ * no outgoing link exists yet, creates and opens one whose target is the
+ * waypoint's name and registers it with the router.
+ *
+ * Callers in this file invoke this function with router->lock held.
+ *
+ * @param qd The dispatch instance that owns the router.
+ * @param wp The waypoint to visit.
+ */
+static void qd_waypoint_visit_sink_LH(qd_dispatch_t *qd, qd_waypoint_t *wp)
+{
+    qd_router_t  *router = qd->router;
+    qd_address_t *addr   = wp->in_address;
+    char          unused;
+
+    //
+    // If the waypoint has no in-address, look it up in the hash table or create
+    // a new one and put it in the hash table.
+    //
+    if (!addr) {
+        //
+        // Compose the phased-address and search the routing table for the address.
+        // If it's not found, add it to the table but leave the link/router linkages empty.
+        //
+        qd_field_iterator_t *iter = qd_field_iterator_string(wp->name, ITER_VIEW_ADDRESS_HASH);
+        qd_field_iterator_set_phase(iter, wp->in_phase);
+        qd_hash_retrieve(router->addr_hash, iter, (void*) &addr);
+
+        if (!addr) {
+            addr = new_qd_address_t();
+            memset(addr, 0, sizeof(qd_address_t));
+            DEQ_ITEM_INIT(addr);
+            DEQ_INIT(addr->rlinks);
+            DEQ_INIT(addr->rnodes);
+            qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
+            DEQ_INSERT_TAIL(router->addrs, addr);
+            addr->waypoint  = true;
+            addr->semantics = router_semantics_for_addr(router, iter, wp->in_phase, &unused);
+        }
+
+        wp->in_address = addr;
+        qd_field_iterator_free(iter);
+    }
+
+    if (!wp->connected)
+        qd_connection_manager_start_on_demand(qd, wp->connector);
+    else if (!wp->out_link) {
+        wp->out_link = qd_link(router->node, wp->connection, QD_OUTGOING, wp->name);
+        pn_terminus_set_address(qd_link_target(wp->out_link), wp->name);
+
+        qd_router_link_t *rlink = new_qd_router_link_t();
+        DEQ_ITEM_INIT(rlink);
+        rlink->link_type      = QD_LINK_WAYPOINT;
+        rlink->link_direction = QD_OUTGOING;
+        rlink->owning_addr    = addr;
+        rlink->waypoint       = wp;
+        rlink->link           = wp->out_link;
+        rlink->connected_link = 0;
+        rlink->peer_link      = 0;
+        rlink->ref            = 0;
+        rlink->target         = 0;
+        DEQ_INIT(rlink->event_fifo);
+        DEQ_INIT(rlink->msg_fifo);
+        DEQ_INSERT_TAIL(router->links, rlink);
+        qd_link_set_context(wp->out_link, rlink);
+        qd_router_add_link_ref_LH(&addr->rlinks, rlink);
+
+        //
+        // The first local link on this address: report the phased address
+        // to the router as a newly added mobile address.
+        //
+        if (DEQ_SIZE(addr->rlinks) == 1) {
+            qd_field_iterator_t *iter = qd_field_iterator_string(wp->name, ITER_VIEW_ADDRESS_HASH);
+            qd_field_iterator_set_phase(iter, wp->in_phase);
+            qd_router_mobile_added(router, iter);
+            qd_field_iterator_free(iter);
+        }
+
+        pn_link_open(qd_link_pn(wp->out_link));
+        qd_link_activate(wp->out_link);
+
+        qd_log(router->log_source, QD_LOG_DEBUG, "Added outgoing link for waypoint: %s", wp->name);
+    }
+}
+
+
+/**
+ * Bring the "source" side of a waypoint up to date.
+ *
+ * Resolves (or creates) the routing-table address for the waypoint's out-phase
+ * and caches it in wp->out_address.  Then, if the waypoint is not yet connected,
+ * asks the connection manager to start its on-demand connector; otherwise, if
+ * no incoming link exists yet, creates and opens one whose source is the
+ * waypoint's name and registers it with the router.  Finally, if the incoming
+ * link exists and the address has at least one local link or remote router
+ * attached, one credit is issued on the incoming link.
+ *
+ * Callers in this file invoke this function with router->lock held.
+ *
+ * @param qd The dispatch instance that owns the router.
+ * @param wp The waypoint to visit.
+ */
+static void qd_waypoint_visit_source_LH(qd_dispatch_t *qd, qd_waypoint_t *wp)
+{
+    qd_router_t  *router = qd->router;
+    qd_address_t *addr   = wp->out_address;
+    char          unused;
+
+    //
+    // If the waypoint has no out-address, look it up in the hash table or create
+    // a new one and put it in the hash table.
+    //
+    if (!addr) {
+        //
+        // Compose the phased-address and search the routing table for the address.
+        // If it's not found, add it to the table but leave the link/router linkages empty.
+        //
+        qd_field_iterator_t *iter = qd_field_iterator_string(wp->name, ITER_VIEW_ADDRESS_HASH);
+        qd_field_iterator_set_phase(iter, wp->out_phase);
+        qd_hash_retrieve(router->addr_hash, iter, (void*) &addr);
+
+        if (!addr) {
+            addr = new_qd_address_t();
+            memset(addr, 0, sizeof(qd_address_t));
+            DEQ_ITEM_INIT(addr);
+            DEQ_INIT(addr->rlinks);
+            DEQ_INIT(addr->rnodes);
+            qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
+            DEQ_INSERT_TAIL(router->addrs, addr);
+            addr->waypoint  = true;
+            addr->semantics = router_semantics_for_addr(router, iter, wp->out_phase, &unused);
+        }
+
+        wp->out_address = addr;
+        qd_field_iterator_free(iter);
+    }
+
+    if (!wp->connected)
+        qd_connection_manager_start_on_demand(qd, wp->connector);
+    else if (!wp->in_link) {
+        wp->in_link = qd_link(router->node, wp->connection, QD_INCOMING, wp->name);
+        pn_terminus_set_address(qd_link_source(wp->in_link), wp->name);
+
+        qd_router_link_t *rlink = new_qd_router_link_t();
+        DEQ_ITEM_INIT(rlink);
+        rlink->link_type      = QD_LINK_WAYPOINT;
+        rlink->link_direction = QD_INCOMING;
+        rlink->owning_addr    = addr;
+        rlink->waypoint       = wp;
+        rlink->link           = wp->in_link;
+        rlink->connected_link = 0;
+        rlink->peer_link      = 0;
+        rlink->ref            = 0;
+        rlink->target         = 0;
+        DEQ_INIT(rlink->event_fifo);
+        DEQ_INIT(rlink->msg_fifo);
+        DEQ_INSERT_TAIL(router->links, rlink);
+        qd_link_set_context(wp->in_link, rlink);
+
+        pn_link_open(qd_link_pn(wp->in_link));
+        qd_link_activate(wp->in_link);
+
+        qd_log(router->log_source, QD_LOG_DEBUG, "Added incoming link for waypoint: %s", wp->name);
+    }
+
+    //
+    // Issue credit only when the inbound link actually exists and the address
+    // has reachable destinations in the network.  While the on-demand
+    // connection is still being established wp->in_link is null and must not
+    // be dereferenced; qd_waypoint_connection_opened visits the waypoint again
+    // once the connection is up, at which point the link is created and credit
+    // can be issued.
+    //
+    if (wp->in_link && DEQ_SIZE(addr->rlinks) + DEQ_SIZE(addr->rnodes) > 0) {
+        pn_link_flow(qd_link_pn(wp->in_link), 1);
+        qd_link_activate(wp->in_link);
+        qd_log(router->log_source, QD_LOG_DEBUG, "Added credit for incoming link for waypoint: %s", wp->name);
+    }
+}
+
+
+//
+// Visit whichever sides of the waypoint are configured: the sink side when an
+// in-phase is set, the source side when an out-phase is set (both if both are set).
+//
+static void qd_waypoint_visit_LH(qd_dispatch_t *qd, qd_waypoint_t *wp)
+{
+    if (wp->in_phase != '\0') {
+        qd_waypoint_visit_sink_LH(qd, wp);
+    }
+
+    if (wp->out_phase != '\0') {
+        qd_waypoint_visit_source_LH(qd, wp);
+    }
+}
+
+
+/**
+ * Activate the "Sink" side of this waypoint.
+ *
+ * Does nothing when the waypoint has no in-phase configured; otherwise asks
+ * the connection manager to start the waypoint's on-demand connector.
+ *
+ * @param qd The dispatch instance.
+ * @param wp The waypoint to activate.
+ */
+static void qd_waypoint_activate_in_phase_LH(qd_dispatch_t *qd, qd_waypoint_t *wp)
+{
+    //
+    // If the in-phase is null, then no in-phase was configured for this waypoint.
+    //
+    if (wp->in_phase == '\0')
+        return;
+
+    //
+    // Since the waypoint has an in-phase, we wish to advertise the waypoint's address
+    // for senders to send to.  We can't do this until a connection and link are set up
+    // to the waypoint.  Start the on-demand connector.
+    //
+    qd_connection_manager_start_on_demand(qd, wp->connector);
+}
+
+
+/**
+ * Activate the "Source" side of this waypoint.
+ *
+ * Currently a no-op: nothing is done for the source side at activation time.
+ *
+ * @param qd The dispatch instance (unused).
+ * @param wp The waypoint to activate (unused).
+ */
+static void qd_waypoint_activate_out_phase_LH(qd_dispatch_t *qd, qd_waypoint_t *wp)
+{
+    // This function intentionally left blank
+}
+
+
+/**
+ * Activate every waypoint configured on the router.
+ *
+ * Runs entirely under router->lock, in two passes over router->waypoints:
+ *
+ *  1. For each waypoint that has no connector yet, look up its on-demand
+ *     connector by name.  If found, append a reference to the waypoint onto
+ *     the connector's context (creating the context on first use).  If not
+ *     found, log an error and move on to the next waypoint.
+ *  2. For each waypoint, activate its in-phase and out-phase and visit it.
+ *
+ * @param qd The dispatch instance that owns the router.
+ */
+void qd_waypoint_activate_all(qd_dispatch_t *qd)
+{
+    qd_router_t   *router = qd->router;
+    qd_waypoint_t *wp;
+
+    sys_mutex_lock(router->lock);
+    for (wp = DEQ_HEAD(router->waypoints); wp; wp = DEQ_NEXT(wp)) {
+        //
+        // Associate the waypoint with its named on-demand connector and for every
+        // on-demand connector, create a list of associated waypoints.
+        //
+        if (!wp->connector) {
+            wp->connector = qd_connection_manager_find_on_demand(qd, wp->connector_name);
+            if (!wp->connector) {
+                qd_log(qd->router->log_source, QD_LOG_ERROR,
+                       "In waypoint '%s', on-demand connector '%s' not found",
+                       wp->name, wp->connector_name);
+                continue;
+            }
+
+            qd_waypoint_context_t *context =
+                (qd_waypoint_context_t*) qd_config_connector_context(wp->connector);
+            if (!context) {
+                context = NEW(qd_waypoint_context_t);
+                DEQ_INIT(context->refs);
+                qd_config_connector_set_context(wp->connector, context);
+            }
+            qd_waypoint_ref_t *ref = NEW(qd_waypoint_ref_t);
+            DEQ_ITEM_INIT(ref);
+            ref->wp = wp;
+            DEQ_INSERT_TAIL(context->refs, ref);
+        }
+    }
+
+    //
+    // NOTE(review): waypoints whose connector was not found above are still
+    // visited here with wp->connector == 0 -- confirm that
+    // qd_connection_manager_start_on_demand tolerates a null connector.
+    //
+    for (wp = DEQ_HEAD(router->waypoints); wp; wp = DEQ_NEXT(wp)) {
+        qd_waypoint_activate_in_phase_LH(qd, wp);
+        qd_waypoint_activate_out_phase_LH(qd, wp);
+        qd_waypoint_visit_LH(qd, wp);
+    }
+    sys_mutex_unlock(router->lock);
+}
+
+
+/**
+ * Notification that the connection for an on-demand connector has opened.
+ *
+ * Logs the event, then, under router->lock, marks every waypoint associated
+ * with the connector as connected, records the connection on it and visits
+ * it.  A connector that carries no waypoint context is logged and otherwise
+ * ignored.
+ *
+ * @param qd   The dispatch instance that owns the router.
+ * @param cc   The on-demand connector whose connection opened.
+ * @param conn The newly opened connection.
+ */
+void qd_waypoint_connection_opened(qd_dispatch_t *qd, qd_config_connector_t *cc, qd_connection_t *conn)
+{
+    qd_waypoint_context_t *context = (qd_waypoint_context_t*) qd_config_connector_context(cc);
+
+    qd_log(qd->router->log_source, QD_LOG_INFO, "On-demand connector opened: %s",
+           qd_config_connector_name(cc));
+
+    //
+    // The context is only created when a waypoint is associated with the
+    // connector; without one there are no waypoints to update.
+    //
+    if (!context)
+        return;
+
+    sys_mutex_lock(qd->router->lock);
+    qd_waypoint_ref_t *ref = DEQ_HEAD(context->refs);
+    while (ref) {
+        ref->wp->connected  = true;
+        ref->wp->connection = conn;
+        qd_waypoint_visit_LH(qd, ref->wp);
+        ref = DEQ_NEXT(ref);
+    }
+    sys_mutex_unlock(qd->router->lock);
+}
+
+
+/**
+ * Hook for a new incoming link on a waypoint.  Currently a no-op.
+ *
+ * @param qd   The dispatch instance (unused).
+ * @param wp   The waypoint (unused).
+ * @param link The link (unused).
+ */
+void qd_waypoint_new_incoming_link(qd_dispatch_t *qd, qd_waypoint_t *wp, qd_link_t *link)
+{
+}
+
+
+/**
+ * Hook for a new outgoing link on a waypoint.  Currently a no-op.
+ *
+ * @param qd   The dispatch instance (unused).
+ * @param wp   The waypoint (unused).
+ * @param link The link (unused).
+ */
+void qd_waypoint_new_outgoing_link(qd_dispatch_t *qd, qd_waypoint_t *wp, qd_link_t *link)
+{
+}
+
+
+/**
+ * Hook for a closed link on a waypoint.  Currently a no-op.
+ *
+ * @param qd   The dispatch instance (unused).
+ * @param wp   The waypoint (unused).
+ * @param link The link (unused).
+ */
+void qd_waypoint_link_closed(qd_dispatch_t *qd, qd_waypoint_t *wp, qd_link_t *link)
+{
+}
+
+
+//
+// Re-visit every waypoint whose out-address is the address that changed.
+//
+void qd_waypoint_address_updated_LH(qd_dispatch_t *qd, qd_address_t *addr)
+{
+    qd_waypoint_t *candidate;
+
+    for (candidate = DEQ_HEAD(qd->router->waypoints); candidate; candidate = DEQ_NEXT(candidate)) {
+        if (candidate->out_address != addr)
+            continue;
+        qd_waypoint_visit_LH(qd, candidate);
+    }
+}
+
Propchange: qpid/dispatch/trunk/src/waypoint.c
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/dispatch/trunk/src/waypoint_private.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/waypoint_private.h?rev=1584887&view=auto
==============================================================================
--- qpid/dispatch/trunk/src/waypoint_private.h (added)
+++ qpid/dispatch/trunk/src/waypoint_private.h Fri Apr 4 20:21:18 2014
@@ -0,0 +1,43 @@
+#ifndef __waypoint_private_h__
+#define __waypoint_private_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/server.h>
+#include <qpid/dispatch/connection_manager.h>
+
+typedef struct qd_dispatch_t qd_dispatch_t;
+typedef struct qd_waypoint_t qd_waypoint_t;
+typedef struct qd_address_t qd_address_t;
+typedef struct qd_link_t qd_link_t;
+
+/**
+ * Associate each configured waypoint with its on-demand connector and
+ * activate it.  Takes the router lock internally.
+ */
+void qd_waypoint_activate_all(qd_dispatch_t *qd);
+
+/**
+ * Notify the waypoint module that the connection for an on-demand connector
+ * has opened.  Takes the router lock internally.
+ */
+void qd_waypoint_connection_opened(qd_dispatch_t *qd, qd_config_connector_t *cc, qd_connection_t *conn);
+
+/**
+ * Hook for a new incoming link on a waypoint.
+ */
+void qd_waypoint_new_incoming_link(qd_dispatch_t *qd, qd_waypoint_t *wp, qd_link_t *link);
+
+/**
+ * Hook for a new outgoing link on a waypoint.
+ */
+void qd_waypoint_new_outgoing_link(qd_dispatch_t *qd, qd_waypoint_t *wp, qd_link_t *link);
+
+/**
+ * Hook for a closed link on a waypoint.
+ */
+void qd_waypoint_link_closed(qd_dispatch_t *qd, qd_waypoint_t *wp, qd_link_t *link);
+
+/**
+ * Notify the waypoint module that the link/router linkages of an address
+ * have changed.  The router code calls this while holding the router lock.
+ */
+void qd_waypoint_address_updated_LH(qd_dispatch_t *qd, qd_address_t *address);
+
+
+#endif
Propchange: qpid/dispatch/trunk/src/waypoint_private.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/dispatch/trunk/src/waypoint_private.h
------------------------------------------------------------------------------
svn:keywords = Author Date Id Rev URL
Modified: qpid/dispatch/trunk/tools/qdstat.in
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tools/qdstat.in?rev=1584887&r1=1584886&r2=1584887&view=diff
==============================================================================
--- qpid/dispatch/trunk/tools/qdstat.in (original)
+++ qpid/dispatch/trunk/tools/qdstat.in Fri Apr 4 20:21:18 2014
@@ -211,6 +211,7 @@ class BusManager:
heads.append(Header("rindex"))
heads.append(Header("class"))
heads.append(Header("addr"))
+ heads.append(Header("phase"))
rows = []
objects = self._get_object('org.apache.qpid.dispatch.router.link')
@@ -225,6 +226,7 @@ class BusManager:
row.append('-')
row.append(self._addr_class(link['owning-addr']))
row.append(self._addr_text(link['owning-addr']))
+ row.append(self._addr_phase(link['owning-addr']))
rows.append(row)
title = "Router Links"
dispRows = rows
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org