Author: tross
Date: Fri Apr  4 20:09:44 2014
New Revision: 1584881

URL: http://svn.apache.org/r1584881
Log:
DISPATCH-35 - Moved connection handling and configuration to a new 
configuration_manager module.
              Added on-demand connectors.

Added:
    qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h   (with 
props)
    qpid/dispatch/trunk/src/connection_manager.c   (with props)
Modified:
    qpid/dispatch/trunk/CMakeLists.txt
    qpid/dispatch/trunk/include/qpid/dispatch.h
    qpid/dispatch/trunk/src/dispatch.c
    qpid/dispatch/trunk/src/dispatch_private.h
    qpid/dispatch/trunk/src/router_node.c
    qpid/dispatch/trunk/src/server.c
    qpid/dispatch/trunk/src/server_private.h
    qpid/dispatch/trunk/src/timer_private.h

Modified: qpid/dispatch/trunk/CMakeLists.txt
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/CMakeLists.txt?rev=1584881&r1=1584880&r2=1584881&view=diff
==============================================================================
--- qpid/dispatch/trunk/CMakeLists.txt (original)
+++ qpid/dispatch/trunk/CMakeLists.txt Fri Apr  4 20:09:44 2014
@@ -114,6 +114,7 @@ set(server_SOURCES
     src/buffer.c
     src/compose.c
     src/config.c
+    src/connection_manager.c
     src/container.c
     src/dispatch.c
     src/hash.c

Modified: qpid/dispatch/trunk/include/qpid/dispatch.h
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch.h?rev=1584881&r1=1584880&r2=1584881&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch.h Fri Apr  4 20:09:44 2014
@@ -39,6 +39,7 @@
 #include <qpid/dispatch/message.h>
 #include <qpid/dispatch/container.h>
 #include <qpid/dispatch/agent.h>
+#include <qpid/dispatch/connection_manager.h>
 #include <qpid/dispatch/dispatch.h>
 
 #endif

Added: qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h?rev=1584881&view=auto
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h (added)
+++ qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h Fri Apr  4 
20:09:44 2014
@@ -0,0 +1,87 @@
+#ifndef __dispatch_connection_manager_h__
+#define __dispatch_connection_manager_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/dispatch.h>
+
+typedef struct qd_connection_manager_t qd_connection_manager_t;
+typedef struct qd_config_connector_t qd_config_connector_t;
+typedef struct qd_config_listener_t qd_config_listener_t;
+
+/**
+ * \brief Allocate a connection manager
+ *
+ * @param qd The dispatch handle returned by qd_dispatch.
+ */
+qd_connection_manager_t *qd_connection_manager(qd_dispatch_t *qd);
+
+
+/**
+ * \brief Free all the resources associated with the connection manager
+ *
+ * @param cm The connection manager handle returned by qd_connection_manager.
+ */
+void qd_connection_manager_free(qd_connection_manager_t *cm);
+
+
+/**
+ * \brief Load the Listeners and Connections from the configuration file.
+ *
+ * @param qd The dispatch handle returned by qd_dispatch.
+ */
+void qd_connection_manager_configure(qd_dispatch_t *qd);
+
+
+/**
+ * \brief Start the configured Listeners and Connectors
+ *
+ * Note that on-demand connectors are not started by this function.
+ *
+ * @param qd The dispatch handle returned by qd_dispatch.
+ */
+void qd_connection_manager_start(qd_dispatch_t *qd);
+
+
+/**
+ * \brief Given a connector-tag, find and return a pointer to the on-demand 
connector.
+ *
+ * @param qd The dispatch handle returned by qd_dispatch.
+ * @param tag The tag that uniquely identifies the on-demand connector.
+ * @return The matching on-demand connector or NULL if the tag is not found.
+ */
+qd_config_connector_t *qd_connection_manager_find_on_demand(qd_dispatch_t *qd, 
const char *tag);
+
+
+/**
+ * \brief Start an on-demand connector.
+ *
+ * @param od The pointer to an on-demand connector returned by 
qd_connections_find_on_demand.
+ */
+void qd_connection_manager_start_on_demand(qd_config_connector_t *od);
+
+
+/**
+ * \brief Stop an on-demand connector.
+ *
+ * @param od The pointer to an on-demand connector returned by 
qd_connections_find_on_demand.
+ */
+void qd_connection_manager_stop_on_demand(qd_config_connector_t *od);
+
+#endif

Propchange: qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Rev URL

Added: qpid/dispatch/trunk/src/connection_manager.c
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/connection_manager.c?rev=1584881&view=auto
==============================================================================
--- qpid/dispatch/trunk/src/connection_manager.c (added)
+++ qpid/dispatch/trunk/src/connection_manager.c Fri Apr  4 20:09:44 2014
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/connection_manager.h>
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/agent.h>
+#include "server_private.h"
+#include "dispatch_private.h"
+#include <string.h>
+
+static const char *CONF_LISTENER    = "listener";
+static const char *CONF_CONNECTOR   = "connector";
+
+
+struct qd_config_listener_t {
+    DEQ_LINKS(qd_config_listener_t);
+    qd_listener_t      *listener;
+    qd_server_config_t  configuration;
+};
+
+DEQ_DECLARE(qd_config_listener_t, qd_config_listener_list_t);
+
+
+struct qd_config_connector_t {
+    DEQ_LINKS(qd_config_connector_t);
+    qd_connector_t     *connector;
+    qd_server_config_t  configuration;
+    const char         *tag;
+    bool                started;
+};
+
+DEQ_DECLARE(qd_config_connector_t, qd_config_connector_list_t);
+
+
+struct qd_connection_manager_t {
+    qd_log_source_t             *log_source;
+    qd_server_t                 *server;
+    qd_config_listener_list_t    config_listeners;
+    qd_config_connector_list_t   config_connectors;
+    qd_config_connector_list_t   on_demand_connectors;
+};
+
+
+static void load_server_config(qd_dispatch_t *qd, qd_server_config_t *config, 
const char *section, int i)
+{
+    config->host           = qd_config_item_value_string(qd, section, i, 
"addr");
+    config->port           = qd_config_item_value_string(qd, section, i, 
"port");
+    config->role           = qd_config_item_value_string(qd, section, i, 
"role");
+    config->max_frame_size = qd_config_item_value_int(qd, section, i, 
"max-frame-size");
+    config->sasl_mechanisms =
+        qd_config_item_value_string(qd, section, i, "sasl-mechanisms");
+    config->ssl_enabled =
+        qd_config_item_value_bool(qd, section, i, "ssl-profile");
+    if (config->ssl_enabled) {
+        config->ssl_server = 1;
+        config->ssl_allow_unsecured_client =
+            qd_config_item_value_bool(qd, section, i, "allow-unsecured");
+        config->ssl_certificate_file =
+            qd_config_item_value_string(qd, section, i, "cert-file");
+        config->ssl_private_key_file =
+            qd_config_item_value_string(qd, section, i, "key-file");
+        config->ssl_password =
+            qd_config_item_value_string(qd, section, i, "password");
+        config->ssl_trusted_certificate_db =
+            qd_config_item_value_string(qd, section, i, "cert-db");
+        config->ssl_trusted_certificates =
+            qd_config_item_value_string(qd, section, i, "trusted-certs");
+        config->ssl_require_peer_authentication =
+            qd_config_item_value_bool(qd, section, i, "require-peer-auth");
+    }
+}
+
+
+static void configure_listeners(qd_dispatch_t *qd)
+{
+    int count;
+    qd_connection_manager_t *cm = qd->connection_manager;
+
+    if (!qd->config || !cm) {
+        assert(false);
+        return;
+    }
+
+    count = qd_config_item_count(qd, CONF_LISTENER);
+    for (int i = 0; i < count; i++) {
+        qd_config_listener_t *cl = NEW(qd_config_listener_t);
+        cl->listener = 0;
+        load_server_config(qd, &cl->configuration, CONF_LISTENER, i);
+        DEQ_ITEM_INIT(cl);
+        DEQ_INSERT_TAIL(cm->config_listeners, cl);
+        qd_log(cm->log_source, QD_LOG_INFO, "Configured Listener: %s:%s 
role=%s",
+               cl->configuration.host, cl->configuration.port, 
cl->configuration.role);
+    }
+}
+
+
+static void configure_connectors(qd_dispatch_t *qd)
+{
+    int count;
+    qd_connection_manager_t *cm = qd->connection_manager;
+
+    if (!qd->config || !cm) {
+        assert(false);
+        return;
+    }
+
+    count = qd_config_item_count(qd, CONF_CONNECTOR);
+    for (int i = 0; i < count; i++) {
+        qd_config_connector_t *cc = NEW(qd_config_connector_t);
+        cc->connector = 0;
+        cc->tag       = 0;
+        cc->started   = false;
+        load_server_config(qd, &cc->configuration, CONF_CONNECTOR, i);
+        DEQ_ITEM_INIT(cc);
+        if (strcmp(cc->configuration.role, "on-demand") == 0) {
+            DEQ_INSERT_TAIL(cm->on_demand_connectors, cc);
+        } else
+            DEQ_INSERT_TAIL(cm->config_connectors, cc);
+        qd_log(cm->log_source, QD_LOG_INFO, "Configured Connector: %s:%s 
role=%s",
+               cc->configuration.host, cc->configuration.port, 
cc->configuration.role);
+    }
+}
+
+
+qd_connection_manager_t *qd_connection_manager(qd_dispatch_t *qd)
+{
+    qd_connection_manager_t *cm = NEW(qd_connection_manager_t);
+    if (!cm)
+        return 0;
+
+    cm->log_source = qd_log_source("CONN_MGR");
+    cm->server     = qd->server;
+    DEQ_INIT(cm->config_listeners);
+    DEQ_INIT(cm->config_connectors);
+    DEQ_INIT(cm->on_demand_connectors);
+
+    return cm;
+}
+
+
+void qd_connection_manager_free(qd_connection_manager_t *cm)
+{
+}
+
+
+void qd_connection_manager_configure(qd_dispatch_t *qd)
+{
+    configure_listeners(qd);
+    configure_connectors(qd);
+}
+
+
+void qd_connection_manager_start(qd_dispatch_t *qd)
+{
+    qd_config_listener_t  *cl = 
DEQ_HEAD(qd->connection_manager->config_listeners);
+    qd_config_connector_t *cc = 
DEQ_HEAD(qd->connection_manager->config_connectors);
+
+    while (cl) {
+        if (cl->listener == 0)
+            cl->listener = qd_server_listen(qd, &cl->configuration, cl);
+        cl = DEQ_NEXT(cl);
+    }
+
+    while (cc) {
+        if (cc->connector == 0)
+            cc->connector = qd_server_connect(qd, &cc->configuration, cc);
+        cc = DEQ_NEXT(cc);
+    }
+}
+
+
+qd_config_connector_t *qd_connection_manager_find_on_demand(qd_dispatch_t *qd, 
const char *tag)
+{
+    return 0;
+}
+
+
+void qd_connection_manager_start_on_demand(qd_config_connector_t *od)
+{
+}
+
+
+void qd_connection_manager_stop_on_demand(qd_config_connector_t *od)
+{
+}
+
+static void server_schema_handler(void *context, void *cor)
+{
+    qd_agent_value_string(cor, 0, "state");
+    qd_agent_value_string(cor, 0, "container");
+    qd_agent_value_string(cor, 0, "host");
+    qd_agent_value_string(cor, 0, "sasl");
+    qd_agent_value_string(cor, 0, "role");
+    qd_agent_value_string(cor, 0, "dir");
+}
+
+
+static void server_query_handler(void* context, const char *id, void *cor)
+{
+    qd_dispatch_t *qd        = (qd_dispatch_t*) context;
+    qd_server_t   *qd_server = qd->server;
+    sys_mutex_lock(qd_server->lock);
+    const char               *conn_state;
+    const qd_server_config_t *config;
+    const char               *pn_container_name;
+    const char               *direction;
+
+    qd_connection_t *conn = DEQ_HEAD(qd_server->connections);
+    while (conn) {
+        switch (conn->state) {
+        case CONN_STATE_CONNECTING:  conn_state = "Connecting";  break;
+        case CONN_STATE_OPENING:     conn_state = "Opening";     break;
+        case CONN_STATE_OPERATIONAL: conn_state = "Operational"; break;
+        case CONN_STATE_FAILED:      conn_state = "Failed";      break;
+        case CONN_STATE_USER:        conn_state = "User";        break;
+        default:                     conn_state = "undefined";   break;
+        }
+        qd_agent_value_string(cor, "state", conn_state);
+        // get remote container name using proton connection
+        pn_container_name = pn_connection_remote_container(conn->pn_conn);
+        if (pn_container_name)
+            qd_agent_value_string(cor, "container", pn_container_name);
+        else
+            qd_agent_value_null(cor, "container");
+
+        // and now for some config entries
+        if (conn->connector) {
+            config = conn->connector->config;
+            direction = "out";
+            char host[1000];
+            strcpy(host, config->host);
+            strcat(host, ":");
+            strcat(host, config->port);
+            qd_agent_value_string(cor, "host", host);
+        } else {
+            config = conn->listener->config;
+            direction = "in";
+            qd_agent_value_string(cor, "host", 
pn_connector_name(conn->pn_cxtr));
+        }
+
+        qd_agent_value_string(cor, "sasl", config->sasl_mechanisms);
+        qd_agent_value_string(cor, "role", config->role);
+        qd_agent_value_string(cor, "dir",  direction);
+
+        conn = DEQ_NEXT(conn);
+        qd_agent_value_complete(cor, conn != 0);
+    }
+    sys_mutex_unlock(qd_server->lock);
+}
+
+
+void qd_connection_manager_setup_agent(qd_dispatch_t *qd)
+{
+    qd_agent_register_class(qd, "org.apache.qpid.dispatch.connection", qd, 
server_schema_handler, server_query_handler);
+}

Propchange: qpid/dispatch/trunk/src/connection_manager.c
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/dispatch/trunk/src/dispatch.c
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/dispatch.c?rev=1584881&r1=1584880&r2=1584881&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/dispatch.c (original)
+++ qpid/dispatch/trunk/src/dispatch.c Fri Apr  4 20:09:44 2014
@@ -30,7 +30,7 @@
  * Private Function Prototypes
  */
 qd_server_t    *qd_server(int tc, const char *container_name);
-void            qd_server_setup_agent(qd_dispatch_t *qd);
+void            qd_connection_manager_setup_agent(qd_dispatch_t *qd);
 void            qd_server_free(qd_server_t *server);
 qd_container_t *qd_container(qd_dispatch_t *qd);
 void            qd_container_setup_agent(qd_dispatch_t *qd);
@@ -41,13 +41,9 @@ void            qd_router_free(qd_router
 qd_agent_t     *qd_agent(qd_dispatch_t *qd);
 void            qd_agent_free(qd_agent_t *agent);
 
-ALLOC_DEFINE(qd_config_listener_t);
-ALLOC_DEFINE(qd_config_connector_t);
 
 static const char *CONF_CONTAINER   = "container";
 static const char *CONF_ROUTER      = "router";
-static const char *CONF_LISTENER    = "listener";
-static const char *CONF_CONNECTOR   = "connector";
 
 
 qd_dispatch_t *qd_dispatch(const char *python_pkgdir)
@@ -56,9 +52,6 @@ qd_dispatch_t *qd_dispatch(const char *p
 
     memset(qd, 0, sizeof(qd_dispatch_t));
 
-    DEQ_INIT(qd->config_listeners);
-    DEQ_INIT(qd->config_connectors);
-
     // alloc and log has to be initialized before any module.
     qd_alloc_initialize();
     qd_log_initialize(); 
@@ -133,13 +126,14 @@ void qd_dispatch_configure_router(qd_dis
 
 void qd_dispatch_prepare(qd_dispatch_t *qd)
 {
-    qd->server    = qd_server(qd->thread_count, qd->container_name);
-    qd->container = qd_container(qd);
-    qd->router    = qd_router(qd, qd->router_mode, qd->router_area, 
qd->router_id);
-    qd->agent     = qd_agent(qd);
+    qd->server             = qd_server(qd->thread_count, qd->container_name);
+    qd->container          = qd_container(qd);
+    qd->router             = qd_router(qd, qd->router_mode, qd->router_area, 
qd->router_id);
+    qd->agent              = qd_agent(qd);
+    qd->connection_manager = qd_connection_manager(qd);
 
     qd_alloc_setup_agent(qd);
-    qd_server_setup_agent(qd);
+    qd_connection_manager_setup_agent(qd);
     qd_container_setup_agent(qd);
     qd_router_setup_late(qd);
 }
@@ -149,6 +143,7 @@ void qd_dispatch_free(qd_dispatch_t *qd)
 {
     qd_config_free(qd->config);
     qd_config_finalize();
+    qd_connection_manager_free(qd->connection_manager);
     qd_agent_free(qd->agent);
     qd_router_free(qd->router);
     qd_container_free(qd->container);
@@ -158,98 +153,9 @@ void qd_dispatch_free(qd_dispatch_t *qd)
 }
 
 
-static void load_server_config(qd_dispatch_t *qd, qd_server_config_t *config, 
const char *section, int i)
-{
-    config->host = qd_config_item_value_string(qd, section, i, "addr");
-    config->port = qd_config_item_value_string(qd, section, i, "port");
-    config->role = qd_config_item_value_string(qd, section, i, "role");
-    config->max_frame_size = qd_config_item_value_int(qd, section, i, 
"max-frame-size");
-    config->sasl_mechanisms =
-        qd_config_item_value_string(qd, section, i, "sasl-mechanisms");
-    config->ssl_enabled =
-        qd_config_item_value_bool(qd, section, i, "ssl-profile");
-    if (config->ssl_enabled) {
-        config->ssl_server = 1;
-        config->ssl_allow_unsecured_client =
-            qd_config_item_value_bool(qd, section, i, "allow-unsecured");
-        config->ssl_certificate_file =
-            qd_config_item_value_string(qd, section, i, "cert-file");
-        config->ssl_private_key_file =
-            qd_config_item_value_string(qd, section, i, "key-file");
-        config->ssl_password =
-            qd_config_item_value_string(qd, section, i, "password");
-        config->ssl_trusted_certificate_db =
-            qd_config_item_value_string(qd, section, i, "cert-db");
-        config->ssl_trusted_certificates =
-            qd_config_item_value_string(qd, section, i, "trusted-certs");
-        config->ssl_require_peer_authentication =
-            qd_config_item_value_bool(qd, section, i, "require-peer-auth");
-    }
-}
-
-
-static void configure_listeners(qd_dispatch_t *qd)
-{
-    int count;
-
-    if (!qd->config)
-        return;
-
-    count = qd_config_item_count(qd, CONF_LISTENER);
-    for (int i = 0; i < count; i++) {
-        qd_config_listener_t *cl = new_qd_config_listener_t();
-        load_server_config(qd, &cl->configuration, CONF_LISTENER, i);
-
-        printf("\nListener   : %s:%s\n", cl->configuration.host, 
cl->configuration.port);
-        printf("       SASL: %s\n", cl->configuration.sasl_mechanisms);
-        printf("        SSL: %d\n", cl->configuration.ssl_enabled);
-        if (cl->configuration.ssl_enabled) {
-            printf("      unsec: %d\n", 
cl->configuration.ssl_allow_unsecured_client);
-            printf("  cert-file: %s\n", 
cl->configuration.ssl_certificate_file);
-            printf("   key-file: %s\n", 
cl->configuration.ssl_private_key_file);
-            printf("    cert-db: %s\n", 
cl->configuration.ssl_trusted_certificate_db);
-            printf("  peer-auth: %d\n", 
cl->configuration.ssl_require_peer_authentication);
-        }
-
-        cl->listener = qd_server_listen(qd, &cl->configuration, cl);
-        DEQ_ITEM_INIT(cl);
-        DEQ_INSERT_TAIL(qd->config_listeners, cl);
-    }
-}
-
-
-static void configure_connectors(qd_dispatch_t *qd)
-{
-    int count;
-
-    if (!qd->config)
-        return;
-
-    count = qd_config_item_count(qd, CONF_CONNECTOR);
-    for (int i = 0; i < count; i++) {
-        qd_config_connector_t *cc = new_qd_config_connector_t();
-        load_server_config(qd, &cc->configuration, CONF_CONNECTOR, i);
-
-        printf("\nConnector  : %s:%s\n", cc->configuration.host, 
cc->configuration.port);
-        printf("       SASL: %s\n", cc->configuration.sasl_mechanisms);
-        printf("        SSL: %d\n", cc->configuration.ssl_enabled);
-        if (cc->configuration.ssl_enabled) {
-            printf("  cert-file: %s\n", 
cc->configuration.ssl_certificate_file);
-            printf("   key-file: %s\n", 
cc->configuration.ssl_private_key_file);
-            printf("    cert-db: %s\n", 
cc->configuration.ssl_trusted_certificate_db);
-            printf("  peer-auth: %d\n", 
cc->configuration.ssl_require_peer_authentication);
-        }
-
-        cc->connector = qd_server_connect(qd, &cc->configuration, cc);
-        DEQ_ITEM_INIT(cc);
-        DEQ_INSERT_TAIL(qd->config_connectors, cc);
-    }
-}
-
-
 void qd_dispatch_post_configure_connections(qd_dispatch_t *qd)
 {
-    configure_listeners(qd);
-    configure_connectors(qd);
+    qd_connection_manager_configure(qd);
+    qd_connection_manager_start(qd);
 }
 

Modified: qpid/dispatch/trunk/src/dispatch_private.h
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/dispatch_private.h?rev=1584881&r1=1584880&r2=1584881&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/dispatch_private.h (original)
+++ qpid/dispatch/trunk/src/dispatch_private.h Fri Apr  4 20:09:44 2014
@@ -24,39 +24,19 @@ typedef struct qd_router_t    qd_router_
 typedef struct qd_agent_t     qd_agent_t;
 
 #include <qpid/dispatch/container.h>
+#include <qpid/dispatch/connection_manager.h>
 #include "server_private.h"
 #include "config_private.h"
 #include "router_private.h"
-#include <qpid/dispatch/ctools.h>
 
-typedef struct qd_config_listener_t {
-    DEQ_LINKS(struct qd_config_listener_t);
-    qd_listener_t      *listener;
-    qd_server_config_t  configuration;
-} qd_config_listener_t;
-
-DEQ_DECLARE(qd_config_listener_t, qd_config_listener_list_t);
-ALLOC_DECLARE(qd_config_listener_t);
-
-
-typedef struct qd_config_connector_t {
-    DEQ_LINKS(struct qd_config_connector_t);
-    qd_connector_t     *connector;
-    qd_server_config_t  configuration;
-} qd_config_connector_t;
-
-DEQ_DECLARE(qd_config_connector_t, qd_config_connector_list_t);
-ALLOC_DECLARE(qd_config_connector_t);
 
 struct qd_dispatch_t {
-    qd_server_t        *server;
-    qd_container_t     *container;
-    qd_router_t        *router;
-    qd_agent_t         *agent;
-    qd_config_t        *config;
-
-    qd_config_listener_list_t   config_listeners;
-    qd_config_connector_list_t  config_connectors;
+    qd_server_t             *server;
+    qd_container_t          *container;
+    qd_router_t             *router;
+    qd_agent_t              *agent;
+    qd_config_t             *config;
+    qd_connection_manager_t *connection_manager;
 
     int               thread_count;
     const char       *container_name;

Modified: qpid/dispatch/trunk/src/router_node.c
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1584881&r1=1584880&r2=1584881&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Fri Apr  4 20:09:44 2014
@@ -26,9 +26,10 @@
 #include "dispatch_private.h"
 #include "router_private.h"
 
-static char *router_role   = "inter-router";
-static char *local_prefix  = "_local/";
-static char *topo_prefix   = "_topo/";
+static char *router_role    = "inter-router";
+static char *on_demand_role = "on-demand";
+static char *local_prefix   = "_local/";
+static char *topo_prefix    = "_topo/";
 static char *direct_prefix;
 static char *node_id;
 
@@ -186,6 +187,22 @@ static int qd_router_connection_is_inter
 
 
 /**
+ * Determine whether a connection is configured in the on-demand role.
+ */
+static int qd_router_connection_is_on_demand(const qd_connection_t *conn)
+{
+    if (!conn)
+        return 0;
+
+    const qd_server_config_t *cf = qd_connection_config(conn);
+    if (cf && strcmp(cf->role, on_demand_role) == 0)
+        return 1;
+
+    return 0;
+}
+
+
+/**
  * Determine whether a terminus has router capability
  */
 static int qd_router_terminus_is_router(pn_terminus_t *term)
@@ -1184,14 +1201,19 @@ static void router_outbound_open_handler
     qd_router_t *router = (qd_router_t*) type_context;
 
     //
-    // Check the configured role of this connection.  If it is not the 
inter-router
-    // role, ignore it.
+    // If the connection is on-demand, visit all waypoints that are waiting 
for their
+    // connection to arrive.
     //
-    if (!qd_router_connection_is_inter_router(conn)) {
-        qd_log(router->log_source, QD_LOG_WARNING, "Outbound connection set up 
without inter-router role");
+    if (qd_router_connection_is_on_demand(conn)) {
         return;
     }
 
+    //
+    // If the connection isn't inter-router, ignore it.
+    //
+    if (!qd_router_connection_is_inter_router(conn))
+        return;
+
     qd_link_t        *sender;
     qd_link_t        *receiver;
     qd_router_link_t *rlink;

Modified: qpid/dispatch/trunk/src/server.c
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server.c?rev=1584881&r1=1584880&r2=1584881&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server.c (original)
+++ qpid/dispatch/trunk/src/server.c Fri Apr  4 20:09:44 2014
@@ -19,7 +19,6 @@
 
 #include <qpid/dispatch/ctools.h>
 #include <qpid/dispatch/threading.h>
-#include <qpid/dispatch/agent.h>
 #include <qpid/dispatch/log.h>
 #include "server_private.h"
 #include "timer_private.h"
@@ -32,46 +31,6 @@
 
 static __thread qd_server_t *thread_server = 0;
 
-typedef struct qd_thread_t {
-    qd_server_t  *qd_server;
-    int           thread_id;
-    volatile int  running;
-    volatile int  canceled;
-    int           using_thread;
-    sys_thread_t *thread;
-} qd_thread_t;
-
-
-struct qd_server_t {
-    int                      thread_count;
-    const char              *container_name;
-    pn_driver_t             *driver;
-    qd_log_source_t         *log_source;
-    qd_thread_start_cb_t     start_handler;
-    qd_conn_handler_cb_t     conn_handler;
-    qd_user_fd_handler_cb_t  ufd_handler;
-    void                    *start_context;
-    void                    *conn_handler_context;
-    sys_cond_t              *cond;
-    sys_mutex_t             *lock;
-    qd_thread_t            **threads;
-    work_queue_t            *work_queue;
-    qd_timer_list_t          pending_timers;
-    bool                     a_thread_is_waiting;
-    int                      threads_active;
-    int                      pause_requests;
-    int                      threads_paused;
-    int                      pause_next_sequence;
-    int                      pause_now_serving;
-    qd_signal_handler_cb_t   signal_handler;
-    void                    *signal_context;
-    int                      pending_signal;
-    qd_connection_list_t     connections;
-};
-
-
-
-
 ALLOC_DEFINE(qd_listener_t);
 ALLOC_DEFINE(qd_connector_t);
 ALLOC_DEFINE(qd_connection_t);
@@ -123,7 +82,6 @@ static void thread_process_listeners(qd_
         pn_connection_set_context(conn, ctx);
         ctx->pn_conn = conn;
 
-        qd_log(qd_server->log_source, QD_LOG_DEBUG, "added listener 
connection");
         // qd_server->lock is already locked
         DEQ_INSERT_TAIL(qd_server->connections, ctx);
 
@@ -521,8 +479,6 @@ static void *thread_run(void *arg)
 
                 sys_mutex_lock(qd_server->lock);
                 DEQ_REMOVE(qd_server->connections, ctx);
-                qd_log(qd_server->log_source, QD_LOG_DEBUG, "removed %s 
connection",
-                        ctx->connector ? "connector" : "listener");
                 free_qd_connection_t(ctx);
                 pn_connector_free(work);
                 if (conn)
@@ -617,7 +573,6 @@ static void cxtr_try_open(void *context)
     sys_mutex_lock(ct->server->lock);
     ctx->pn_cxtr = pn_connector(ct->server->driver, ct->config->host, 
ct->config->port, (void*) ctx);
     DEQ_INSERT_TAIL(ct->server->connections, ctx);
-    qd_log(ct->server->log_source, QD_LOG_DEBUG, "added connector connection");
     sys_mutex_unlock(ct->server->lock);
 
     ct->ctx   = ctx;
@@ -1077,71 +1032,3 @@ void qd_server_timer_cancel_LH(qd_timer_
 }
 
 
-static void server_schema_handler(void *context, void *cor)
-{
-    qd_agent_value_string(cor, 0, "state");
-    qd_agent_value_string(cor, 0, "container");
-    qd_agent_value_string(cor, 0, "host");
-    qd_agent_value_string(cor, 0, "sasl");
-    qd_agent_value_string(cor, 0, "role");
-    qd_agent_value_string(cor, 0, "dir");
-}
-
-
-static void server_query_handler(void* context, const char *id, void *cor)
-{
-    qd_server_t *qd_server = (qd_server_t*) context;
-    sys_mutex_lock(qd_server->lock);
-    const char               *conn_state;
-    const qd_server_config_t *config;
-    const char               *pn_container_name;
-    const char               *direction;
-
-    qd_connection_t *conn = DEQ_HEAD(qd_server->connections);
-    while (conn) {
-        switch (conn->state) {
-        case CONN_STATE_CONNECTING:  conn_state = "Connecting";  break;
-        case CONN_STATE_OPENING:     conn_state = "Opening";     break;
-        case CONN_STATE_OPERATIONAL: conn_state = "Operational"; break;
-        case CONN_STATE_FAILED:      conn_state = "Failed";      break;
-        case CONN_STATE_USER:        conn_state = "User";        break;
-        default:                     conn_state = "undefined";   break;
-        }
-        qd_agent_value_string(cor, "state", conn_state);
-        // get remote container name using proton connection
-        pn_container_name = pn_connection_remote_container(conn->pn_conn);
-        if (pn_container_name)
-            qd_agent_value_string(cor, "container", pn_container_name);
-        else
-            qd_agent_value_null(cor, "container");
-
-        // and now for some config entries
-        if (conn->connector) {
-            config = conn->connector->config;
-            direction = "out";
-            char host[1000];
-            strcpy(host, config->host);
-            strcat(host, ":");
-            strcat(host, config->port);
-            qd_agent_value_string(cor, "host", host);
-        } else {
-            config = conn->listener->config;
-            direction = "in";
-            qd_agent_value_string(cor, "host", 
pn_connector_name(conn->pn_cxtr));
-        }
-
-        qd_agent_value_string(cor, "sasl", config->sasl_mechanisms);
-        qd_agent_value_string(cor, "role", config->role);
-        qd_agent_value_string(cor, "dir",  direction);
-
-        conn = DEQ_NEXT(conn);
-        qd_agent_value_complete(cor, conn != 0);
-    }
-    sys_mutex_unlock(qd_server->lock);
-}
-
-
-void qd_server_setup_agent(qd_dispatch_t *qd)
-{
-    qd_agent_register_class(qd, "org.apache.qpid.dispatch.connection", 
qd->server, server_schema_handler, server_query_handler);
-}

Modified: qpid/dispatch/trunk/src/server_private.h
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server_private.h?rev=1584881&r1=1584880&r2=1584881&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server_private.h (original)
+++ qpid/dispatch/trunk/src/server_private.h Fri Apr  4 20:09:44 2014
@@ -21,13 +21,18 @@
 
 #include <qpid/dispatch/server.h>
 #include <qpid/dispatch/user_fd.h>
-#include <qpid/dispatch/timer.h>
 #include <qpid/dispatch/alloc.h>
 #include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/log.h>
+#include "work_queue.h"
 #include <proton/driver.h>
 #include <proton/engine.h>
 #include <proton/driver_extras.h>
 
+typedef struct qd_server_t qd_server_t;
+
+#include "timer_private.h"
+
 void qd_server_timer_pending_LH(qd_timer_t *timer);
 void qd_server_timer_cancel_LH(qd_timer_t *timer);
 
@@ -85,6 +90,8 @@ struct qd_connection_t {
     qd_user_fd_t    *ufd;
 };
 
+DEQ_DECLARE(qd_connection_t, qd_connection_list_t);
+
 
 struct qd_user_fd_t {
     qd_server_t    *server;
@@ -94,11 +101,47 @@ struct qd_user_fd_t {
 };
 
 
+typedef struct qd_thread_t {
+    qd_server_t  *qd_server;
+    int           thread_id;
+    volatile int  running;
+    volatile int  canceled;
+    int           using_thread;
+    sys_thread_t *thread;
+} qd_thread_t;
+
+
+struct qd_server_t {
+    int                      thread_count;
+    const char              *container_name;
+    pn_driver_t             *driver;
+    qd_log_source_t         *log_source;
+    qd_thread_start_cb_t     start_handler;
+    qd_conn_handler_cb_t     conn_handler;
+    qd_user_fd_handler_cb_t  ufd_handler;
+    void                    *start_context;
+    void                    *conn_handler_context;
+    sys_cond_t              *cond;
+    sys_mutex_t             *lock;
+    qd_thread_t            **threads;
+    work_queue_t            *work_queue;
+    qd_timer_list_t          pending_timers;
+    bool                     a_thread_is_waiting;
+    int                      threads_active;
+    int                      pause_requests;
+    int                      threads_paused;
+    int                      pause_next_sequence;
+    int                      pause_now_serving;
+    qd_signal_handler_cb_t   signal_handler;
+    void                    *signal_context;
+    int                      pending_signal;
+    qd_connection_list_t     connections;
+};
+
+
 ALLOC_DECLARE(qd_listener_t);
 ALLOC_DECLARE(qd_connector_t);
 ALLOC_DECLARE(qd_connection_t);
 ALLOC_DECLARE(qd_user_fd_t);
 
-DEQ_DECLARE(qd_connection_t, qd_connection_list_t);
-
 #endif

Modified: qpid/dispatch/trunk/src/timer_private.h
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/timer_private.h?rev=1584881&r1=1584880&r2=1584881&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/timer_private.h (original)
+++ qpid/dispatch/trunk/src/timer_private.h Fri Apr  4 20:09:44 2014
@@ -22,7 +22,8 @@
 #include <qpid/dispatch/ctools.h>
 #include <qpid/dispatch/timer.h>
 #include <qpid/dispatch/threading.h>
-#include "server_private.h"
+
+typedef struct qd_server_t qd_server_t;
 
 typedef enum {
     TIMER_FREE,



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to