Author: tross
Date: Fri Apr 4 20:09:44 2014
New Revision: 1584881
URL: http://svn.apache.org/r1584881
Log:
DISPATCH-35 - Moved connection handling and configuration to a new
connection_manager module.
Added on-demand connectors.
Added:
qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h (with
props)
qpid/dispatch/trunk/src/connection_manager.c (with props)
Modified:
qpid/dispatch/trunk/CMakeLists.txt
qpid/dispatch/trunk/include/qpid/dispatch.h
qpid/dispatch/trunk/src/dispatch.c
qpid/dispatch/trunk/src/dispatch_private.h
qpid/dispatch/trunk/src/router_node.c
qpid/dispatch/trunk/src/server.c
qpid/dispatch/trunk/src/server_private.h
qpid/dispatch/trunk/src/timer_private.h
Modified: qpid/dispatch/trunk/CMakeLists.txt
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/CMakeLists.txt?rev=1584881&r1=1584880&r2=1584881&view=diff
==============================================================================
--- qpid/dispatch/trunk/CMakeLists.txt (original)
+++ qpid/dispatch/trunk/CMakeLists.txt Fri Apr 4 20:09:44 2014
@@ -114,6 +114,7 @@ set(server_SOURCES
src/buffer.c
src/compose.c
src/config.c
+ src/connection_manager.c
src/container.c
src/dispatch.c
src/hash.c
Modified: qpid/dispatch/trunk/include/qpid/dispatch.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch.h?rev=1584881&r1=1584880&r2=1584881&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch.h Fri Apr 4 20:09:44 2014
@@ -39,6 +39,7 @@
#include <qpid/dispatch/message.h>
#include <qpid/dispatch/container.h>
#include <qpid/dispatch/agent.h>
+#include <qpid/dispatch/connection_manager.h>
#include <qpid/dispatch/dispatch.h>
#endif
Added: qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h?rev=1584881&view=auto
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h (added)
+++ qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h Fri Apr 4
20:09:44 2014
@@ -0,0 +1,87 @@
+#ifndef __dispatch_connection_manager_h__
+#define __dispatch_connection_manager_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/dispatch.h>
+
+typedef struct qd_connection_manager_t qd_connection_manager_t;
+typedef struct qd_config_connector_t qd_config_connector_t;
+typedef struct qd_config_listener_t qd_config_listener_t;
+
+/**
+ * \brief Allocate a connection manager
+ *
+ * @param qd The dispatch handle returned by qd_dispatch.
+ */
+qd_connection_manager_t *qd_connection_manager(qd_dispatch_t *qd);
+
+
+/**
+ * \brief Free all the resources associated with the connection manager
+ *
+ * @param cm The connection manager handle returned by qd_connection_manager.
+ */
+void qd_connection_manager_free(qd_connection_manager_t *cm);
+
+
+/**
+ * \brief Load the Listeners and Connectors from the configuration file.
+ *
+ * @param qd The dispatch handle returned by qd_dispatch.
+ */
+void qd_connection_manager_configure(qd_dispatch_t *qd);
+
+
+/**
+ * \brief Start the configured Listeners and Connectors
+ *
+ * Note that on-demand connectors are not started by this function.
+ *
+ * @param qd The dispatch handle returned by qd_dispatch.
+ */
+void qd_connection_manager_start(qd_dispatch_t *qd);
+
+
+/**
+ * \brief Given a connector-tag, find and return a pointer to the on-demand
connector.
+ *
+ * @param qd The dispatch handle returned by qd_dispatch.
+ * @param tag The tag that uniquely identifies the on-demand connector.
+ * @return The matching on-demand connector or NULL if the tag is not found.
+ */
+qd_config_connector_t *qd_connection_manager_find_on_demand(qd_dispatch_t *qd,
const char *tag);
+
+
+/**
+ * \brief Start an on-demand connector.
+ *
+ * @param od The pointer to an on-demand connector returned by
qd_connection_manager_find_on_demand.
+ */
+void qd_connection_manager_start_on_demand(qd_config_connector_t *od);
+
+
+/**
+ * \brief Stop an on-demand connector.
+ *
+ * @param od The pointer to an on-demand connector returned by
qd_connection_manager_find_on_demand.
+ */
+void qd_connection_manager_stop_on_demand(qd_config_connector_t *od);
+
+#endif
Propchange: qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h
------------------------------------------------------------------------------
svn:keywords = Author Date Id Rev URL
Added: qpid/dispatch/trunk/src/connection_manager.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/connection_manager.c?rev=1584881&view=auto
==============================================================================
--- qpid/dispatch/trunk/src/connection_manager.c (added)
+++ qpid/dispatch/trunk/src/connection_manager.c Fri Apr 4 20:09:44 2014
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/connection_manager.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/agent.h>
+#include "server_private.h"
+#include "dispatch_private.h"
+#include <string.h>
+
+static const char *CONF_LISTENER = "listener";
+static const char *CONF_CONNECTOR = "connector";
+
+
+struct qd_config_listener_t {
+ DEQ_LINKS(qd_config_listener_t);
+ qd_listener_t *listener;
+ qd_server_config_t configuration;
+};
+
+DEQ_DECLARE(qd_config_listener_t, qd_config_listener_list_t);
+
+
+struct qd_config_connector_t {
+ DEQ_LINKS(qd_config_connector_t);
+ qd_connector_t *connector;
+ qd_server_config_t configuration;
+ const char *tag;
+ bool started;
+};
+
+DEQ_DECLARE(qd_config_connector_t, qd_config_connector_list_t);
+
+
+struct qd_connection_manager_t {
+ qd_log_source_t *log_source;
+ qd_server_t *server;
+ qd_config_listener_list_t config_listeners;
+ qd_config_connector_list_t config_connectors;
+ qd_config_connector_list_t on_demand_connectors;
+};
+
+
+static void load_server_config(qd_dispatch_t *qd, qd_server_config_t *config,
const char *section, int i)
+{
+ config->host = qd_config_item_value_string(qd, section, i,
"addr");
+ config->port = qd_config_item_value_string(qd, section, i,
"port");
+ config->role = qd_config_item_value_string(qd, section, i,
"role");
+ config->max_frame_size = qd_config_item_value_int(qd, section, i,
"max-frame-size");
+ config->sasl_mechanisms =
+ qd_config_item_value_string(qd, section, i, "sasl-mechanisms");
+ config->ssl_enabled =
+ qd_config_item_value_bool(qd, section, i, "ssl-profile");
+ if (config->ssl_enabled) {
+ config->ssl_server = 1;
+ config->ssl_allow_unsecured_client =
+ qd_config_item_value_bool(qd, section, i, "allow-unsecured");
+ config->ssl_certificate_file =
+ qd_config_item_value_string(qd, section, i, "cert-file");
+ config->ssl_private_key_file =
+ qd_config_item_value_string(qd, section, i, "key-file");
+ config->ssl_password =
+ qd_config_item_value_string(qd, section, i, "password");
+ config->ssl_trusted_certificate_db =
+ qd_config_item_value_string(qd, section, i, "cert-db");
+ config->ssl_trusted_certificates =
+ qd_config_item_value_string(qd, section, i, "trusted-certs");
+ config->ssl_require_peer_authentication =
+ qd_config_item_value_bool(qd, section, i, "require-peer-auth");
+ }
+}
+
+
+static void configure_listeners(qd_dispatch_t *qd)
+{
+ int count;
+ qd_connection_manager_t *cm = qd->connection_manager;
+
+ if (!qd->config || !cm) {
+ assert(false);
+ return;
+ }
+
+ count = qd_config_item_count(qd, CONF_LISTENER);
+ for (int i = 0; i < count; i++) {
+ qd_config_listener_t *cl = NEW(qd_config_listener_t);
+ cl->listener = 0;
+ load_server_config(qd, &cl->configuration, CONF_LISTENER, i);
+ DEQ_ITEM_INIT(cl);
+ DEQ_INSERT_TAIL(cm->config_listeners, cl);
+ qd_log(cm->log_source, QD_LOG_INFO, "Configured Listener: %s:%s
role=%s",
+ cl->configuration.host, cl->configuration.port,
cl->configuration.role);
+ }
+}
+
+
+static void configure_connectors(qd_dispatch_t *qd)
+{
+ int count;
+ qd_connection_manager_t *cm = qd->connection_manager;
+
+ if (!qd->config || !cm) {
+ assert(false);
+ return;
+ }
+
+ count = qd_config_item_count(qd, CONF_CONNECTOR);
+ for (int i = 0; i < count; i++) {
+ qd_config_connector_t *cc = NEW(qd_config_connector_t);
+ cc->connector = 0;
+ cc->tag = 0;
+ cc->started = false;
+ load_server_config(qd, &cc->configuration, CONF_CONNECTOR, i);
+ DEQ_ITEM_INIT(cc);
+ if (strcmp(cc->configuration.role, "on-demand") == 0) {
+ DEQ_INSERT_TAIL(cm->on_demand_connectors, cc);
+ } else
+ DEQ_INSERT_TAIL(cm->config_connectors, cc);
+ qd_log(cm->log_source, QD_LOG_INFO, "Configured Connector: %s:%s
role=%s",
+ cc->configuration.host, cc->configuration.port,
cc->configuration.role);
+ }
+}
+
+
+qd_connection_manager_t *qd_connection_manager(qd_dispatch_t *qd)
+{
+ qd_connection_manager_t *cm = NEW(qd_connection_manager_t);
+ if (!cm)
+ return 0;
+
+ cm->log_source = qd_log_source("CONN_MGR");
+ cm->server = qd->server;
+ DEQ_INIT(cm->config_listeners);
+ DEQ_INIT(cm->config_connectors);
+ DEQ_INIT(cm->on_demand_connectors);
+
+ return cm;
+}
+
+
+void qd_connection_manager_free(qd_connection_manager_t *cm)
+{
+}
+
+
+void qd_connection_manager_configure(qd_dispatch_t *qd)
+{
+ configure_listeners(qd);
+ configure_connectors(qd);
+}
+
+
+void qd_connection_manager_start(qd_dispatch_t *qd)
+{
+ qd_config_listener_t *cl =
DEQ_HEAD(qd->connection_manager->config_listeners);
+ qd_config_connector_t *cc =
DEQ_HEAD(qd->connection_manager->config_connectors);
+
+ while (cl) {
+ if (cl->listener == 0)
+ cl->listener = qd_server_listen(qd, &cl->configuration, cl);
+ cl = DEQ_NEXT(cl);
+ }
+
+ while (cc) {
+ if (cc->connector == 0)
+ cc->connector = qd_server_connect(qd, &cc->configuration, cc);
+ cc = DEQ_NEXT(cc);
+ }
+}
+
+
+qd_config_connector_t *qd_connection_manager_find_on_demand(qd_dispatch_t *qd,
const char *tag)
+{
+ return 0;
+}
+
+
+void qd_connection_manager_start_on_demand(qd_config_connector_t *od)
+{
+}
+
+
+void qd_connection_manager_stop_on_demand(qd_config_connector_t *od)
+{
+}
+
+static void server_schema_handler(void *context, void *cor)
+{
+ qd_agent_value_string(cor, 0, "state");
+ qd_agent_value_string(cor, 0, "container");
+ qd_agent_value_string(cor, 0, "host");
+ qd_agent_value_string(cor, 0, "sasl");
+ qd_agent_value_string(cor, 0, "role");
+ qd_agent_value_string(cor, 0, "dir");
+}
+
+
+static void server_query_handler(void* context, const char *id, void *cor)
+{
+ qd_dispatch_t *qd = (qd_dispatch_t*) context;
+ qd_server_t *qd_server = qd->server;
+ sys_mutex_lock(qd_server->lock);
+ const char *conn_state;
+ const qd_server_config_t *config;
+ const char *pn_container_name;
+ const char *direction;
+
+ qd_connection_t *conn = DEQ_HEAD(qd_server->connections);
+ while (conn) {
+ switch (conn->state) {
+ case CONN_STATE_CONNECTING: conn_state = "Connecting"; break;
+ case CONN_STATE_OPENING: conn_state = "Opening"; break;
+ case CONN_STATE_OPERATIONAL: conn_state = "Operational"; break;
+ case CONN_STATE_FAILED: conn_state = "Failed"; break;
+ case CONN_STATE_USER: conn_state = "User"; break;
+ default: conn_state = "undefined"; break;
+ }
+ qd_agent_value_string(cor, "state", conn_state);
+ // get remote container name using proton connection
+ pn_container_name = pn_connection_remote_container(conn->pn_conn);
+ if (pn_container_name)
+ qd_agent_value_string(cor, "container", pn_container_name);
+ else
+ qd_agent_value_null(cor, "container");
+
+ // and now for some config entries
+ if (conn->connector) {
+ config = conn->connector->config;
+ direction = "out";
+ char host[1000];
+ strcpy(host, config->host);
+ strcat(host, ":");
+ strcat(host, config->port);
+ qd_agent_value_string(cor, "host", host);
+ } else {
+ config = conn->listener->config;
+ direction = "in";
+ qd_agent_value_string(cor, "host",
pn_connector_name(conn->pn_cxtr));
+ }
+
+ qd_agent_value_string(cor, "sasl", config->sasl_mechanisms);
+ qd_agent_value_string(cor, "role", config->role);
+ qd_agent_value_string(cor, "dir", direction);
+
+ conn = DEQ_NEXT(conn);
+ qd_agent_value_complete(cor, conn != 0);
+ }
+ sys_mutex_unlock(qd_server->lock);
+}
+
+
+void qd_connection_manager_setup_agent(qd_dispatch_t *qd)
+{
+ qd_agent_register_class(qd, "org.apache.qpid.dispatch.connection", qd,
server_schema_handler, server_query_handler);
+}
Propchange: qpid/dispatch/trunk/src/connection_manager.c
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/dispatch/trunk/src/dispatch.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/dispatch.c?rev=1584881&r1=1584880&r2=1584881&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/dispatch.c (original)
+++ qpid/dispatch/trunk/src/dispatch.c Fri Apr 4 20:09:44 2014
@@ -30,7 +30,7 @@
* Private Function Prototypes
*/
qd_server_t *qd_server(int tc, const char *container_name);
-void qd_server_setup_agent(qd_dispatch_t *qd);
+void qd_connection_manager_setup_agent(qd_dispatch_t *qd);
void qd_server_free(qd_server_t *server);
qd_container_t *qd_container(qd_dispatch_t *qd);
void qd_container_setup_agent(qd_dispatch_t *qd);
@@ -41,13 +41,9 @@ void qd_router_free(qd_router
qd_agent_t *qd_agent(qd_dispatch_t *qd);
void qd_agent_free(qd_agent_t *agent);
-ALLOC_DEFINE(qd_config_listener_t);
-ALLOC_DEFINE(qd_config_connector_t);
static const char *CONF_CONTAINER = "container";
static const char *CONF_ROUTER = "router";
-static const char *CONF_LISTENER = "listener";
-static const char *CONF_CONNECTOR = "connector";
qd_dispatch_t *qd_dispatch(const char *python_pkgdir)
@@ -56,9 +52,6 @@ qd_dispatch_t *qd_dispatch(const char *p
memset(qd, 0, sizeof(qd_dispatch_t));
- DEQ_INIT(qd->config_listeners);
- DEQ_INIT(qd->config_connectors);
-
// alloc and log has to be initialized before any module.
qd_alloc_initialize();
qd_log_initialize();
@@ -133,13 +126,14 @@ void qd_dispatch_configure_router(qd_dis
void qd_dispatch_prepare(qd_dispatch_t *qd)
{
- qd->server = qd_server(qd->thread_count, qd->container_name);
- qd->container = qd_container(qd);
- qd->router = qd_router(qd, qd->router_mode, qd->router_area,
qd->router_id);
- qd->agent = qd_agent(qd);
+ qd->server = qd_server(qd->thread_count, qd->container_name);
+ qd->container = qd_container(qd);
+ qd->router = qd_router(qd, qd->router_mode, qd->router_area,
qd->router_id);
+ qd->agent = qd_agent(qd);
+ qd->connection_manager = qd_connection_manager(qd);
qd_alloc_setup_agent(qd);
- qd_server_setup_agent(qd);
+ qd_connection_manager_setup_agent(qd);
qd_container_setup_agent(qd);
qd_router_setup_late(qd);
}
@@ -149,6 +143,7 @@ void qd_dispatch_free(qd_dispatch_t *qd)
{
qd_config_free(qd->config);
qd_config_finalize();
+ qd_connection_manager_free(qd->connection_manager);
qd_agent_free(qd->agent);
qd_router_free(qd->router);
qd_container_free(qd->container);
@@ -158,98 +153,9 @@ void qd_dispatch_free(qd_dispatch_t *qd)
}
-static void load_server_config(qd_dispatch_t *qd, qd_server_config_t *config,
const char *section, int i)
-{
- config->host = qd_config_item_value_string(qd, section, i, "addr");
- config->port = qd_config_item_value_string(qd, section, i, "port");
- config->role = qd_config_item_value_string(qd, section, i, "role");
- config->max_frame_size = qd_config_item_value_int(qd, section, i,
"max-frame-size");
- config->sasl_mechanisms =
- qd_config_item_value_string(qd, section, i, "sasl-mechanisms");
- config->ssl_enabled =
- qd_config_item_value_bool(qd, section, i, "ssl-profile");
- if (config->ssl_enabled) {
- config->ssl_server = 1;
- config->ssl_allow_unsecured_client =
- qd_config_item_value_bool(qd, section, i, "allow-unsecured");
- config->ssl_certificate_file =
- qd_config_item_value_string(qd, section, i, "cert-file");
- config->ssl_private_key_file =
- qd_config_item_value_string(qd, section, i, "key-file");
- config->ssl_password =
- qd_config_item_value_string(qd, section, i, "password");
- config->ssl_trusted_certificate_db =
- qd_config_item_value_string(qd, section, i, "cert-db");
- config->ssl_trusted_certificates =
- qd_config_item_value_string(qd, section, i, "trusted-certs");
- config->ssl_require_peer_authentication =
- qd_config_item_value_bool(qd, section, i, "require-peer-auth");
- }
-}
-
-
-static void configure_listeners(qd_dispatch_t *qd)
-{
- int count;
-
- if (!qd->config)
- return;
-
- count = qd_config_item_count(qd, CONF_LISTENER);
- for (int i = 0; i < count; i++) {
- qd_config_listener_t *cl = new_qd_config_listener_t();
- load_server_config(qd, &cl->configuration, CONF_LISTENER, i);
-
- printf("\nListener : %s:%s\n", cl->configuration.host,
cl->configuration.port);
- printf(" SASL: %s\n", cl->configuration.sasl_mechanisms);
- printf(" SSL: %d\n", cl->configuration.ssl_enabled);
- if (cl->configuration.ssl_enabled) {
- printf(" unsec: %d\n",
cl->configuration.ssl_allow_unsecured_client);
- printf(" cert-file: %s\n",
cl->configuration.ssl_certificate_file);
- printf(" key-file: %s\n",
cl->configuration.ssl_private_key_file);
- printf(" cert-db: %s\n",
cl->configuration.ssl_trusted_certificate_db);
- printf(" peer-auth: %d\n",
cl->configuration.ssl_require_peer_authentication);
- }
-
- cl->listener = qd_server_listen(qd, &cl->configuration, cl);
- DEQ_ITEM_INIT(cl);
- DEQ_INSERT_TAIL(qd->config_listeners, cl);
- }
-}
-
-
-static void configure_connectors(qd_dispatch_t *qd)
-{
- int count;
-
- if (!qd->config)
- return;
-
- count = qd_config_item_count(qd, CONF_CONNECTOR);
- for (int i = 0; i < count; i++) {
- qd_config_connector_t *cc = new_qd_config_connector_t();
- load_server_config(qd, &cc->configuration, CONF_CONNECTOR, i);
-
- printf("\nConnector : %s:%s\n", cc->configuration.host,
cc->configuration.port);
- printf(" SASL: %s\n", cc->configuration.sasl_mechanisms);
- printf(" SSL: %d\n", cc->configuration.ssl_enabled);
- if (cc->configuration.ssl_enabled) {
- printf(" cert-file: %s\n",
cc->configuration.ssl_certificate_file);
- printf(" key-file: %s\n",
cc->configuration.ssl_private_key_file);
- printf(" cert-db: %s\n",
cc->configuration.ssl_trusted_certificate_db);
- printf(" peer-auth: %d\n",
cc->configuration.ssl_require_peer_authentication);
- }
-
- cc->connector = qd_server_connect(qd, &cc->configuration, cc);
- DEQ_ITEM_INIT(cc);
- DEQ_INSERT_TAIL(qd->config_connectors, cc);
- }
-}
-
-
void qd_dispatch_post_configure_connections(qd_dispatch_t *qd)
{
- configure_listeners(qd);
- configure_connectors(qd);
+ qd_connection_manager_configure(qd);
+ qd_connection_manager_start(qd);
}
Modified: qpid/dispatch/trunk/src/dispatch_private.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/dispatch_private.h?rev=1584881&r1=1584880&r2=1584881&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/dispatch_private.h (original)
+++ qpid/dispatch/trunk/src/dispatch_private.h Fri Apr 4 20:09:44 2014
@@ -24,39 +24,19 @@ typedef struct qd_router_t qd_router_
typedef struct qd_agent_t qd_agent_t;
#include <qpid/dispatch/container.h>
+#include <qpid/dispatch/connection_manager.h>
#include "server_private.h"
#include "config_private.h"
#include "router_private.h"
-#include <qpid/dispatch/ctools.h>
-typedef struct qd_config_listener_t {
- DEQ_LINKS(struct qd_config_listener_t);
- qd_listener_t *listener;
- qd_server_config_t configuration;
-} qd_config_listener_t;
-
-DEQ_DECLARE(qd_config_listener_t, qd_config_listener_list_t);
-ALLOC_DECLARE(qd_config_listener_t);
-
-
-typedef struct qd_config_connector_t {
- DEQ_LINKS(struct qd_config_connector_t);
- qd_connector_t *connector;
- qd_server_config_t configuration;
-} qd_config_connector_t;
-
-DEQ_DECLARE(qd_config_connector_t, qd_config_connector_list_t);
-ALLOC_DECLARE(qd_config_connector_t);
struct qd_dispatch_t {
- qd_server_t *server;
- qd_container_t *container;
- qd_router_t *router;
- qd_agent_t *agent;
- qd_config_t *config;
-
- qd_config_listener_list_t config_listeners;
- qd_config_connector_list_t config_connectors;
+ qd_server_t *server;
+ qd_container_t *container;
+ qd_router_t *router;
+ qd_agent_t *agent;
+ qd_config_t *config;
+ qd_connection_manager_t *connection_manager;
int thread_count;
const char *container_name;
Modified: qpid/dispatch/trunk/src/router_node.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1584881&r1=1584880&r2=1584881&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Fri Apr 4 20:09:44 2014
@@ -26,9 +26,10 @@
#include "dispatch_private.h"
#include "router_private.h"
-static char *router_role = "inter-router";
-static char *local_prefix = "_local/";
-static char *topo_prefix = "_topo/";
+static char *router_role = "inter-router";
+static char *on_demand_role = "on-demand";
+static char *local_prefix = "_local/";
+static char *topo_prefix = "_topo/";
static char *direct_prefix;
static char *node_id;
@@ -186,6 +187,22 @@ static int qd_router_connection_is_inter
/**
+ * Determine whether a connection is configured in the on-demand role.
+ */
+static int qd_router_connection_is_on_demand(const qd_connection_t *conn)
+{
+ if (!conn)
+ return 0;
+
+ const qd_server_config_t *cf = qd_connection_config(conn);
+ if (cf && strcmp(cf->role, on_demand_role) == 0)
+ return 1;
+
+ return 0;
+}
+
+
+/**
* Determine whether a terminus has router capability
*/
static int qd_router_terminus_is_router(pn_terminus_t *term)
@@ -1184,14 +1201,19 @@ static void router_outbound_open_handler
qd_router_t *router = (qd_router_t*) type_context;
//
- // Check the configured role of this connection. If it is not the
inter-router
- // role, ignore it.
+ // If the connection is on-demand, visit all waypoints that are waiting
for their
+ // connection to arrive.
//
- if (!qd_router_connection_is_inter_router(conn)) {
- qd_log(router->log_source, QD_LOG_WARNING, "Outbound connection set up
without inter-router role");
+ if (qd_router_connection_is_on_demand(conn)) {
return;
}
+ //
+ // If the connection isn't inter-router, ignore it.
+ //
+ if (!qd_router_connection_is_inter_router(conn))
+ return;
+
qd_link_t *sender;
qd_link_t *receiver;
qd_router_link_t *rlink;
Modified: qpid/dispatch/trunk/src/server.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server.c?rev=1584881&r1=1584880&r2=1584881&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server.c (original)
+++ qpid/dispatch/trunk/src/server.c Fri Apr 4 20:09:44 2014
@@ -19,7 +19,6 @@
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/threading.h>
-#include <qpid/dispatch/agent.h>
#include <qpid/dispatch/log.h>
#include "server_private.h"
#include "timer_private.h"
@@ -32,46 +31,6 @@
static __thread qd_server_t *thread_server = 0;
-typedef struct qd_thread_t {
- qd_server_t *qd_server;
- int thread_id;
- volatile int running;
- volatile int canceled;
- int using_thread;
- sys_thread_t *thread;
-} qd_thread_t;
-
-
-struct qd_server_t {
- int thread_count;
- const char *container_name;
- pn_driver_t *driver;
- qd_log_source_t *log_source;
- qd_thread_start_cb_t start_handler;
- qd_conn_handler_cb_t conn_handler;
- qd_user_fd_handler_cb_t ufd_handler;
- void *start_context;
- void *conn_handler_context;
- sys_cond_t *cond;
- sys_mutex_t *lock;
- qd_thread_t **threads;
- work_queue_t *work_queue;
- qd_timer_list_t pending_timers;
- bool a_thread_is_waiting;
- int threads_active;
- int pause_requests;
- int threads_paused;
- int pause_next_sequence;
- int pause_now_serving;
- qd_signal_handler_cb_t signal_handler;
- void *signal_context;
- int pending_signal;
- qd_connection_list_t connections;
-};
-
-
-
-
ALLOC_DEFINE(qd_listener_t);
ALLOC_DEFINE(qd_connector_t);
ALLOC_DEFINE(qd_connection_t);
@@ -123,7 +82,6 @@ static void thread_process_listeners(qd_
pn_connection_set_context(conn, ctx);
ctx->pn_conn = conn;
- qd_log(qd_server->log_source, QD_LOG_DEBUG, "added listener
connection");
// qd_server->lock is already locked
DEQ_INSERT_TAIL(qd_server->connections, ctx);
@@ -521,8 +479,6 @@ static void *thread_run(void *arg)
sys_mutex_lock(qd_server->lock);
DEQ_REMOVE(qd_server->connections, ctx);
- qd_log(qd_server->log_source, QD_LOG_DEBUG, "removed %s
connection",
- ctx->connector ? "connector" : "listener");
free_qd_connection_t(ctx);
pn_connector_free(work);
if (conn)
@@ -617,7 +573,6 @@ static void cxtr_try_open(void *context)
sys_mutex_lock(ct->server->lock);
ctx->pn_cxtr = pn_connector(ct->server->driver, ct->config->host,
ct->config->port, (void*) ctx);
DEQ_INSERT_TAIL(ct->server->connections, ctx);
- qd_log(ct->server->log_source, QD_LOG_DEBUG, "added connector connection");
sys_mutex_unlock(ct->server->lock);
ct->ctx = ctx;
@@ -1077,71 +1032,3 @@ void qd_server_timer_cancel_LH(qd_timer_
}
-static void server_schema_handler(void *context, void *cor)
-{
- qd_agent_value_string(cor, 0, "state");
- qd_agent_value_string(cor, 0, "container");
- qd_agent_value_string(cor, 0, "host");
- qd_agent_value_string(cor, 0, "sasl");
- qd_agent_value_string(cor, 0, "role");
- qd_agent_value_string(cor, 0, "dir");
-}
-
-
-static void server_query_handler(void* context, const char *id, void *cor)
-{
- qd_server_t *qd_server = (qd_server_t*) context;
- sys_mutex_lock(qd_server->lock);
- const char *conn_state;
- const qd_server_config_t *config;
- const char *pn_container_name;
- const char *direction;
-
- qd_connection_t *conn = DEQ_HEAD(qd_server->connections);
- while (conn) {
- switch (conn->state) {
- case CONN_STATE_CONNECTING: conn_state = "Connecting"; break;
- case CONN_STATE_OPENING: conn_state = "Opening"; break;
- case CONN_STATE_OPERATIONAL: conn_state = "Operational"; break;
- case CONN_STATE_FAILED: conn_state = "Failed"; break;
- case CONN_STATE_USER: conn_state = "User"; break;
- default: conn_state = "undefined"; break;
- }
- qd_agent_value_string(cor, "state", conn_state);
- // get remote container name using proton connection
- pn_container_name = pn_connection_remote_container(conn->pn_conn);
- if (pn_container_name)
- qd_agent_value_string(cor, "container", pn_container_name);
- else
- qd_agent_value_null(cor, "container");
-
- // and now for some config entries
- if (conn->connector) {
- config = conn->connector->config;
- direction = "out";
- char host[1000];
- strcpy(host, config->host);
- strcat(host, ":");
- strcat(host, config->port);
- qd_agent_value_string(cor, "host", host);
- } else {
- config = conn->listener->config;
- direction = "in";
- qd_agent_value_string(cor, "host",
pn_connector_name(conn->pn_cxtr));
- }
-
- qd_agent_value_string(cor, "sasl", config->sasl_mechanisms);
- qd_agent_value_string(cor, "role", config->role);
- qd_agent_value_string(cor, "dir", direction);
-
- conn = DEQ_NEXT(conn);
- qd_agent_value_complete(cor, conn != 0);
- }
- sys_mutex_unlock(qd_server->lock);
-}
-
-
-void qd_server_setup_agent(qd_dispatch_t *qd)
-{
- qd_agent_register_class(qd, "org.apache.qpid.dispatch.connection",
qd->server, server_schema_handler, server_query_handler);
-}
Modified: qpid/dispatch/trunk/src/server_private.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server_private.h?rev=1584881&r1=1584880&r2=1584881&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server_private.h (original)
+++ qpid/dispatch/trunk/src/server_private.h Fri Apr 4 20:09:44 2014
@@ -21,13 +21,18 @@
#include <qpid/dispatch/server.h>
#include <qpid/dispatch/user_fd.h>
-#include <qpid/dispatch/timer.h>
#include <qpid/dispatch/alloc.h>
#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/log.h>
+#include "work_queue.h"
#include <proton/driver.h>
#include <proton/engine.h>
#include <proton/driver_extras.h>
+typedef struct qd_server_t qd_server_t;
+
+#include "timer_private.h"
+
void qd_server_timer_pending_LH(qd_timer_t *timer);
void qd_server_timer_cancel_LH(qd_timer_t *timer);
@@ -85,6 +90,8 @@ struct qd_connection_t {
qd_user_fd_t *ufd;
};
+DEQ_DECLARE(qd_connection_t, qd_connection_list_t);
+
struct qd_user_fd_t {
qd_server_t *server;
@@ -94,11 +101,47 @@ struct qd_user_fd_t {
};
+typedef struct qd_thread_t {
+ qd_server_t *qd_server;
+ int thread_id;
+ volatile int running;
+ volatile int canceled;
+ int using_thread;
+ sys_thread_t *thread;
+} qd_thread_t;
+
+
+struct qd_server_t {
+ int thread_count;
+ const char *container_name;
+ pn_driver_t *driver;
+ qd_log_source_t *log_source;
+ qd_thread_start_cb_t start_handler;
+ qd_conn_handler_cb_t conn_handler;
+ qd_user_fd_handler_cb_t ufd_handler;
+ void *start_context;
+ void *conn_handler_context;
+ sys_cond_t *cond;
+ sys_mutex_t *lock;
+ qd_thread_t **threads;
+ work_queue_t *work_queue;
+ qd_timer_list_t pending_timers;
+ bool a_thread_is_waiting;
+ int threads_active;
+ int pause_requests;
+ int threads_paused;
+ int pause_next_sequence;
+ int pause_now_serving;
+ qd_signal_handler_cb_t signal_handler;
+ void *signal_context;
+ int pending_signal;
+ qd_connection_list_t connections;
+};
+
+
ALLOC_DECLARE(qd_listener_t);
ALLOC_DECLARE(qd_connector_t);
ALLOC_DECLARE(qd_connection_t);
ALLOC_DECLARE(qd_user_fd_t);
-DEQ_DECLARE(qd_connection_t, qd_connection_list_t);
-
#endif
Modified: qpid/dispatch/trunk/src/timer_private.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/timer_private.h?rev=1584881&r1=1584880&r2=1584881&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/timer_private.h (original)
+++ qpid/dispatch/trunk/src/timer_private.h Fri Apr 4 20:09:44 2014
@@ -22,7 +22,8 @@
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/timer.h>
#include <qpid/dispatch/threading.h>
-#include "server_private.h"
+
+typedef struct qd_server_t qd_server_t;
typedef enum {
TIMER_FREE,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org