Author: tross
Date: Wed May 21 20:25:40 2014
New Revision: 1596672
URL: http://svn.apache.org/r1596672
Log:
DISPATCH-31 - Cleaned up memory leaks not related to the embedded python.
Modified:
qpid/dispatch/trunk/include/qpid/dispatch/alloc.h
qpid/dispatch/trunk/include/qpid/dispatch/config.h
qpid/dispatch/trunk/include/qpid/dispatch/container.h
qpid/dispatch/trunk/include/qpid/dispatch/server.h
qpid/dispatch/trunk/src/agent.c
qpid/dispatch/trunk/src/alloc.c
qpid/dispatch/trunk/src/alloc_private.h
qpid/dispatch/trunk/src/config.c
qpid/dispatch/trunk/src/connection_manager.c
qpid/dispatch/trunk/src/container.c
qpid/dispatch/trunk/src/dispatch.c
qpid/dispatch/trunk/src/dispatch_private.h
qpid/dispatch/trunk/src/hash.c
qpid/dispatch/trunk/src/log.c
qpid/dispatch/trunk/src/python_embedded.c
qpid/dispatch/trunk/src/router_config.c
qpid/dispatch/trunk/src/router_node.c
qpid/dispatch/trunk/src/router_private.h
qpid/dispatch/trunk/src/server.c
qpid/dispatch/trunk/tests/alloc_test.c
qpid/dispatch/trunk/tests/run_unit_tests.c
qpid/dispatch/trunk/tests/server_test.c
qpid/dispatch/trunk/tests/timer_test.c
Modified: qpid/dispatch/trunk/include/qpid/dispatch/alloc.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/alloc.h?rev=1596672&r1=1596671&r2=1596672&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/alloc.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/alloc.h Wed May 21 20:25:40 2014
@@ -22,6 +22,7 @@
#include <stdlib.h>
#include <stdint.h>
#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/ctools.h>
/**
* @file
@@ -34,6 +35,8 @@
typedef struct qd_alloc_pool_t qd_alloc_pool_t;
+DEQ_DECLARE(qd_alloc_pool_t, qd_alloc_pool_list_t);
+
typedef struct {
int transfer_batch_size;
int local_free_list_max;
@@ -49,16 +52,17 @@ typedef struct {
} qd_alloc_stats_t;
typedef struct {
- uint32_t header;
- char *type_name;
- size_t type_size;
- size_t *additional_size;
- size_t total_size;
- qd_alloc_config_t *config;
- qd_alloc_stats_t *stats;
- qd_alloc_pool_t *global_pool;
- sys_mutex_t *lock;
- uint32_t trailer;
+ uint32_t header;
+ char *type_name;
+ size_t type_size;
+ size_t *additional_size;
+ size_t total_size;
+ qd_alloc_config_t *config;
+ qd_alloc_stats_t *stats;
+ qd_alloc_pool_t *global_pool;
+ sys_mutex_t *lock;
+ qd_alloc_pool_list_t tpool_list;
+ uint32_t trailer;
} qd_alloc_type_desc_t;
/** Allocate in a thread pool. Use via ALLOC_DECLARE */
@@ -70,12 +74,13 @@ void qd_dealloc(qd_alloc_type_desc_t *de
* Declare functions new_T and alloc_T
*/
#define ALLOC_DECLARE(T) \
+ extern __thread qd_alloc_pool_t *__local_pool_##T; \
T *new_##T(void); \
void free_##T(T *p)
#define ALLOC_DEFINE_CONFIG(T,S,A,C) \
- qd_alloc_type_desc_t __desc_##T = {0, #T, S, A, 0, C, 0, 0, 0, 0}; \
- __thread qd_alloc_pool_t *__local_pool_##T = 0; \
+ qd_alloc_type_desc_t __desc_##T = {0, #T, S, A, 0, C, 0, 0, 0, {0,0}, 0}; \
+ __thread qd_alloc_pool_t *__local_pool_##T = 0; \
T *new_##T(void) { return (T*) qd_alloc(&__desc_##T, &__local_pool_##T); }
\
void free_##T(T *p) { qd_dealloc(&__desc_##T, &__local_pool_##T, (void*)
p); } \
qd_alloc_stats_t *alloc_stats_##T(void) { return __desc_##T.stats; }
Modified: qpid/dispatch/trunk/include/qpid/dispatch/config.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/config.h?rev=1596672&r1=1596671&r2=1596672&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/config.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/config.h Wed May 21 20:25:40 2014
@@ -32,7 +32,7 @@
void qd_log_configure(const qd_dispatch_t *dispatch);
int qd_config_item_count(const qd_dispatch_t *dispatch, const char *section);
bool qd_config_item_exists(const qd_dispatch_t *dispatch, const char *section,
int index, const char* key);
-const char *qd_config_item_value_string(const qd_dispatch_t *dispatch, const
char *section, int index, const char* key);
+char *qd_config_item_value_string(const qd_dispatch_t *dispatch, const char
*section, int index, const char* key);
uint32_t qd_config_item_value_int(const qd_dispatch_t *dispatch, const char
*section, int index, const char* key);
int qd_config_item_value_bool(const qd_dispatch_t *dispatch, const char
*section, int index, const char* key);
///@}
Modified: qpid/dispatch/trunk/include/qpid/dispatch/container.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/container.h?rev=1596672&r1=1596671&r2=1596672&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/container.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/container.h Wed May 21 20:25:40 2014
@@ -140,6 +140,7 @@ qd_dist_mode_t qd_container_node_get_dis
qd_lifetime_policy_t qd_container_node_get_life_policy(const qd_node_t *node);
qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir,
const char *name);
+void qd_link_free(qd_link_t *link);
/**
* Context associated with the link for storing link-specific state.
Modified: qpid/dispatch/trunk/include/qpid/dispatch/server.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/server.h?rev=1596672&r1=1596671&r2=1596672&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/server.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/server.h Wed May 21 20:25:40 2014
@@ -219,29 +219,29 @@ typedef struct qd_server_config_t {
/**
* Host name or network address to bind to a listener or use in the connector.
*/
- const char *host;
+ char *host;
/**
* Port name or number to bind to a listener or use in the connector.
*/
- const char *port;
+ char *port;
/**
* Space-separated list of SASL mechanisms to be accepted for the connection.
*/
- const char *sasl_mechanisms;
+ char *sasl_mechanisms;
/**
* If appropriate for the mechanism, the username for authentication
* (connector only)
*/
- const char *sasl_username;
+ char *sasl_username;
/**
* If appropriate for the mechanism, the password for authentication
* (connector only)
*/
- const char *sasl_password;
+ char *sasl_password;
/**
* If appropriate for the mechanism, the minimum acceptable security strength factor
@@ -274,23 +274,23 @@ typedef struct qd_server_config_t {
* Path to the file containing the PEM-formatted public certificate for the local end
* of the connection.
*/
- const char *ssl_certificate_file;
+ char *ssl_certificate_file;
/**
* Path to the file containing the PEM-formatted private key for the local end of the
* connection.
*/
- const char *ssl_private_key_file;
+ char *ssl_private_key_file;
/**
* The password used to sign the private key, or NULL if the key is not protected.
*/
- const char *ssl_password;
+ char *ssl_password;
/**
* Path to the file containing the PEM-formatted set of certificates of trusted CAs.
*/
- const char *ssl_trusted_certificate_db;
+ char *ssl_trusted_certificate_db;
/**
* Path to an optional file containing the PEM-formatted set of certificates of
@@ -298,7 +298,7 @@ typedef struct qd_server_config_t {
* set of certificates in the ssl_trusted_certificate_db. If this is left NULL,
* the entire set within the db will be used.
*/
- const char *ssl_trusted_certificates;
+ char *ssl_trusted_certificates;
/**
* Iff non-zero, require that the peer's certificate be supplied and that it be authentic
@@ -316,7 +316,7 @@ typedef struct qd_server_config_t {
* The specified role of the connection. This can be used to control the behavior and
* capabilities of the connections.
*/
- const char *role;
+ char *role;
/**
* The maximum size of an AMQP frame in octets.
@@ -452,7 +452,7 @@ qd_listener_t *qd_server_listen(qd_dispa
*
* @param li A listener pointer returned by qd_listen.
*/
-void qd_listener_free(qd_listener_t* li);
+void qd_server_listener_free(qd_listener_t* li);
/**
@@ -460,7 +460,7 @@ void qd_listener_free(qd_listener_t* li)
*
* @param li A listener pointer returned by qd_listen.
*/
-void qd_listener_close(qd_listener_t* li);
+void qd_server_listener_close(qd_listener_t* li);
/**
@@ -481,7 +481,7 @@ qd_connector_t *qd_server_connect(qd_dis
*
* @param ct A connector pointer returned by qd_connect.
*/
-void qd_connector_free(qd_connector_t* ct);
+void qd_server_connector_free(qd_connector_t* ct);
/**
* @}
Modified: qpid/dispatch/trunk/src/agent.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/agent.c?rev=1596672&r1=1596671&r2=1596672&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/agent.c (original)
+++ qpid/dispatch/trunk/src/agent.c Wed May 21 20:25:40 2014
@@ -753,6 +753,14 @@ qd_agent_t *qd_agent(qd_dispatch_t *qd)
void qd_agent_free(qd_agent_t *agent)
{
+ qd_agent_class_t *cls = DEQ_HEAD(agent->class_list);
+ while (cls) {
+ DEQ_REMOVE_HEAD(agent->class_list);
+ qd_hash_handle_free(cls->hash_handle);
+ free(cls);
+ cls = DEQ_HEAD(agent->class_list);
+ }
+
qd_router_unregister_address(agent->local_address);
qd_router_unregister_address(agent->global_address);
sys_mutex_free(agent->lock);
Modified: qpid/dispatch/trunk/src/alloc.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/alloc.c?rev=1596672&r1=1596671&r2=1596672&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/alloc.c (original)
+++ qpid/dispatch/trunk/src/alloc.c Wed May 21 20:25:40 2014
@@ -51,6 +51,7 @@ DEQ_DECLARE(qd_alloc_item_t, qd_alloc_it
struct qd_alloc_pool_t {
+ DEQ_LINKS(qd_alloc_pool_t);
qd_alloc_item_list_t free_list;
};
@@ -82,6 +83,7 @@ static void qd_alloc_init(qd_alloc_type_
desc->global_pool = NEW(qd_alloc_pool_t);
DEQ_INIT(desc->global_pool->free_list);
desc->lock = sys_mutex();
+ DEQ_INIT(desc->tpool_list);
desc->stats = NEW(qd_alloc_stats_t);
memset(desc->stats, 0, sizeof(qd_alloc_stats_t));
@@ -114,7 +116,11 @@ void *qd_alloc(qd_alloc_type_desc_t *des
//
if (*tpool == 0) {
*tpool = NEW(qd_alloc_pool_t);
+ DEQ_ITEM_INIT(*tpool);
DEQ_INIT((*tpool)->free_list);
+ sys_mutex_lock(desc->lock);
+ DEQ_INSERT_TAIL(desc->tpool_list, *tpool);
+ sys_mutex_unlock(desc->lock);
}
qd_alloc_pool_t *pool = *tpool;
@@ -128,7 +134,7 @@ void *qd_alloc(qd_alloc_type_desc_t *des
if (item) {
DEQ_REMOVE_HEAD(pool->free_list);
#ifdef QD_MEMORY_DEBUG
- item->desc = desc;
+ item->desc = desc;
item->header = PATTERN_FRONT;
*((uint32_t*) ((void*) &item[1] + desc->total_size))= PATTERN_BACK;
#endif
@@ -197,7 +203,7 @@ void qd_dealloc(qd_alloc_type_desc_t *de
assert (desc->trailer == PATTERN_BACK);
assert (item->header == PATTERN_FRONT);
assert (*((uint32_t*) (p + desc->total_size)) == PATTERN_BACK);
- assert (item->desc == desc);
+ assert (item->desc == desc); // Check for double-free
item->desc = 0;
#endif
@@ -207,7 +213,11 @@ void qd_dealloc(qd_alloc_type_desc_t *de
//
if (*tpool == 0) {
*tpool = NEW(qd_alloc_pool_t);
+ DEQ_ITEM_INIT(*tpool);
DEQ_INIT((*tpool)->free_list);
+ sys_mutex_lock(desc->lock);
+ DEQ_INSERT_TAIL(desc->tpool_list, *tpool);
+ sys_mutex_unlock(desc->lock);
}
qd_alloc_pool_t *pool = *tpool;
@@ -254,6 +264,83 @@ void qd_alloc_initialize(void)
}
+void qd_alloc_finalize(void)
+{
+ //
+ // Note that the logging facility is already finalized by the time this is called.
+ // We will use fprintf(stderr, ...) for logging.
+ //
+ // The assumption coming into this finalizer is that all allocations have been
+ // released. Any non-released objects shall be flagged.
+ //
+
+ //
+ // Note: By the time we get here, the server threads have been joined and there is
+ // only the main thread remaining. There is therefore no reason to be
+ // concerned about locking.
+ //
+
+ qd_alloc_item_t *item;
+ qd_alloc_type_t *type_item = DEQ_HEAD(type_list);
+ while (type_item) {
+ qd_alloc_type_desc_t *desc = type_item->desc;
+
+ //
+ // Reclaim the items on the global free pool
+ //
+ item = DEQ_HEAD(desc->global_pool->free_list);
+ while (item) {
+ DEQ_REMOVE_HEAD(desc->global_pool->free_list);
+ free(item);
+ desc->stats->total_free_to_heap++;
+ item = DEQ_HEAD(desc->global_pool->free_list);
+ }
+ free(desc->global_pool);
+ desc->global_pool = 0;
+
+ //
+ // Reclaim the items on thread pools
+ //
+ qd_alloc_pool_t *tpool = DEQ_HEAD(desc->tpool_list);
+ while (tpool) {
+ item = DEQ_HEAD(tpool->free_list);
+ while (item) {
+ DEQ_REMOVE_HEAD(tpool->free_list);
+ free(item);
+ desc->stats->total_free_to_heap++;
+ item = DEQ_HEAD(tpool->free_list);
+ }
+
+ DEQ_REMOVE_HEAD(desc->tpool_list);
+ free(tpool);
+ tpool = DEQ_HEAD(desc->tpool_list);
+ }
+
+ //
+ // Check the stats for lost items
+ //
+ if (desc->stats->total_free_to_heap <
desc->stats->total_alloc_from_heap)
+ fprintf(stderr, "alloc.c: Items of type '%s' remain allocated at
shutdown: %ld\n",
+ desc->type_name,
+ desc->stats->total_alloc_from_heap -
desc->stats->total_free_to_heap);
+
+ //
+ // Reclaim the descriptor components
+ //
+ free(desc->stats);
+ sys_mutex_free(desc->lock);
+ desc->lock = 0;
+ desc->trailer = 0;
+
+ DEQ_REMOVE_HEAD(type_list);
+ free(type_item);
+ type_item = DEQ_HEAD(type_list);
+ }
+
+ sys_mutex_free(init_lock);
+}
+
+
static void alloc_attr_name(void *object_handle, void *cor, void *unused)
{
qd_alloc_type_t *item = (qd_alloc_type_t*) object_handle;
Modified: qpid/dispatch/trunk/src/alloc_private.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/alloc_private.h?rev=1596672&r1=1596671&r2=1596672&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/alloc_private.h (original)
+++ qpid/dispatch/trunk/src/alloc_private.h Wed May 21 20:25:40 2014
@@ -23,6 +23,7 @@
#include <qpid/dispatch/dispatch.h>
void qd_alloc_initialize(void);
+void qd_alloc_finalize(void);
void qd_alloc_setup_agent(qd_dispatch_t *qd);
#endif
Modified: qpid/dispatch/trunk/src/config.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/config.c?rev=1596672&r1=1596671&r2=1596672&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/config.c (original)
+++ qpid/dispatch/trunk/src/config.c Wed May 21 20:25:40 2014
@@ -141,6 +141,7 @@ void qd_config_free(qd_config_t *config)
if (config) {
Py_DECREF(config->pClass);
Py_DECREF(config->pModule);
+ Py_DECREF(config->pObject);
free_qd_config_t(config);
}
}
@@ -230,7 +231,7 @@ bool qd_config_item_exists(const qd_disp
return exists;
}
-const char *qd_config_item_value_string(const qd_dispatch_t *dispatch, const
char *section, int index, const char* key)
+char *qd_config_item_value_string(const qd_dispatch_t *dispatch, const char
*section, int index, const char* key)
{
PyObject *pResult = item_value(dispatch, section, index, key,
"value_string");
char *value = 0;
Modified: qpid/dispatch/trunk/src/connection_manager.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/connection_manager.c?rev=1596672&r1=1596671&r2=1596672&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/connection_manager.c (original)
+++ qpid/dispatch/trunk/src/connection_manager.c Wed May 21 20:25:40 2014
@@ -164,8 +164,51 @@ qd_connection_manager_t *qd_connection_m
}
+static void qd_connection_manager_config_free(qd_server_config_t *cf)
+{
+ free(cf->host);
+ free(cf->port);
+ free(cf->role);
+ free(cf->sasl_mechanisms);
+ if (cf->ssl_enabled) {
+ free(cf->ssl_certificate_file);
+ free(cf->ssl_private_key_file);
+ free(cf->ssl_password);
+ free(cf->ssl_trusted_certificate_db);
+ free(cf->ssl_trusted_certificates);
+ }
+}
+
+
void qd_connection_manager_free(qd_connection_manager_t *cm)
{
+ qd_config_listener_t *cl = DEQ_HEAD(cm->config_listeners);
+ while (cl) {
+ DEQ_REMOVE_HEAD(cm->config_listeners);
+ qd_server_listener_free(cl->listener);
+ qd_connection_manager_config_free(&cl->configuration);
+ free(cl);
+ cl = DEQ_HEAD(cm->config_listeners);
+ }
+
+ qd_config_connector_t *cc = DEQ_HEAD(cm->config_connectors);
+ while(cc) {
+ DEQ_REMOVE_HEAD(cm->config_connectors);
+ qd_server_connector_free(cc->connector);
+ qd_connection_manager_config_free(&cc->configuration);
+ free(cc);
+ cc = DEQ_HEAD(cm->config_connectors);
+ }
+
+ qd_config_connector_t *odc = DEQ_HEAD(cm->on_demand_connectors);
+ while(odc) {
+ DEQ_REMOVE_HEAD(cm->on_demand_connectors);
+ if (odc->connector)
+ qd_server_connector_free(odc->connector);
+ qd_connection_manager_config_free(&odc->configuration);
+ free(odc);
+ odc = DEQ_HEAD(cm->on_demand_connectors);
+ }
}
Modified: qpid/dispatch/trunk/src/container.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/container.c?rev=1596672&r1=1596671&r2=1596672&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/container.c (original)
+++ qpid/dispatch/trunk/src/container.c Wed May 21 20:25:40 2014
@@ -37,6 +37,7 @@
/** Instance of a node type in a container */
struct qd_node_t {
+ DEQ_LINKS(qd_node_t);
qd_container_t *container;
const qd_node_type_t *ntype; ///< Type of node, defines callbacks.
char *name;
@@ -45,16 +46,18 @@ struct qd_node_t {
qd_lifetime_policy_t life_policy;
};
+DEQ_DECLARE(qd_node_t, qd_node_list_t);
ALLOC_DECLARE(qd_node_t);
ALLOC_DEFINE(qd_node_t);
ALLOC_DEFINE(qd_link_item_t);
/** Encapsulates a proton link for sending and receiving messages */
struct qd_link_t {
- pn_link_t *pn_link;
- void *context;
- qd_node_t *node;
- bool drain_mode;
+ pn_session_t *pn_sess;
+ pn_link_t *pn_link;
+ void *context;
+ qd_node_t *node;
+ bool drain_mode;
};
ALLOC_DECLARE(qd_link_t);
@@ -87,6 +90,7 @@ struct qd_container_t {
qd_server_t *server;
qd_hash_t *node_type_map;
qd_hash_t *node_map;
+ qd_node_list_t nodes;
sys_mutex_t *lock;
qd_node_t *default_node;
qdc_node_type_list_t node_type_list;
@@ -124,6 +128,7 @@ static void setup_outgoing_link(qd_conta
return;
}
+ link->pn_sess = pn_link_session(pn_link);
link->pn_link = pn_link;
link->context = 0;
link->node = node;
@@ -166,6 +171,7 @@ static void setup_incoming_link(qd_conta
return;
}
+ link->pn_sess = pn_link_session(pn_link);
link->pn_link = pn_link;
link->context = 0;
link->node = node;
@@ -418,6 +424,7 @@ qd_container_t *qd_container(qd_dispatch
container->node_map = qd_hash(10, 32, 0); // 1K buckets, item batches
of 32
container->lock = sys_mutex();
container->default_node = 0;
+ DEQ_INIT(container->nodes);
DEQ_INIT(container->node_type_list);
qd_log(container->log_source, QD_LOG_TRACE, "Container Initializing");
@@ -434,8 +441,23 @@ void qd_container_setup_agent(qd_dispatc
void qd_container_free(qd_container_t *container)
{
- // TODO - Free the nodes
- // TODO - Free the node types
+ if (container->default_node)
+ qd_container_destroy_node(container->default_node);
+
+ qd_node_t *node = DEQ_HEAD(container->nodes);
+ while (node) {
+ qd_container_destroy_node(node);
+ node = DEQ_HEAD(container->nodes);
+ }
+
+ qdc_node_type_t *nt = DEQ_HEAD(container->node_type_list);
+ while (nt) {
+ DEQ_REMOVE_HEAD(container->node_type_list);
+ free(nt);
+ nt = DEQ_HEAD(container->node_type_list);
+ }
+ qd_hash_free(container->node_map);
+ qd_hash_free(container->node_type_map);
sys_mutex_free(container->lock);
free(container);
}
@@ -500,6 +522,7 @@ qd_node_t *qd_container_create_node(qd_d
if (!node)
return 0;
+ DEQ_ITEM_INIT(node);
node->container = container;
node->ntype = nt;
node->name = 0;
@@ -511,6 +534,8 @@ qd_node_t *qd_container_create_node(qd_d
qd_field_iterator_t *iter = qd_field_iterator_string(name,
ITER_VIEW_ALL);
sys_mutex_lock(container->lock);
result = qd_hash_insert(container->node_map, iter, node, 0);
+ if (result >= 0)
+ DEQ_INSERT_HEAD(container->nodes, node);
sys_mutex_unlock(container->lock);
qd_field_iterator_free(iter);
if (result < 0) {
@@ -537,6 +562,7 @@ void qd_container_destroy_node(qd_node_t
qd_field_iterator_t *iter = qd_field_iterator_string(node->name,
ITER_VIEW_ALL);
sys_mutex_lock(container->lock);
qd_hash_remove(container->node_map, iter);
+ DEQ_REMOVE(container->nodes, node);
sys_mutex_unlock(container->lock);
qd_field_iterator_free(iter);
free(node->name);
@@ -566,15 +592,15 @@ qd_lifetime_policy_t qd_container_node_g
qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir,
const char* name)
{
- pn_session_t *sess = pn_session(qd_connection_pn(conn));
- qd_link_t *link = new_qd_link_t();
+ qd_link_t *link = new_qd_link_t();
- pn_session_set_incoming_capacity(sess, 1000000);
+ link->pn_sess = pn_session(qd_connection_pn(conn));
+ pn_session_set_incoming_capacity(link->pn_sess, 1000000);
if (dir == QD_OUTGOING)
- link->pn_link = pn_sender(sess, name);
+ link->pn_link = pn_sender(link->pn_sess, name);
else
- link->pn_link = pn_receiver(sess, name);
+ link->pn_link = pn_receiver(link->pn_sess, name);
link->context = node->context;
link->node = node;
@@ -582,12 +608,18 @@ qd_link_t *qd_link(qd_node_t *node, qd_c
pn_link_set_context(link->pn_link, link);
- pn_session_open(sess);
+ pn_session_open(link->pn_sess);
return link;
}
+void qd_link_free(qd_link_t *link)
+{
+ free_qd_link_t(link);
+}
+
+
void qd_link_set_context(qd_link_t *link, void *context)
{
link->context = context;
Modified: qpid/dispatch/trunk/src/dispatch.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/dispatch.c?rev=1596672&r1=1596671&r2=1596672&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/dispatch.c (original)
+++ qpid/dispatch/trunk/src/dispatch.c Wed May 21 20:25:40 2014
@@ -58,8 +58,8 @@ qd_dispatch_t *qd_dispatch(const char *p
qd_alloc_initialize();
qd_log_initialize();
- qd->router_area = "0";
- qd->router_id = "0";
+ qd->router_area = strdup("0");
+ qd->router_id = strdup("0");
qd->router_mode = QD_ROUTER_MODE_ENDPOINT;
qd_python_initialize(qd, python_pkgdir);
@@ -103,7 +103,7 @@ void qd_dispatch_configure_container(qd_
void qd_dispatch_configure_router(qd_dispatch_t *qd)
{
- const char *router_mode_str = 0;
+ char *router_mode_str = 0;
if (qd->config) {
int count = qd_config_item_count(qd, CONF_ROUTER);
@@ -124,6 +124,8 @@ void qd_dispatch_configure_router(qd_dis
if (!qd->router_id)
qd->router_id = qd->container_name;
+
+ free(router_mode_str);
}
@@ -144,6 +146,8 @@ void qd_dispatch_prepare(qd_dispatch_t *
void qd_dispatch_free(qd_dispatch_t *qd)
{
+ free(qd->router_id);
+ free(qd->router_area);
qd_config_free(qd->config);
qd_config_finalize();
qd_connection_manager_free(qd->connection_manager);
@@ -152,6 +156,7 @@ void qd_dispatch_free(qd_dispatch_t *qd)
qd_container_free(qd->container);
qd_server_free(qd->server);
qd_log_finalize();
+ qd_alloc_finalize();
qd_python_finalize();
}
Modified: qpid/dispatch/trunk/src/dispatch_private.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/dispatch_private.h?rev=1596672&r1=1596671&r2=1596672&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/dispatch_private.h (original)
+++ qpid/dispatch/trunk/src/dispatch_private.h Wed May 21 20:25:40 2014
@@ -54,10 +54,10 @@ struct qd_dispatch_t {
qd_config_t *config;
qd_connection_manager_t *connection_manager;
- int thread_count;
- const char *container_name;
- const char *router_area;
- const char *router_id;
+ int thread_count;
+ char *container_name;
+ char *router_area;
+ char *router_id;
qd_router_mode_t router_mode;
};
Modified: qpid/dispatch/trunk/src/hash.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/hash.c?rev=1596672&r1=1596671&r2=1596672&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/hash.c (original)
+++ qpid/dispatch/trunk/src/hash.c Wed May 21 20:25:40 2014
@@ -101,7 +101,20 @@ qd_hash_t *qd_hash(int bucket_exponent,
void qd_hash_free(qd_hash_t *h)
{
- // TODO - Implement this
+ qd_hash_item_t *item;
+ int idx;
+
+ for (idx = 0; idx < h->bucket_count; idx++) {
+ item = DEQ_HEAD(h->buckets[idx].items);
+ while (item) {
+ free(item->key);
+ free_qd_hash_item_t(item);
+ DEQ_REMOVE_HEAD(h->buckets[idx].items);
+ item = DEQ_HEAD(h->buckets[idx].items);
+ }
+ }
+ free(h->buckets);
+ free(h);
}
Modified: qpid/dispatch/trunk/src/log.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/log.c?rev=1596672&r1=1596671&r2=1596672&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/log.c (original)
+++ qpid/dispatch/trunk/src/log.c Wed May 21 20:25:40 2014
@@ -40,7 +40,7 @@ struct qd_log_entry_t {
DEQ_LINKS(qd_log_entry_t);
const char *module;
int level;
- const char *file;
+ char *file;
int line;
time_t time;
char text[TEXT_MAX];
@@ -175,7 +175,7 @@ void qd_log_impl(qd_log_source_t *source
qd_log_entry_t *entry = new_qd_log_entry_t();
DEQ_ITEM_INIT(entry);
entry->module = source->module;
- entry->level = level;
+ entry->level = level;
entry->file = file ? strdup(file) : 0;
entry->line = line;
time(&entry->time);
@@ -195,6 +195,7 @@ void qd_log_impl(qd_log_source_t *source
if (DEQ_SIZE(entries) > LIST_MAX) {
entry = DEQ_HEAD(entries);
DEQ_REMOVE_HEAD(entries);
+ free(entry->file);
free_qd_log_entry_t(entry);
}
sys_mutex_unlock(log_lock);
@@ -222,8 +223,14 @@ void qd_log_initialize(void)
void qd_log_finalize(void) {
for (qd_log_source_t *src = DEQ_HEAD(source_list); src != 0; src =
DEQ_HEAD(source_list)) {
- DEQ_REMOVE_HEAD(source_list);
- qd_log_source_free(src);
+ DEQ_REMOVE_HEAD(source_list);
+ qd_log_source_free(src);
+ }
+
+ for (qd_log_entry_t *entry = DEQ_HEAD(entries); entry; entry =
DEQ_HEAD(entries)) {
+ DEQ_REMOVE_HEAD(entries);
+ free(entry->file);
+ free_qd_log_entry_t(entry);
}
}
Modified: qpid/dispatch/trunk/src/python_embedded.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/python_embedded.c?rev=1596672&r1=1596671&r2=1596672&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/python_embedded.c (original)
+++ qpid/dispatch/trunk/src/python_embedded.c Wed May 21 20:25:40 2014
@@ -79,6 +79,7 @@ void qd_python_stop(void)
if (ref_count == 0) {
Py_DECREF(dispatch_module);
dispatch_module = 0;
+ PyGC_Collect();
Py_Finalize();
qd_log(log_source, QD_LOG_TRACE, "Embedded Python Interpreter Shut
Down");
}
Modified: qpid/dispatch/trunk/src/router_config.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_config.c?rev=1596672&r1=1596671&r2=1596672&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_config.c (original)
+++ qpid/dispatch/trunk/src/router_config.c Wed May 21 20:25:40 2014
@@ -33,13 +33,16 @@ static void qd_router_configure_addresse
int count = qd_config_item_count(router->qd, CONF_ADDRESS);
for (int idx = 0; idx < count; idx++) {
- const char *prefix = qd_config_item_value_string(router->qd,
CONF_ADDRESS, idx, "prefix");
- int phase = qd_config_item_value_int(router->qd,
CONF_ADDRESS, idx, "phase");
- const char *fanout = qd_config_item_value_string(router->qd,
CONF_ADDRESS, idx, "fanout");
- const char *bias = qd_config_item_value_string(router->qd,
CONF_ADDRESS, idx, "bias");
+ char *prefix = qd_config_item_value_string(router->qd, CONF_ADDRESS,
idx, "prefix");
+ int phase = qd_config_item_value_int(router->qd, CONF_ADDRESS,
idx, "phase");
+ char *fanout = qd_config_item_value_string(router->qd, CONF_ADDRESS,
idx, "fanout");
+ char *bias = qd_config_item_value_string(router->qd, CONF_ADDRESS,
idx, "bias");
if (phase < 0 || phase > 9) {
qd_log(router->log_source, QD_LOG_ERROR, "Phase for prefix '%s'
must be between 0 and 9. Ignoring", prefix);
+ free(prefix);
+ free(fanout);
+ free(bias);
continue;
}
@@ -98,6 +101,10 @@ static void qd_router_configure_addresse
addr_phase->semantics = semantics;
addr->last_phase = addr_phase->phase;
DEQ_INSERT_TAIL(addr->phases, addr_phase);
+
+ free(prefix);
+ free(fanout);
+ free(bias);
}
}
@@ -107,13 +114,15 @@ static void qd_router_configure_waypoint
int count = qd_config_item_count(router->qd, CONF_WAYPOINT);
for (int idx = 0; idx < count; idx++) {
- const char *name = qd_config_item_value_string(router->qd,
CONF_WAYPOINT, idx, "name");
- int in_phase = qd_config_item_value_int(router->qd,
CONF_WAYPOINT, idx, "in-phase");
- int out_phase = qd_config_item_value_int(router->qd,
CONF_WAYPOINT, idx, "out-phase");
- const char *connector = qd_config_item_value_string(router->qd,
CONF_WAYPOINT, idx, "connector");
+ char *name = qd_config_item_value_string(router->qd,
CONF_WAYPOINT, idx, "name");
+ int in_phase = qd_config_item_value_int(router->qd,
CONF_WAYPOINT, idx, "in-phase");
+ int out_phase = qd_config_item_value_int(router->qd,
CONF_WAYPOINT, idx, "out-phase");
+ char *connector = qd_config_item_value_string(router->qd,
CONF_WAYPOINT, idx, "connector");
if (in_phase > 9 || out_phase > 9) {
qd_log(router->log_source, QD_LOG_ERROR, "Phases for waypoint '%s'
must be between 0 and 9. Ignoring", name);
+ free(name);
+ free(connector);
continue;
}
@@ -147,6 +156,27 @@ void qd_router_configure(qd_router_t *ro
}
+void qd_router_configure_free(qd_router_t *router)
+{
+ for (qd_config_address_t *ca = DEQ_HEAD(router->config_addrs); ca; ca =
DEQ_HEAD(router->config_addrs)) {
+ for (qd_config_phase_t *ap = DEQ_HEAD(ca->phases); ap; ap =
DEQ_HEAD(ca->phases)) {
+ DEQ_REMOVE_HEAD(ca->phases);
+ free(ap);
+ }
+ free(ca->prefix);
+ DEQ_REMOVE_HEAD(router->config_addrs);
+ free(ca);
+ }
+
+ for (qd_waypoint_t *wp = DEQ_HEAD(router->waypoints); wp; wp =
DEQ_HEAD(router->waypoints)) {
+ DEQ_REMOVE_HEAD(router->waypoints);
+ free(wp->name);
+ free(wp->connector_name);
+ free(wp);
+ }
+}
+
+
qd_address_semantics_t router_semantics_for_addr(qd_router_t *router,
qd_field_iterator_t *iter,
char in_phase, char
*out_phase)
{
Modified: qpid/dispatch/trunk/src/router_node.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1596672&r1=1596671&r2=1596672&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Wed May 21 20:25:40 2014
@@ -1233,6 +1233,7 @@ static int router_link_detach_handler(vo
if (rlink->target)
free(rlink->target);
free_qd_router_link_t(rlink);
+ qd_link_free(link);
//
// If we lost the link to a neighbor router, notify the route engine so it doesn't
@@ -1487,9 +1488,35 @@ void qd_router_setup_late(qd_dispatch_t
void qd_router_free(qd_router_t *router)
{
qd_container_set_default_node_type(router->qd, 0, 0, QD_DIST_BOTH);
+
+ for (qd_address_t *addr = DEQ_HEAD(router->addrs); addr; addr =
DEQ_HEAD(router->addrs)) {
+ for (qd_router_link_ref_t *rlink = DEQ_HEAD(addr->rlinks); rlink;
rlink = DEQ_HEAD(addr->rlinks)) {
+ DEQ_REMOVE_HEAD(addr->rlinks);
+ free_qd_router_link_ref_t(rlink);
+ }
+
+ for (qd_router_ref_t *rnode = DEQ_HEAD(addr->rnodes); rnode; rnode =
DEQ_HEAD(addr->rnodes)) {
+ DEQ_REMOVE_HEAD(addr->rnodes);
+ free_qd_router_ref_t(rnode);
+ }
+
+ qd_hash_handle_free(addr->hash_handle);
+
+ DEQ_REMOVE_HEAD(router->addrs);
+ free_qd_address_t(addr);
+ }
+
+ qd_timer_free(router->timer);
sys_mutex_free(router->lock);
+ qd_bitmask_free(router->neighbor_free_mask);
+ free(router->out_links_by_mask_bit);
+ free(router->routers_by_mask_bit);
+ qd_hash_free(router->addr_hash);
+ qd_router_configure_free(router);
free(router);
qd_python_stop();
+ free(node_id);
+ free(direct_prefix);
}
Modified: qpid/dispatch/trunk/src/router_private.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_private.h?rev=1596672&r1=1596671&r2=1596672&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_private.h (original)
+++ qpid/dispatch/trunk/src/router_private.h Wed May 21 20:25:40 2014
@@ -36,6 +36,7 @@ void qd_router_python_setup(qd_router_t
void qd_pyrouter_tick(qd_router_t *router);
void qd_router_agent_setup(qd_router_t *router);
void qd_router_configure(qd_router_t *router);
+void qd_router_configure_free(qd_router_t *router);
typedef enum {
QD_ROUTER_MODE_STANDALONE, ///< Standalone router. No routing protocol participation
@@ -183,10 +184,10 @@ DEQ_DECLARE(qd_config_address_t, qd_conf
*/
struct qd_waypoint_t {
DEQ_LINKS(qd_waypoint_t);
- const char *name;
+ char *name;
char in_phase; ///< Phase for re-entering message.
char out_phase; ///< Phase for exiting message.
- const char *connector_name; ///< On-demand connector name for outgoing messages.
+ char *connector_name; ///< On-demand connector name for outgoing messages.
qd_config_connector_t *connector; ///< Connector for outgoing messages.
qd_connection_t *connection; ///< Connection for outgoing messages.
qd_link_t *in_link; ///< Link for re-entering messages.
Modified: qpid/dispatch/trunk/src/server.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server.c?rev=1596672&r1=1596671&r2=1596672&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server.c (original)
+++ qpid/dispatch/trunk/src/server.c Wed May 21 20:25:40 2014
@@ -547,8 +547,10 @@ static void thread_join(qd_thread_t *thr
if (!thread)
return;
- if (thread->using_thread)
+ if (thread->using_thread) {
sys_thread_join(thread->thread);
+ sys_thread_free(thread->thread);
+ }
}
@@ -688,6 +690,7 @@ void qd_server_free(qd_server_t *qd_serv
pn_driver_free(qd_server->driver);
sys_mutex_free(qd_server->lock);
sys_cond_free(qd_server->cond);
+ free(qd_server->threads);
free(qd_server);
}
@@ -748,6 +751,9 @@ void qd_server_run(qd_dispatch_t *qd)
for (i = 1; i < qd_server->thread_count; i++)
thread_join(qd_server->threads[i]);
+ for (i = 0; i < qd_server->thread_count; i++)
+ qd_server->threads[i]->canceled = 0;
+
qd_log(qd_server->log_source, QD_LOG_INFO, "Shut Down");
}
Modified: qpid/dispatch/trunk/tests/alloc_test.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/alloc_test.c?rev=1596672&r1=1596671&r2=1596672&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/alloc_test.c (original)
+++ qpid/dispatch/trunk/tests/alloc_test.c Wed May 21 20:25:40 2014
@@ -77,7 +77,6 @@ static char* test_alloc_basic(void *cont
int alloc_tests(void)
{
int result = 0;
- qd_alloc_initialize();
TEST_CASE(test_alloc_basic, 0);
Modified: qpid/dispatch/trunk/tests/run_unit_tests.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/run_unit_tests.c?rev=1596672&r1=1596671&r2=1596672&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/run_unit_tests.c (original)
+++ qpid/dispatch/trunk/tests/run_unit_tests.c Wed May 21 20:25:40 2014
@@ -17,15 +17,16 @@
* under the License.
*/
+#include <qpid/dispatch.h>
#include <qpid/dispatch/buffer.h>
#include <stdio.h>
-int tool_tests();
-int timer_tests();
-int alloc_tests();
-int server_tests();
-int parse_tests();
-int compose_tests();
+int tool_tests(void);
+int timer_tests(void);
+int alloc_tests(void);
+int server_tests(qd_dispatch_t *qd);
+int parse_tests(void);
+int compose_tests(void);
int main(int argc, char** argv)
{
@@ -34,13 +35,19 @@ int main(int argc, char** argv)
exit(1);
}
+ qd_dispatch_t *qd = qd_dispatch(0);
+ qd_dispatch_load_config(qd, argv[1]);
+ qd_dispatch_configure_container(qd);
+ qd_dispatch_prepare(qd);
+
int result = 0;
result += tool_tests();
- result += timer_tests();
result += alloc_tests();
- result += server_tests(argv[1]);
- result += parse_tests(0);
- result += compose_tests(0);
+ result += server_tests(qd);
+ result += parse_tests();
+ result += compose_tests();
+ qd_dispatch_free(qd);
+ result += timer_tests();
return result;
}
Modified: qpid/dispatch/trunk/tests/server_test.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/server_test.c?rev=1596672&r1=1596671&r2=1596672&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/server_test.c (original)
+++ qpid/dispatch/trunk/tests/server_test.c Wed May 21 20:25:40 2014
@@ -29,13 +29,10 @@
#define THREAD_COUNT 4
#define OCTET_COUNT 100
-static const char *config_file;
static qd_dispatch_t *qd;
static sys_mutex_t *test_lock;
-static void *expected_context;
static int call_count;
-static int threads_seen[THREAD_COUNT];
static char stored_error[512];
static int write_count;
@@ -45,26 +42,6 @@ static qd_user_fd_t *ufd_write;
static qd_user_fd_t *ufd_read;
-static void thread_start_handler(void *context, int thread_id)
-{
- sys_mutex_lock(test_lock);
- if (context != expected_context && !stored_error[0])
- sprintf(stored_error, "Unexpected Context Value: %lx", (long) context);
- if (thread_id >= THREAD_COUNT && !stored_error[0])
- sprintf(stored_error, "Thread_ID too large: %d", thread_id);
- if (thread_id < 0 && !stored_error[0])
- sprintf(stored_error, "Thread_ID negative: %d", thread_id);
-
- call_count++;
- if (thread_id >= 0 && thread_id < THREAD_COUNT)
- threads_seen[thread_id]++;
-
- if (call_count == THREAD_COUNT)
- qd_server_stop(qd);
- sys_mutex_unlock(test_lock);
-}
-
-
static void ufd_handler(void *context, qd_user_fd_t *ufd)
{
long dir = (long) context;
@@ -107,68 +84,21 @@ static void ufd_handler(void *context, q
}
-static void fd_test_start(void *context)
+static void fd_test_start(void *context, int unused)
{
- qd_user_fd_activate_read(ufd_read);
-}
-
-
-static char* test_start_handler(void *context)
-{
- int i;
-
- qd = qd_dispatch(0);
- qd_dispatch_load_config(qd, config_file);
- qd_dispatch_configure_container(qd);
- qd_dispatch_prepare(qd);
-
- expected_context = (void*) 0x00112233;
- stored_error[0] = 0x0;
- call_count = 0;
- for (i = 0; i < THREAD_COUNT; i++)
- threads_seen[i] = 0;
-
- qd_server_set_start_handler(qd, thread_start_handler, expected_context);
- qd_server_run(qd);
- qd_dispatch_free(qd);
-
- if (stored_error[0]) return stored_error;
- if (call_count != THREAD_COUNT) return "Incorrect number of thread-start
callbacks";
- for (i = 0; i < THREAD_COUNT; i++)
- if (threads_seen[i] != 1) return "Incorrect count on one thread ID";
-
- return 0;
-}
-
-
-static char *test_server_start(void *context)
-{
- qd = qd_dispatch(0);
- qd_dispatch_load_config(qd, config_file);
- qd_dispatch_configure_container(qd);
- qd_dispatch_prepare(qd);
-
- qd_server_start(qd);
- qd_server_stop(qd);
- qd_dispatch_free(qd);
-
- return 0;
+ if (++call_count == THREAD_COUNT) {
+ qd_user_fd_activate_read(ufd_read);
+ }
}
static char* test_user_fd(void *context)
{
int res;
- qd_timer_t *timer;
-
- qd = qd_dispatch(0);
- qd_dispatch_load_config(qd, config_file);
- qd_dispatch_configure_container(qd);
- qd_dispatch_prepare(qd);
+ call_count = 0;
+ qd_server_set_start_handler(qd, fd_test_start, 0);
qd_server_set_user_fd_handler(qd, ufd_handler);
- timer = qd_timer(qd, fd_test_start, 0);
- qd_timer_schedule(timer, 0);
stored_error[0] = 0x0;
@@ -188,8 +118,6 @@ static char* test_user_fd(void *context)
ufd_read = qd_user_fd(qd, fd[0], (void*) 0);
qd_server_run(qd);
- qd_timer_free(timer);
- qd_dispatch_free(qd);
close(fd[0]);
close(fd[1]);
@@ -202,15 +130,13 @@ static char* test_user_fd(void *context)
}
-int server_tests(const char *_config_file)
+int server_tests(qd_dispatch_t *_qd)
{
int result = 0;
test_lock = sys_mutex();
- config_file = _config_file;
+ qd = _qd;
- TEST_CASE(test_server_start, 0);
- TEST_CASE(test_start_handler, 0);
TEST_CASE(test_user_fd, 0);
sys_mutex_free(test_lock);
Modified: qpid/dispatch/trunk/tests/timer_test.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/timer_test.c?rev=1596672&r1=1596671&r2=1596672&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/timer_test.c (original)
+++ qpid/dispatch/trunk/tests/timer_test.c Wed May 21 20:25:40 2014
@@ -408,6 +408,7 @@ int timer_tests(void)
qd_timer_free(timers[i]);
qd_timer_finalize();
+ sys_mutex_free(lock);
return result;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org