DISPATCH-390: Convert dispatch to use pn_proactor_t - remove driver.h/c, server.c uses proactor API directly - update stop/start, signal handling - refactor server connector queue processing as PN event handlers - qd_timer using pn_proactor_timeout() - deferred calls use pn_proactor_wake() - drop qd_thread_t struct, use sys_thread_t directly - document new listen "host" semantics in schema - updated logging, NOTICE for key life-cycle events - merge qd_listener_t+qd_config_listener_t, qd_connector_t+qd_config_connector_t - remove dead code: work_queue, a_thread_is_waiting, owner_thread
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/6f56e289 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/6f56e289 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/6f56e289 Branch: refs/heads/master Commit: 6f56e289bec0db4a1de257883dc456a502c42fe7 Parents: 0539dc4 Author: Alan Conway <[email protected]> Authored: Fri Jan 20 14:20:31 2017 -0500 Committer: Alan Conway <[email protected]> Committed: Thu Apr 27 13:29:40 2017 -0400 ---------------------------------------------------------------------- config.sh | 41 +- include/qpid/dispatch/amqp.h | 2 - include/qpid/dispatch/connection_manager.h | 21 - include/qpid/dispatch/driver.h | 441 ------ include/qpid/dispatch/error.h | 7 + include/qpid/dispatch/router_core.h | 11 +- include/qpid/dispatch/server.h | 240 +-- include/qpid/dispatch/timer.h | 10 +- python/qpid_dispatch/management/qdrouter.json | 4 +- python/qpid_dispatch_internal/router/address.py | 5 - router/CMakeLists.txt | 2 +- router/src/main.c | 29 +- src/CMakeLists.txt | 3 +- src/amqp.c | 2 - src/connection_manager.c | 192 ++- src/container.c | 110 +- src/error.c | 15 +- src/http-none.c | 1 - src/policy.c | 97 +- src/policy.h | 9 +- src/posix/driver.c | 1093 -------------- src/router_core/connections.c | 14 +- src/router_core/router_core_private.h | 3 - src/router_core/router_core_thread.c | 18 - src/router_node.c | 16 +- src/server.c | 1398 ++++++------------ src/server_private.h | 74 +- src/timer.c | 183 +-- src/timer_private.h | 14 +- tests/CMakeLists.txt | 2 +- tests/run_unit_tests.c | 4 +- tests/system_tests_management.py | 2 +- tests/system_tests_policy.py | 59 +- tests/timer_test.c | 305 ++-- 34 files changed, 961 insertions(+), 3466 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/config.sh 
---------------------------------------------------------------------- diff --git a/config.sh b/config.sh index 7e5e97f..beb7ed2 100644 --- a/config.sh +++ b/config.sh @@ -1,33 +1,8 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -if [[ ! -f config.sh ]]; then - echo "You must source config.sh from within its own directory" - return -fi - -export SOURCE_DIR=$(pwd) -export BUILD_DIR=$SOURCE_DIR/${1:-build} -export INSTALL_DIR=$SOURCE_DIR/${2:-install} - -PYTHON_LIB=$(python -c "from distutils.sysconfig import get_python_lib; print get_python_lib(prefix='$INSTALL_DIR')") - -export LD_LIBRARY_PATH=$INSTALL_DIR/lib64:$INSTALL_DIR/lib:$LD_LIBRARY_PATH -export PYTHONPATH=$PYTHON_LIB:$PYTHONPATH -export PATH=$INSTALL_DIR/sbin:$INSTALL_DIR/bin:$SOURCE_DIR/bin:$PATH 
+PYTHONPATH=/home/aconway/dispatch/python:/home/aconway/dispatch/tests:/home/aconway/dispatch:/usr/local/lib/proton/bindings/python:/usr/local/lib64/proton/bindings/python:/usr/local/lib/python2.7/site-packages:/usr/local/lib64/python2.7/site-packages:/usr/lib/python27.zip:/usr/lib64/python2.7:/usr/lib64/python2.7/plat-linux2:/usr/lib64/python2.7/lib-tk:/usr/lib64/python2.7/lib-old:/usr/lib64/python2.7/lib-dynload:/usr/lib64/python2.7/site-packages:/usr/lib/python2.7/site-packages +BUILD_DIR=/home/aconway/dispatch +QPID_DISPATCH_HOME=/home/aconway/dispatch +QPID_DISPATCH_LIB=/home/aconway/dispatch/src/ +MANPATH=/home/aconway/dispatch/doc/man:/usr/local/share/man:/usr/share/man +PATH=/home/aconway/dispatch:/home/aconway/dispatch/tests:/home/aconway/dispatch/router:/home/aconway/dispatch/tools:/home/aconway/dispatch/bin:/home/aconway/bin:/home/aconway/ha/bin:/usr/local/bin:/usr/local/sbin:/usr/lib64/qt-3.3/bin:/usr/lib64/ccache:/usr/bin:/usr/sbin:/home/aconway/go/bin:/home/aconway/proton/proton-c/bindings/go/bin +SOURCE_DIR=/home/aconway/dispatch +export PYTHONPATH BUILD_DIR QPID_DISPATCH_HOME QPID_DISPATCH_LIB MANPATH PATH SOURCE_DIR http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/include/qpid/dispatch/amqp.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h index 60ad973..e5c45c6 100644 --- a/include/qpid/dispatch/amqp.h +++ b/include/qpid/dispatch/amqp.h @@ -165,6 +165,4 @@ extern const char * const QD_AMQP_COND_ILLEGAL_STATE; extern const char * const QD_AMQP_COND_FRAME_SIZE_TOO_SMALL; /// @}; -/// Name for AMQP conditions from the router that don't have a more specific name. 
-extern const char * const QD_COND; #endif http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/include/qpid/dispatch/connection_manager.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/connection_manager.h b/include/qpid/dispatch/connection_manager.h index 12ac35e..4392966 100644 --- a/include/qpid/dispatch/connection_manager.h +++ b/include/qpid/dispatch/connection_manager.h @@ -27,8 +27,6 @@ #include <qpid/dispatch/server.h> typedef struct qd_connection_manager_t qd_connection_manager_t; -typedef struct qd_config_connector_t qd_config_connector_t; -typedef struct qd_config_listener_t qd_config_listener_t; typedef struct qd_config_ssl_profile_t qd_config_ssl_profile_t; typedef void (*qd_connection_manager_handler_t) (void *context, qd_connection_t *conn); @@ -49,16 +47,6 @@ qd_connection_manager_t *qd_connection_manager(qd_dispatch_t *qd); void qd_connection_manager_free(qd_connection_manager_t *cm); /** - * Free all the resources associated with a config listener - */ -void qd_config_listener_free(qd_connection_manager_t *cm, qd_config_listener_t *cl); - -/** - * Free all the resources associated with a config connector - */ -void qd_config_connector_free(qd_connection_manager_t *cm, qd_config_connector_t *cl); - -/** * Start the configured Listeners and Connectors * * Note that on-demand connectors are not started by this function. @@ -67,13 +55,4 @@ void qd_config_connector_free(qd_connection_manager_t *cm, qd_config_connector_t */ void qd_connection_manager_start(qd_dispatch_t *qd); - -/** - * Get the connector's name. 
- * - * @param cc Connector handle - * @return The name of the connector - */ -const char *qd_config_connector_name(qd_config_connector_t *cc); - #endif http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/include/qpid/dispatch/driver.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/driver.h b/include/qpid/dispatch/driver.h deleted file mode 100644 index 6c24a23..0000000 --- a/include/qpid/dispatch/driver.h +++ /dev/null @@ -1,441 +0,0 @@ -#ifndef __dispatch_posix_driver_h__ -#define __dispatch_posix_driver_h__ 1 - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <qpid/dispatch/log.h> - -#include <proton/error.h> -#include <proton/sasl.h> -#include <proton/selectable.h> -#include <proton/ssl.h> -#include <proton/transport.h> -#include <proton/types.h> - -/** @file - * API for the Driver Layer. - * - * The driver library provides a simple implementation of a driver for - * the proton engine. A driver is responsible for providing input, - * output, and tick events to the bottom half of the engine API. See - * pn_transport_input, pn_transport_output, and - * pn_transport_tick. 
The driver also provides an interface for the - * application to access the top half of the API when the state of the - * engine may have changed due to I/O or timing events. Additionally - * the driver incorporates the SASL engine as well in order to provide - * a complete network stack: AMQP over SASL over TCP. - * - */ - -typedef struct qdpn_driver_t qdpn_driver_t; -typedef struct qdpn_listener_t qdpn_listener_t; -typedef struct qdpn_connector_t qdpn_connector_t; - -typedef enum { - QDPN_CONNECTOR_WRITABLE, - QDPN_CONNECTOR_READABLE -} qdpn_activate_criteria_t; - -/** Construct a driver - * - * Call qdpn_driver_free() to release the driver object. - * @param log source to use for log messages, the driver does not have it's own. - * @return new driver object, NULL if error - */ -qdpn_driver_t *qdpn_driver(qd_log_source_t* log); - -/** Return the most recent error code. - * - * @param[in] d the driver - * - * @return the most recent error text for d - */ -int qdpn_driver_errno(qdpn_driver_t *d); - -/** Get additional error information associated with the driver. - * - * Whenever a driver operation fails, additional error information can - * be obtained using this function. The error object that is returned - * may also be used to clear the error condition. - * - * The pointer returned by this operation is valid until the - * driver object is freed. 
- * - * @param[in] d the driver - * - * @return the driver's error object - */ -pn_error_t *qdpn_driver_error(qdpn_driver_t *d); - -/** Force qdpn_driver_wait() to return - * - * @param[in] driver the driver to wake up - * - * @return zero on success, an error code on failure - */ -int qdpn_driver_wakeup(qdpn_driver_t *driver); - -/** Wait for an active connector or listener - * - * @param[in] driver the driver to wait on - * @param[in] timeout maximum time in milliseconds to wait, -1 means - * infinite wait - * - * @return zero on success, an error code on failure - */ -int qdpn_driver_wait(qdpn_driver_t *driver, int timeout); - -/** Get the next listener with pending data in the driver. - * - * @param[in] driver the driver - * @return NULL if no active listener available - */ -qdpn_listener_t *qdpn_driver_listener(qdpn_driver_t *driver); - -/** Get the next active connector in the driver. - * - * Returns the next connector with pending inbound data, available - * capacity for outbound data, or pending tick. - * - * @param[in] driver the driver - * @return NULL if no active connector available - */ -qdpn_connector_t *qdpn_driver_connector(qdpn_driver_t *driver); - -/** Free the driver allocated via qdpn_driver, and all associated - * listeners and connectors. - * - * @param[in] driver the driver to free, no longer valid on - * return - */ -void qdpn_driver_free(qdpn_driver_t *driver); - - -/** qdpn_listener - the server API **/ - -/** Construct a listener for the given address. - * - * @param[in] driver driver that will 'own' this listener - * @param[in] host local host address to listen on - * @param[in] port local port to listen on - * @param[in] protocol family to use (IPv4 or IPv6 or 0). If 0 (zero) is passed in the protocol family will be automatically determined from the address - * @param[in] context application-supplied, can be accessed via - * qdpn_listener_context() - * @param[in] methods to apply to new connectors. 
- * @return a new listener on the given host:port, NULL if error - */ -qdpn_listener_t *qdpn_listener(qdpn_driver_t *driver, - const char *host, - const char *port, - const char *protocol_family, - void* context - ); - -/** Access the head listener for a driver. - * - * @param[in] driver the driver whose head listener will be returned - * - * @return the head listener for driver or NULL if there is none - */ -qdpn_listener_t *qdpn_listener_head(qdpn_driver_t *driver); - -/** Access the next listener. - * - * @param[in] listener the listener whose next listener will be - * returned - * - * @return the next listener - */ -qdpn_listener_t *qdpn_listener_next(qdpn_listener_t *listener); - -/** Accept a connection that is pending on the listener. - * - * @param[in] listener the listener to accept the connection on - * @param[in] policy policy that holds absolute connection limits - * @param[in] policy_fn function that accepts remote host name and returns - * decision to allow or deny this connection - * @param[out] counted pointer to a bool set to true when the connection was - * counted against absolute connection limits - * @return a new connector for the remote, or NULL on error - */ -qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *listener, - void *policy, - bool (*policy_fn)(void *, const char *), - bool *counted); - -/** Access the application context that is associated with the listener. - * - * @param[in] listener the listener whose context is to be returned - * @return the application context that was passed to qdpn_listener() or - * qdpn_listener_fd() - */ -void *qdpn_listener_context(qdpn_listener_t *listener); - -void qdpn_listener_set_context(qdpn_listener_t *listener, void *context); - -/** Close the socket used by the listener. - * - * @param[in] listener the listener whose socket will be closed. - */ -void qdpn_listener_close(qdpn_listener_t *listener); - -/** Frees the given listener. 
- * - * Assumes the listener's socket has been closed prior to call. - * - * @param[in] listener the listener object to free, no longer valid - * on return - */ -void qdpn_listener_free(qdpn_listener_t *listener); - - - - -/** qdpn_connector - the client API **/ - -/** Construct a connector to the given remote address. - * - * @param[in] driver owner of this connection. - * @param[in] host remote host to connect to. - * @param[in] port remote port to connect to. - * @param[in] protocol family to use (IPv4 or IPv6 or 0). If 0 (zero) is passed in the protocol family will be automatically determined from the address - * @param[in] context application supplied, can be accessed via - * qdpn_connector_context() @return a new connector - * to the given remote, or NULL on error. - */ -qdpn_connector_t *qdpn_connector(qdpn_driver_t *driver, - const char *host, - const char *port, - const char *protocol_family, - void* context); - -/** Access the head connector for a driver. - * - * @param[in] driver the driver whose head connector will be returned - * - * @return the head connector for driver or NULL if there is none - */ -qdpn_connector_t *qdpn_connector_head(qdpn_driver_t *driver); - -/** Access the next connector. - * - * @param[in] connector the connector whose next connector will be - * returned - * - * @return the next connector - */ -qdpn_connector_t *qdpn_connector_next(qdpn_connector_t *connector); - -/** Service the given connector. - * - * Handle any inbound data, outbound data, or timing events pending on - * the connector. - * - * @param[in] connector the connector to process. - */ -void qdpn_connector_process(qdpn_connector_t *connector); - -/** Access the listener which opened this connector. - * - * @param[in] connector connector whose listener will be returned. - * @return the listener which created this connector, or NULL if the - * connector has no listener (e.g. 
an outbound client - * connection) - */ -qdpn_listener_t *qdpn_connector_listener(qdpn_connector_t *connector); - -/** Access the Authentication and Security context of the connector. - * - * @param[in] connector connector whose security context will be - * returned - * @return the Authentication and Security context for the connector, - * or NULL if none - */ -pn_sasl_t *qdpn_connector_sasl(qdpn_connector_t *connector); - -/** Access the AMQP Connection associated with the connector. - * - * @param[in] connector the connector whose connection will be - * returned - * @return the connection context for the connector, or NULL if none - */ -pn_connection_t *qdpn_connector_connection(qdpn_connector_t *connector); - -/** Assign the AMQP Connection associated with the connector. - * - * @param[in] connector the connector whose connection will be set. - * @param[in] connection the connection to associate with the - * connector - */ -void qdpn_connector_set_connection(qdpn_connector_t *connector, pn_connection_t *connection); - -/** Access the application context that is associated with the - * connector. - * - * @param[in] connector the connector whose context is to be returned. - * @return the application context that was passed to qdpn_connector() - * or qdpn_connector_fd() - */ -void *qdpn_connector_context(qdpn_connector_t *connector); - -/** Assign a new application context to the connector. - * - * @param[in] connector the connector which will hold the context. - * @param[in] context new application context to associate with the - * connector - */ -void qdpn_connector_set_context(qdpn_connector_t *connector, void *context); - -/** Access the name of the connector - * - * @param[in] connector the connector of interest - * @return the name of the connector in the form of a null-terminated character string. 
- */ -const char *qdpn_connector_name(const qdpn_connector_t *connector); - -/** Access the numeric host ip of the connector - * - * @param[in] connector the connector of interest - * @return the numeric host ip address of the connector in the form of a null-terminated character string. - */ -const char *qdpn_connector_hostip(const qdpn_connector_t *connector); - -/** Access the transport used by this connector. - * - * @param[in] connector connector whose transport will be returned - * @return the transport, or NULL if none - */ -pn_transport_t *qdpn_connector_transport(qdpn_connector_t *connector); - -/** Close the socket used by the connector. - * - * @param[in] connector the connector whose socket will be closed - */ -void qdpn_connector_close(qdpn_connector_t *connector); - -/** Call when the socket is already closed, an the connector needs updating. - * - * @param[in] connector the connector whose socket has been closed - */ -void qdpn_connector_after_close(qdpn_connector_t *connector); - - -/** Socket has been closed externally, mark it closed. - * - * @param[in] connector the connector whose socket will be closed - */ -void qdpn_connector_mark_closed(qdpn_connector_t *connector); - -/** Determine if the connector is closed. - * - * @return True if closed, otherwise false - */ -bool qdpn_connector_closed(qdpn_connector_t *connector); - -bool qdpn_connector_failed(qdpn_connector_t *connector); - - -/** Destructor for the given connector. - * - * Assumes the connector's socket has been closed prior to call. - * - * @param[in] connector the connector object to free. No longer - * valid on return - */ -void qdpn_connector_free(qdpn_connector_t *connector); - -/** Activate a connector when a criteria is met - * - * Set a criteria for a connector (i.e. it's transport is writable) that, once met, - * the connector shall be placed in the driver's work queue. 
- * - * @param[in] connector The connector object to activate - * @param[in] criteria The criteria that must be met prior to activating the connector - */ -void qdpn_connector_activate(qdpn_connector_t *connector, qdpn_activate_criteria_t criteria); - -/** Activate all of the open file descriptors - */ -void qdpn_activate_all(qdpn_driver_t *driver); - -/** Return the activation status of the connector for a criteria - * - * Return the activation status (i.e. readable, writable) for the connector. This function - * has the side-effect of canceling the activation of the criteria. - * - * Please note that this function must not be used for normal AMQP connectors. It is only - * used for connectors created so the driver can track non-AMQP file descriptors. Such - * connectors are never passed into qdpn_connector_process. - * - * @param[in] connector The connector object to activate - * @param[in] criteria The criteria to test. "Is this the reason the connector appeared - * in the work list?" - * @return true iff the criteria is activated on the connector. - */ -bool qdpn_connector_activated(qdpn_connector_t *connector, qdpn_activate_criteria_t criteria); - -/** True if the connector has received a hangup */ -bool qdpn_connector_hangup(qdpn_connector_t *connector); - -/** Create a listener using the existing file descriptor. - * - * @param[in] driver driver that will 'own' this listener - * @param[in] fd existing socket for listener to listen on - * @param[in] context application-supplied, can be accessed via - * qdpn_listener_context() - * @return a new listener on the given host:port, NULL if error - */ -qdpn_listener_t *qdpn_listener_fd(qdpn_driver_t *driver, pn_socket_t fd, void *context); - -pn_socket_t qdpn_listener_get_fd(qdpn_listener_t *listener); - -/** Create a connector using the existing file descriptor. - * - * @param[in] driver driver that will 'own' this connector. - * @param[in] fd existing socket to use for this connector. 
- * @param[in] context application-supplied, can be accessed via - * qdpn_connector_context() - * @return a new connector to the given host:port, NULL if error. - */ -qdpn_connector_t *qdpn_connector_fd(qdpn_driver_t *driver, pn_socket_t fd, void *context); - -/** Get the file descriptor for this connector */ -int qdpn_connector_get_fd(qdpn_connector_t *connector); - -/** Set the wakeup time on the connector */ -void qdpn_connector_wakeup(qdpn_connector_t* c, pn_timestamp_t t); - -/** Current time according */ -pn_timestamp_t qdpn_now(); - -/** Implementation of connector methods (e.g. these are different for HTTP connectors */ -typedef struct qdpn_connector_methods_t { - void (*process)(qdpn_connector_t *c); - void (*close)(qdpn_connector_t *c); -} qdpn_connector_methods_t; - -/** Set new methods for a connector (e.g. because it is a HTTP connector) */ -void qdpn_connector_set_methods(qdpn_connector_t *c, qdpn_connector_methods_t *methods); - -/**@}*/ - -#endif /* driver.h */ http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/include/qpid/dispatch/error.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/error.h b/include/qpid/dispatch/error.h index 6bf7e82..f464e58 100644 --- a/include/qpid/dispatch/error.h +++ b/include/qpid/dispatch/error.h @@ -20,6 +20,7 @@ */ #include <qpid/dispatch/enum.h> +#include <stdarg.h> /** @file * Thread-safe error handling mechansim for dispatch. @@ -59,7 +60,13 @@ ENUM_DECLARE(qd_error); */ #define qd_error(code, ...) qd_error_impl(code, __FILE__, __LINE__, __VA_ARGS__) +/** + * Like qd_error but takes a va_list of format arguments + */ +#define qd_verror(code, fmt, ap) qd_error_vimpl(code, __FILE__, __LINE__, fmt, ap) + qd_error_t qd_error_impl(qd_error_t code, const char *file, int line, const char *fmt, ...); +qd_error_t qd_error_vimpl(qd_error_t code, const char *file, int line, const char *fmt, va_list ap); /** * Clear thread-local error code and message. 
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/include/qpid/dispatch/router_core.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 2f749b7..0031ed7 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -220,18 +220,17 @@ int qdr_connection_process(qdr_connection_t *conn); /** * qdr_connection_activate_t callback * - * Activate a connection for transmission (socket write). This is called whenever - * the core has deliveries on links, disposition updates on deliveries, or flow updates - * to be sent across the connection. + * Activate a connection with pending work from the core to ensure it will be processed by + * the proactor: the core has deliveries on links, disposition updates on deliveries, or + * flow updates to be sent across the connection. * * IMPORTANT: This function will be invoked on the core thread. It must never block, * delay, or do any lenghty computation. 
* * @param context The context supplied when the callback was registered * @param conn The connection object to be activated - * @param awaken Iff true, awaken the driver poll loop after the activation */ -typedef void (*qdr_connection_activate_t) (void *context, qdr_connection_t *conn, bool awaken); +typedef void (*qdr_connection_activate_t) (void *context, qdr_connection_t *conn); /** ****************************************************************************** @@ -560,7 +559,7 @@ typedef int (*qdr_link_push_t) (void *context, qdr_link_t *link, int l typedef void (*qdr_link_deliver_t) (void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled); typedef void (*qdr_delivery_update_t) (void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled); -void qdr_connection_handlers(qdr_core_t *core, +void qdr_connection_handlers(qdr_core_t *core, void *context, qdr_connection_activate_t activate, qdr_link_first_attach_t first_attach, http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/include/qpid/dispatch/server.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h index a466ec0..ec885ae 100644 --- a/include/qpid/dispatch/server.h +++ b/include/qpid/dispatch/server.h @@ -24,8 +24,10 @@ #include <proton/engine.h> #include <proton/event.h> +struct qd_container_t; + /**@file - * Control server threads, signals and connections. + * Control server threads and connections. */ /** @@ -52,9 +54,8 @@ typedef void (*qd_deferred_t)(void *context, bool discard); * Run the server threads until completion - The blocking version. * * Start the operation of the server, including launching all of the worker - * threads. This function does not return until after the server has been - * stopped. The thread that calls qd_server_run is used as one of the worker - * threads. + * threads. Returns when all server threads have exited. 
The thread that calls + * qd_server_run is used as one of the worker threads. * * @param qd The dispatch handle returned by qd_dispatch. */ @@ -62,103 +63,16 @@ void qd_server_run(qd_dispatch_t *qd); /** - * Start the server threads and return immediately - The non-blocking version. + * Tells the server to stop but doesn't wait for server to exit. + * The call to qd_server_run() will exit when all server threads have exited. * - * Start the operation of the server, including launching all of the worker - * threads. + * May be called from any thread or from a signal handler. * * @param qd The dispatch handle returned by qd_dispatch. */ -void qd_server_start(qd_dispatch_t *qd); - -/** - * Stop the server - * - * Stop the server and join all of its worker threads. This function may be - * called from any thread. When this function returns, all of the other - * server threads have been closed and joined. The calling thread will be the - * only running thread in the process. - * - * @param qd The dispatch handle returned by qd_dispatch. - */ void qd_server_stop(qd_dispatch_t *qd); - -/** - * Pause (quiesce) the server. - * - * This call blocks until all of the worker threads (except the one calling - * this function) are finished processing and have been blocked. When this - * call returns, the calling thread is the only thread running in the process. - * - * If the calling process is *not* one of the server's worker threads, then - * this function will block all of the worker threads before returning. - * - * @param qd The dispatch handle returned by qd_dispatch. - */ -void qd_server_pause(qd_dispatch_t *qd); - - -/** - * Resume normal operation of a paused server. - * - * This call unblocks all of the worker threads so they can resume normal - * connection processing. - * - * @param qd The dispatch handle returned by qd_dispatch. 
- */ -void qd_server_resume(qd_dispatch_t *qd); - - -/** - * @} - * @defgroup server_signal server_signal - * - * Server Signal Handling - * - * @{ - */ - - -/** - * Signal Handler - * - * Callback for signal handling. This handler will be invoked on one of the - * worker threads in an orderly fashion. This callback is triggered by a call - * to qd_server_signal. - * - * @param context The handler context supplied in qd_server_initialize. - * @param signum The signal number that was passed into qd_server_signal. - */ -typedef void (*qd_signal_handler_cb_t)(void* context, int signum); - - -/** - * Set the signal handler for the server. The signal handler is invoked - * cleanly on a worker thread after a call is made to qd_server_signal. The - * signal handler is optional and need not be set. - * - * @param qd The dispatch handle returned by qd_dispatch. - * @param signal_handler The signal handler called when a registered signal is caught. - * @param context Opaque context to be passed back in the callback function. - */ -void qd_server_set_signal_handler(qd_dispatch_t *qd, qd_signal_handler_cb_t signal_handler, void *context); - - -/** - * Schedule the invocation of the Server's signal handler. - * - * This function is safe to call from any context, including an OS signal - * handler or an Interrupt Service Routine. It schedules the orderly - * invocation of the Server's signal handler on one of the worker threads. - * - * @param qd The dispatch handle returned by qd_dispatch. - * @param signum The signal number... TODO - */ -void qd_server_signal(qd_dispatch_t *qd, int signum); - - /** * @} * @defgroup connection connection @@ -459,76 +373,15 @@ typedef struct qd_server_config_t { char *host_port; /** - * Set for listeners that are part of the initial router configuration. - * An error in setting up initial listeners must shut down the router. 
- */ - bool exit_on_error; - - /** * @} */ } qd_server_config_t; /** - * Connection Event Handler - * - * Callback invoked when processing is needed on a proton connection. This - * callback shall be invoked on one of the server's worker threads. The - * server guarantees that no two threads shall be allowed to process a single - * connection concurrently. The implementation of this handler may assume - * that it has exclusive access to the connection and its subservient - * components (sessions, links, deliveries, etc.). - * - * @param handler_context The handler context supplied in qd_server_set_conn_handler. - * @param conn_context The handler context supplied in qd_server_{connect,listen}. - * @param event The event/reason for the invocation of the handler. - * @param conn The connection that requires processing by the handler. - * @return A value greater than zero if the handler did any proton processing for - * the connection. If no work was done, zero is returned. - */ -typedef int (*qd_conn_handler_cb_t)(void *handler_context, void* conn_context, qd_conn_event_t event, qd_connection_t *conn); - -/** - * Proton Event Handler - * - * This callback is invoked when proton events for a connection require - * processing. - * - * @param handler_context The handler context supplied in qd_server_set_conn_handler. - * @param conn_context The handler context supplied in qd_server_{connect,listen}. - * @param event The proton event being raised. - * @param conn The connection associated with this proton event. + * Set the container, must be set prior to the invocation of qd_server_run. */ -typedef int (*qd_pn_event_handler_cb_t)(void *handler_context, void* conn_context, pn_event_t *event, qd_connection_t *conn); - - -/** - * Post event process handler - * Invoke only after all proton events have been popped from the collector. - * - * @param conn The connection for which all proton events have been popped. 
- */ -typedef void (*qd_pn_event_complete_cb_t)(void *handler_context, qd_connection_t *conn); - - -/** - * Set the connection event handler callback. - * - * Set the connection handler callback for the server. This callback is - * mandatory and must be set prior to the invocation of qd_server_run. - * - * @param qd The dispatch handle returned by qd_dispatch. - * @param conn_handler The handler for processing connection-related events. - * @param pn_event_handler The handler for proton events. - * @param handler_context Context data to associate with the handler. - */ -void qd_server_set_conn_handler(qd_dispatch_t *qd, - qd_conn_handler_cb_t conn_handler, - qd_pn_event_handler_cb_t pn_event_handler, - qd_pn_event_complete_cb_t pn_event_complete_handler, - void *handler_context); - +void qd_server_set_container(qd_dispatch_t *qd, struct qd_container_t *container); /** * Set the user context for a connection. @@ -596,9 +449,8 @@ void qd_connection_set_user(qd_connection_t *conn); * internal work list and be invoked for processing by a worker thread. * * @param conn The connection over which the application wishes to send data - * @param awaken Iff true, wakeup the driver poll after the activation */ -void qd_server_activate(qd_connection_t *conn, bool awaken); +void qd_server_activate(qd_connection_t *conn); /** @@ -620,15 +472,6 @@ bool qd_connection_inbound(qd_connection_t *conn); /** - * Get the event collector for a connection. - * - * @param conn Connection object supplied in QD_CONN_EVENT_{LISTENER,CONNETOR}_OPEN - * @return The pn_collector associated with the connection. - */ -pn_collector_t *qd_connection_collector(qd_connection_t *conn); - - -/** * Get the connection id of a connection. * * @param conn Connection object supplied in QD_CONN_EVENT_{LISTENER,CONNETOR}_OPEN @@ -658,67 +501,14 @@ void qd_connection_invoke_deferred(qd_connection_t *conn, qd_deferred_t call, vo /** - * Write accessor to the connection's proton-event stall flag. 
- * When set no further events are processed on this connection. - * Used during processing of policy decisions to hold off incoming - * pipeline of amqp events. - * - * @param conn Connection object - * @param stall Value of stall flag - */ -void qd_connection_set_event_stall(qd_connection_t *conn, bool stall); - - -/** - * Create a listener for incoming connections. - * - * @param qd The dispatch handle returned by qd_dispatch. - * @param config Pointer to a configuration block for this listener. This block will be - * referenced by the server, not copied. The referenced record must remain - * in-scope for the life of the listener. - * @param context User context passed back in the connection handler. - * @return A pointer to the new listener, or NULL in case of failure. - */ -qd_listener_t *qd_server_listen(qd_dispatch_t *qd, const qd_server_config_t *config, void *context); - - -/** - * Free the resources associated with a listener. - * - * @param li A listener pointer returned by qd_listen. + * Listen for incoming connections, return true if listening succeeded. */ -void qd_server_listener_free(qd_listener_t* li); - +bool qd_listener_listen(qd_listener_t *l); /** - * Close a listener so it will accept no more connections. - * - * @param li A listener pointer returned by qd_listen. + * Initiate an outgoing connection. Returns true if successful. */ -void qd_server_listener_close(qd_listener_t* li); - - -/** - * Create a connector for an outgoing connection. - * - * @param qd The dispatch handle returned by qd_dispatch. - * @param config Pointer to a configuration block for this connector. This block will be - * referenced by the server, not copied. The referenced record must remain - * in-scope for the life of the connector.. - * @param context User context passed back in the connection handler. - * @return A pointer to the new connector, or NULL in case of failure. 
- */ -qd_connector_t *qd_server_connect(qd_dispatch_t *qd, const qd_server_config_t *config, void *context); - - -/** - * Free the resources associated with a connector. - * - * @param ct A connector pointer returned by qd_connect. - */ -void qd_server_connector_free(qd_connector_t* ct); - - +bool qd_connector_connect(qd_connector_t *ct); /** * Store address of display name service py object for C code use http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/include/qpid/dispatch/timer.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/timer.h b/include/qpid/dispatch/timer.h index e5ba6ab..d0592d3 100644 --- a/include/qpid/dispatch/timer.h +++ b/include/qpid/dispatch/timer.h @@ -33,7 +33,10 @@ */ typedef struct qd_timer_t qd_timer_t; +/** Absolute time stamp, milliseconds since epoch */ typedef int64_t qd_timestamp_t; +/** Relative duration in milliseconds */ +typedef int64_t qd_duration_t; /** * Timer Callback @@ -77,7 +80,7 @@ void qd_timer_free(qd_timer_t *timer); * @param msec The minimum number of milliseconds of delay until the timer fires. * If 0 is supplied, the timer will be scheduled to fire immediately. */ -void qd_timer_schedule(qd_timer_t *timer, qd_timestamp_t msec); +void qd_timer_schedule(qd_timer_t *timer, qd_duration_t msec); /** @@ -90,6 +93,11 @@ void qd_timer_schedule(qd_timer_t *timer, qd_timestamp_t msec); void qd_timer_cancel(qd_timer_t *timer); /** + * The current time. 
+ */ +qd_timestamp_t qd_timer_now() ; + +/** * @} */ http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/python/qpid_dispatch/management/qdrouter.json ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json index 667c0ec..3f66abb 100644 --- a/python/qpid_dispatch/management/qdrouter.json +++ b/python/qpid_dispatch/management/qdrouter.json @@ -562,9 +562,9 @@ "operations": ["CREATE", "DELETE"], "attributes": { "host": { - "description":"IP address: ipv4 or ipv6 literal or a host name", + "description":"A host name, IPV4 or IPV6 literal, or the empty string. The empty string listens on all local addresses. A host name listens on all addresses associated with the name. An IPV6 literal address (or wildcard '[::]') listens only for IPV6. An IPV4 literal address (or wildcard '0.0.0.0') listens only for IPV4.", "type": "string", - "default": "127.0.0.1", + "default": "", "create": true }, "port": { http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/python/qpid_dispatch_internal/router/address.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/router/address.py b/python/qpid_dispatch_internal/router/address.py index 4092ac7..0cff578 100644 --- a/python/qpid_dispatch_internal/router/address.py +++ b/python/qpid_dispatch_internal/router/address.py @@ -27,11 +27,6 @@ class Address(str): Provides a central place for logic to construct addresses of various types. """ - # FIXME aconway 2015-02-06: not finished: - # - Move to C, make accessible in C code also - provide python wrapper. - # - Provide access to parts of address using C field iterator, avoid duplicating that logic - # - (Maybe) separate address logic out of general field iterator logic for clarity. 
- AMQP="amqp:" TOPO="_topo" http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/router/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/router/CMakeLists.txt b/router/CMakeLists.txt index 5681e00..52e9ccc 100644 --- a/router/CMakeLists.txt +++ b/router/CMakeLists.txt @@ -35,6 +35,6 @@ set(router_SOURCES SET(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/${QPID_DISPATCH_HOME}") add_executable(qdrouterd ${router_SOURCES}) -target_link_libraries(qdrouterd qpid-dispatch ${proton_lib}) +target_link_libraries(qdrouterd qpid-dispatch) install(TARGETS qdrouterd RUNTIME DESTINATION sbin) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/router/src/main.c ---------------------------------------------------------------------- diff --git a/router/src/main.c b/router/src/main.c index c09b0a8..ee084df 100644 --- a/router/src/main.c +++ b/router/src/main.c @@ -38,42 +38,21 @@ static const char* argv0 = 0; /** * This is the OS signal handler, invoked on an undetermined thread at a completely - * arbitrary point of time. It is not safe to do anything here but signal the dispatch - * server with the signal number. + * arbitrary point of time. */ static void signal_handler(int signum) { - qd_server_signal(dispatch, signum); -} - - -/** - * This signal handler is called cleanly by one of the server's worker threads in - * response to an earlier call to qd_server_signal. 
- */ -static void server_signal_handler(void* context, int signum) -{ - qd_server_pause(dispatch); - switch (signum) { case SIGINT: exit_with_sigint = 1; // fallthrough - case SIGQUIT: case SIGTERM: - fflush(stdout); - qd_server_stop(dispatch); - break; - - case SIGHUP: + qd_server_stop(dispatch); /* qd_server_stop is signal-safe */ break; - default: break; } - - qd_server_resume(dispatch); } static void check(int fd) { @@ -109,9 +88,6 @@ static void main_process(const char *config_path, const char *python_pkgdir, int qd_dispatch_load_config(dispatch, config_path); check(fd); - (void)server_signal_handler; (void)signal_handler; - qd_server_set_signal_handler(dispatch, server_signal_handler, 0); - signal(SIGHUP, signal_handler); signal(SIGQUIT, signal_handler); signal(SIGTERM, signal_handler); @@ -133,6 +109,7 @@ static void main_process(const char *config_path, const char *python_pkgdir, int dispatch = NULL; qd_dispatch_free(d); + fflush(stdout); if (exit_with_sigint) { signal(SIGINT, SIG_DFL); kill(getpid(), SIGINT); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 4c00206..17674d4 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -62,7 +62,6 @@ set(qpid_dispatch_SOURCES message.c parse.c policy.c - posix/driver.c posix/threading.c python_embedded.c router_agent.c @@ -112,7 +111,7 @@ if (CMAKE_C_COMPILER_ID STREQUAL "GNU") endif (CMAKE_C_COMPILER_ID STREQUAL "GNU") add_library(qpid-dispatch SHARED ${qpid_dispatch_SOURCES}) -target_link_libraries(qpid-dispatch ${Proton_LIBRARIES} ${pthread_lib} ${rt_lib} ${dl_lib} ${PYTHON_LIBRARIES} ${LIBWEBSOCKETS_LIBRARIES}) +target_link_libraries(qpid-dispatch ${ProtonCore_LIBRARIES} ${ProtonProactor_LIBRARIES} ${pthread_lib} ${rt_lib} ${dl_lib} ${PYTHON_LIBRARIES} ${LIBWEBSOCKETS_LIBRARIES}) set_target_properties(qpid-dispatch PROPERTIES LINK_FLAGS
"${CATCH_UNDEFINED}" ) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/amqp.c ---------------------------------------------------------------------- diff --git a/src/amqp.c b/src/amqp.c index 0c49a16..cf9f775 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -63,5 +63,3 @@ const char * const QD_AMQP_COND_PRECONDITION_FAILED = "amqp:precondition-failed" const char * const QD_AMQP_COND_RESOURCE_DELETED = "amqp:resource-deleted"; const char * const QD_AMQP_COND_ILLEGAL_STATE = "amqp:illegal-state"; const char * const QD_AMQP_COND_FRAME_SIZE_TOO_SMALL = "amqp:frame-size-too-small"; - -const char * const QD_COND_NAME = "router:error"; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/connection_manager.c ---------------------------------------------------------------------- diff --git a/src/connection_manager.c b/src/connection_manager.c index 8957185..4bc18ce 100644 --- a/src/connection_manager.c +++ b/src/connection_manager.c @@ -22,6 +22,7 @@ #include <qpid/dispatch/threading.h> #include <qpid/dispatch/atomic.h> #include <qpid/dispatch/failoverlist.h> +#include <proton/listener.h> #include "dispatch_private.h" #include "connection_manager_private.h" #include "server_private.h" @@ -47,29 +48,13 @@ struct qd_config_ssl_profile_t { char *ssl_private_key_file; }; -struct qd_config_listener_t { - qd_listener_t *listener; - qd_server_config_t configuration; - DEQ_LINKS(qd_config_listener_t); -}; - -DEQ_DECLARE(qd_config_listener_t, qd_config_listener_list_t); DEQ_DECLARE(qd_config_ssl_profile_t, qd_config_ssl_profile_list_t); - -struct qd_config_connector_t { - DEQ_LINKS(qd_config_connector_t); - qd_connector_t *connector; - qd_server_config_t configuration; -}; - -DEQ_DECLARE(qd_config_connector_t, qd_config_connector_list_t); - struct qd_connection_manager_t { qd_log_source_t *log_source; qd_server_t *server; - qd_config_listener_list_t config_listeners; - qd_config_connector_list_t config_connectors; + qd_listener_list_t listeners; 
+ qd_connector_list_t connectors; qd_config_ssl_profile_list_t config_ssl_profiles; }; @@ -108,11 +93,12 @@ static qd_config_ssl_profile_t *qd_find_ssl_profile(qd_connection_manager_t *cm, return 0; } -static void qd_server_config_free(qd_server_config_t *cf) +void qd_server_config_free(qd_server_config_t *cf) { if (!cf) return; free(cf->host); free(cf->port); + free(cf->host_port); free(cf->role); if (cf->http_root) free(cf->http_root); if (cf->name) free(cf->name); @@ -199,6 +185,10 @@ static void set_config_host(qd_server_config_t *config, qd_entity_t* entity) } assert(config->host); + + int hplen = strlen(config->host) + strlen(config->port) + 2; + config->host_port = malloc(hplen); + snprintf(config->host_port, hplen, "%s:%s", config->host, config->port); } @@ -402,6 +392,7 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf return qd_error_code(); } + bool is_log_component_enabled(qd_log_bits log_message, char *component_name) { for(int i=0;;i++) { @@ -507,45 +498,32 @@ static void log_config(qd_log_source_t *log, qd_server_config_t *c, const char * } -static void config_listener_free(qd_connection_manager_t *cm, qd_config_listener_t *cl) -{ - if (cl->listener) { - qd_server_listener_close(cl->listener); - qd_server_listener_free(cl->listener); - cl->listener = 0; - } - qd_server_config_free(&cl->configuration); - free(cl); -} - - -qd_config_listener_t *qd_dispatch_configure_listener(qd_dispatch_t *qd, qd_entity_t *entity) +qd_listener_t *qd_dispatch_configure_listener(qd_dispatch_t *qd, qd_entity_t *entity) { qd_connection_manager_t *cm = qd->connection_manager; - qd_config_listener_t *cl = NEW(qd_config_listener_t); - cl->listener = 0; - - if (load_server_config(qd, &cl->configuration, entity) != QD_ERROR_NONE) { - qd_log(cm->log_source, QD_LOG_ERROR, "Unable to create config listener: %s", qd_error_message()); - config_listener_free(qd->connection_manager, cl); + qd_listener_t *li = qd_server_listener(qd->server); + if (!li 
|| load_server_config(qd, &li->config, entity) != QD_ERROR_NONE) { + qd_log(cm->log_source, QD_LOG_ERROR, "Unable to create listener: %s", qd_error_message()); + qd_listener_decref(li); return 0; } char *fol = qd_entity_opt_string(entity, "failoverList", 0); if (fol) { - cl->configuration.failover_list = qd_failover_list(fol); + li->config.failover_list = qd_failover_list(fol); free(fol); - if (cl->configuration.failover_list == 0) { - qd_log(cm->log_source, QD_LOG_ERROR, "Error parsing failover list: %s", qd_error_message()); - config_listener_free(qd->connection_manager, cl); + if (li->config.failover_list == 0) { + qd_log(cm->log_source, QD_LOG_ERROR, "Unable to create listener, bad failover list: %s", + qd_error_message()); + qd_listener_decref(li); return 0; } } else { - cl->configuration.failover_list = 0; + li->config.failover_list = 0; } - DEQ_ITEM_INIT(cl); - DEQ_INSERT_TAIL(cm->config_listeners, cl); - log_config(cm->log_source, &cl->configuration, "Listener"); - return cl; + DEQ_ITEM_INIT(li); + DEQ_INSERT_TAIL(cm->listeners, li); + log_config(cm->log_source, &li->config, "Listener"); + return li; } @@ -561,32 +539,19 @@ qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl) } -static void config_connector_free(qd_connection_manager_t *cm, qd_config_connector_t *cc) +qd_connector_t *qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_entity_t *entity) { - if (cc->connector) { - qd_server_connector_free(cc->connector); - qd_server_config_free(&cc->configuration); - } - free(cc); -} - - -qd_config_connector_t *qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_entity_t *entity) -{ - qd_error_clear(); qd_connection_manager_t *cm = qd->connection_manager; - qd_config_connector_t *cc = NEW(qd_config_connector_t); - ZERO(cc); - - if (load_server_config(qd, &cc->configuration, entity) != QD_ERROR_NONE) { - qd_log(cm->log_source, QD_LOG_ERROR, "Unable to create config connector: %s", qd_error_message()); - 
config_connector_free(qd->connection_manager, cc); - return 0; + qd_connector_t *ct = qd_server_connector(qd->server); + if (ct && load_server_config(qd, &ct->config, entity) == QD_ERROR_NONE) { + DEQ_ITEM_INIT(ct); + DEQ_INSERT_TAIL(cm->connectors, ct); + log_config(cm->log_source, &ct->config, "Connector"); + return ct; } - DEQ_ITEM_INIT(cc); - DEQ_INSERT_TAIL(cm->config_connectors, cc); - log_config(cm->log_source, &cc->configuration, "Connector"); - return cc; + qd_log(cm->log_source, QD_LOG_ERROR, "Unable to create connector: %s", qd_error_message()); + qd_connector_decref(ct); + return 0; } @@ -598,8 +563,8 @@ qd_connection_manager_t *qd_connection_manager(qd_dispatch_t *qd) cm->log_source = qd_log_source("CONN_MGR"); cm->server = qd->server; - DEQ_INIT(cm->config_listeners); - DEQ_INIT(cm->config_connectors); + DEQ_INIT(cm->listeners); + DEQ_INIT(cm->connectors); DEQ_INIT(cm->config_ssl_profiles); return cm; @@ -609,18 +574,18 @@ qd_connection_manager_t *qd_connection_manager(qd_dispatch_t *qd) void qd_connection_manager_free(qd_connection_manager_t *cm) { if (!cm) return; - qd_config_listener_t *cl = DEQ_HEAD(cm->config_listeners); - while (cl) { - DEQ_REMOVE_HEAD(cm->config_listeners); - config_listener_free(cm, cl); - cl = DEQ_HEAD(cm->config_listeners); + qd_listener_t *li = DEQ_HEAD(cm->listeners); + while (li) { + DEQ_REMOVE_HEAD(cm->listeners); + qd_listener_decref(li); + li = DEQ_HEAD(cm->listeners); } - qd_config_connector_t *cc = DEQ_HEAD(cm->config_connectors); - while (cc) { - DEQ_REMOVE_HEAD(cm->config_connectors); - config_connector_free(cm, cc); - cc = DEQ_HEAD(cm->config_connectors); + qd_connector_t *c = DEQ_HEAD(cm->connectors); + while (c) { + DEQ_REMOVE_HEAD(cm->connectors); + qd_connector_decref(c); + c = DEQ_HEAD(cm->connectors); } qd_config_ssl_profile_t *sslp = DEQ_HEAD(cm->config_ssl_profiles); @@ -637,25 +602,26 @@ void qd_connection_manager_free(qd_connection_manager_t *cm) void qd_connection_manager_start(qd_dispatch_t *qd) { 
static bool first_start = true; - qd_config_listener_t *cl = DEQ_HEAD(qd->connection_manager->config_listeners); - qd_config_connector_t *cc = DEQ_HEAD(qd->connection_manager->config_connectors); + qd_listener_t *li = DEQ_HEAD(qd->connection_manager->listeners); + qd_connector_t *ct = DEQ_HEAD(qd->connection_manager->connectors); - while (cl) { - if (cl->listener == 0 ) { - cl->listener = qd_server_listen(qd, &cl->configuration, cl); - if (!cl->listener && first_start) { + while (li) { + if (!li->pn_listener) { + qd_listener_listen(li); + if (!li->pn_listener && first_start) { qd_log(qd->connection_manager->log_source, QD_LOG_CRITICAL, - "Socket bind failed during initial configuration"); + "Listen on %s failed during initial config", li->config.host_port); exit(1); + } else { + li->exit_on_error = first_start; } } - cl = DEQ_NEXT(cl); + li = DEQ_NEXT(li); } - while (cc) { - if (cc->connector == 0) - cc->connector = qd_server_connect(qd, &cc->configuration, cc); - cc = DEQ_NEXT(cc); + while (ct) { + qd_connector_connect(ct); + ct = DEQ_NEXT(ct); } first_start = false; @@ -664,12 +630,13 @@ void qd_connection_manager_start(qd_dispatch_t *qd) void qd_connection_manager_delete_listener(qd_dispatch_t *qd, void *impl) { - qd_config_listener_t *cl = (qd_config_listener_t*) impl; - - if (cl) { - qd_server_listener_close(cl->listener); - DEQ_REMOVE(qd->connection_manager->config_listeners, cl); - config_listener_free(qd->connection_manager, cl); + qd_listener_t *li = (qd_listener_t*) impl; + if (li) { + if (li->pn_listener) { + pn_listener_close(li->pn_listener); + } + DEQ_REMOVE(qd->connection_manager->listeners, li); + qd_listener_decref(li); } } @@ -681,19 +648,30 @@ void qd_connection_manager_delete_ssl_profile(qd_dispatch_t *qd, void *impl) } +static void deferred_close(void *context, bool discard) { + if (!discard) { + pn_connection_close((pn_connection_t*)context); + } +} + + void qd_connection_manager_delete_connector(qd_dispatch_t *qd, void *impl) { - 
qd_config_connector_t *cc = (qd_config_connector_t*) impl; - - if (cc) { - DEQ_REMOVE(qd->connection_manager->config_connectors, cc); - config_connector_free(qd->connection_manager, cc); + qd_connector_t *ct = (qd_connector_t*) impl; + if (ct) { + sys_mutex_lock(ct->lock); + if (ct->ctx && ct->ctx->pn_conn) { + qd_connection_invoke_deferred(ct->ctx, deferred_close, ct->ctx->pn_conn); + } + sys_mutex_unlock(ct->lock); + DEQ_REMOVE(qd->connection_manager->connectors, ct); + qd_connector_decref(ct); } } -const char *qd_config_connector_name(qd_config_connector_t *cc) +const char *qd_connector_name(qd_connector_t *ct) { - return cc ? cc->configuration.name : 0; + return ct ? ct->config.name : 0; } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/container.c ---------------------------------------------------------------------- diff --git a/src/container.c b/src/container.c index 1fb83a6..ddc0418 100644 --- a/src/container.c +++ b/src/container.c @@ -106,7 +106,6 @@ static void setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link) pn_link_close(pn_link); return; } - link->pn_sess = pn_link_session(pn_link); link->pn_link = pn_link; link->direction = QD_OUTGOING; @@ -234,7 +233,7 @@ static void notify_opened(qd_container_t *container, qd_connection_t *conn, void void policy_notify_opened(void *container, qd_connection_t *conn, void *context) { - notify_opened((qd_container_t *)container, (qd_connection_t *)conn, context); + notify_opened((qd_container_t *)container, (qd_connection_t *)conn, context); } static void notify_closed(qd_container_t *container, qd_connection_t *conn, void *context) @@ -287,27 +286,21 @@ static void close_links(qd_container_t *container, pn_connection_t *conn, bool p } -static int close_handler(qd_container_t *container, void* conn_context, pn_connection_t *conn, qd_connection_t* qd_conn) +static int close_handler(qd_container_t *container, pn_connection_t *conn, qd_connection_t* qd_conn) { // // Close all 
links, passing QD_LOST as the reason. These links are not // being properly 'detached'. They are being orphaned. // close_links(container, conn, true); - - // close the connection - pn_connection_close(conn); - - notify_closed(container, qd_conn, conn_context); + notify_closed(container, qd_conn, qd_connection_get_context(qd_conn)); return 0; } -static int writable_handler(qd_container_t *container, pn_connection_t *conn, qd_connection_t* qd_conn) +static void writable_handler(qd_container_t *container, pn_connection_t *conn, qd_connection_t* qd_conn) { const qd_node_type_t *nt; - int event_count = 0; - // // Note the locking structure in this function. Generally this would be unsafe, but since // this particular list is only ever appended to and never has items inserted or deleted, @@ -320,14 +313,12 @@ static int writable_handler(qd_container_t *container, pn_connection_t *conn, qd while (nt_item) { nt = nt_item->ntype; if (nt->writable_handler) - event_count += nt->writable_handler(nt->type_context, qd_conn, 0); + nt->writable_handler(nt->type_context, qd_conn, 0); sys_mutex_lock(container->lock); nt_item = DEQ_NEXT(nt_item); sys_mutex_unlock(container->lock); } - - return event_count; } /** @@ -382,7 +373,18 @@ static void add_link_to_free_list(qd_pn_free_link_session_list_t *free_link_ses } -void pn_event_complete_handler(void *handler_context, qd_connection_t *qd_conn) + +/* + * FIXME aconway 2017-04-12: IMO this should not be necessary, we should + * be able to pn_*_free links and sessions directly the handler function. + * They will not actually be freed from memory till the event, connection, + * proactor etc. have all released their references. + * + * The need for these lists may indicate a router bug, where the router is + * using links/sessions after they are freed. Investigate and simplify if + * possible. 
+ */ +static void conn_event_complete(qd_connection_t *qd_conn) { qd_pn_free_link_session_t *to_free_link = DEQ_HEAD(qd_conn->free_link_session_list); qd_pn_free_link_session_t *to_free_session = DEQ_HEAD(qd_conn->free_link_session_list); @@ -405,27 +407,38 @@ void pn_event_complete_handler(void *handler_context, qd_connection_t *qd_conn) } } -int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *event, qd_connection_t *qd_conn) + +void qd_container_handle_event(qd_container_t *container, pn_event_t *event) { - qd_container_t *container = (qd_container_t*) handler_context; - pn_connection_t *conn = qd_connection_pn(qd_conn); - pn_session_t *ssn; - pn_link_t *pn_link; - qd_link_t *qd_link; - pn_delivery_t *delivery; + pn_connection_t *conn = pn_event_connection(event); + qd_connection_t *qd_conn = conn ? pn_connection_get_context(conn) : NULL; + pn_session_t *ssn = NULL; + pn_link_t *pn_link = NULL; + qd_link_t *qd_link = NULL; + pn_delivery_t *delivery = NULL; switch (pn_event_type(event)) { + case PN_CONNECTION_REMOTE_OPEN : qd_connection_set_user(qd_conn); if (pn_connection_state(conn) & PN_LOCAL_UNINIT) { // This Open is an externally initiated connection // Let policy engine decide - qd_connection_set_event_stall(qd_conn, true); + /* TODO aconway 2017-04-11: presently the policy test is run + * in the current thread. + * + * If/when the policy test can run in another thread, the connection + * can be stalled by saving the current pn_event_batch and passing it + * to pn_proactor_done() when the policy check is complete. Note we + * can't run the policy check as a deferred function on the current + * connection since by stalling the current connection it will never be + * run, so we need some other thread context to run it in. 
+ */ qd_conn->open_container = (void *)container; - qd_connection_invoke_deferred(qd_conn, qd_policy_amqp_open, qd_conn); + qd_policy_amqp_open(qd_conn); } else { // This Open is in response to an internally initiated connection - notify_opened(container, qd_conn, conn_context); + notify_opened(container, qd_conn, qd_connection_get_context(qd_conn)); } break; @@ -451,20 +464,21 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even } } break; + case PN_SESSION_LOCAL_CLOSE : ssn = pn_event_session(event); - pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED); while (pn_link) { - qd_link_t *qd_link = (qd_link_t*) pn_link_get_context(pn_link); - qd_link->pn_link = 0; - pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED); + qd_link_t *qd_link = (qd_link_t*) pn_link_get_context(pn_link); + qd_link->pn_link = 0; + pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED); } if (pn_session_state(ssn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) { add_session_to_free_list(&qd_conn->free_link_session_list,ssn); } break; + case PN_SESSION_REMOTE_CLOSE : if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) { ssn = pn_event_session(event); @@ -600,28 +614,20 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even } break; - default: - break; - } - return 1; -} - - -static int handler(void *handler_context, void *conn_context, qd_conn_event_t event, qd_connection_t *qd_conn) -{ - qd_container_t *container = (qd_container_t*) handler_context; - pn_connection_t *conn = qd_connection_pn(qd_conn); - - switch (event) { + case PN_CONNECTION_WAKE: + writable_handler(container, conn, qd_conn); + break; - case QD_CONN_EVENT_CLOSE: - return close_handler(container, conn_context, conn, qd_conn); + case PN_TRANSPORT_CLOSED: + close_handler(container, conn, qd_conn); + break; - case QD_CONN_EVENT_WRITABLE: - return writable_handler(container, conn, qd_conn); + default: + break; + } + if (qd_conn) { + 
conn_event_complete(qd_conn); } - - return 0; } @@ -639,8 +645,7 @@ qd_container_t *qd_container(qd_dispatch_t *qd) DEQ_INIT(container->nodes); DEQ_INIT(container->node_type_list); - qd_server_set_conn_handler(qd, handler, pn_event_handler, pn_event_complete_handler, container); - + qd_server_set_container(qd, container); qd_log(container->log_source, QD_LOG_TRACE, "Container Initialized"); return container; } @@ -801,6 +806,9 @@ qd_lifetime_policy_t qd_container_node_get_life_policy(const qd_node_t *node) qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, const char* name) { qd_link_t *link = new_qd_link_t(); + if (!link) { + return NULL; + } const qd_server_config_t * cf = qd_connection_config(conn); link->pn_sess = pn_session(qd_connection_pn(conn)); @@ -932,7 +940,7 @@ void qd_link_activate(qd_link_t *link) if (!ctx) return; - qd_server_activate(ctx, true); + qd_server_activate(ctx); } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/error.c ---------------------------------------------------------------------- diff --git a/src/error.c b/src/error.c index 6b7239a..b837a65 100644 --- a/src/error.c +++ b/src/error.c @@ -64,7 +64,7 @@ void qd_error_initialize() { log_source = qd_log_source("ERROR"); } -qd_error_t qd_error_impl(qd_error_t code, const char *file, int line, const char *fmt, ...) 
{ +qd_error_t qd_error_vimpl(qd_error_t code, const char *file, int line, const char *fmt, va_list ap) { ts.error_code = code; if (code) { char *begin = ts.error_message; @@ -75,10 +75,7 @@ qd_error_t qd_error_impl(qd_error_t code, const char *file, int line, const char aprintf(&begin, end, "%s: ", name); else aprintf(&begin, end, "%d: ", code); - va_list arglist; - va_start(arglist, fmt); - vaprintf(&begin, end, fmt, arglist); - va_end(arglist); + vaprintf(&begin, end, fmt, ap); // NOTE: Use the file/line from the qd_error macro, not this line in error.c qd_log_impl(log_source, QD_LOG_ERROR, file, line, "%s", qd_error_message()); return code; @@ -88,6 +85,14 @@ qd_error_t qd_error_impl(qd_error_t code, const char *file, int line, const char return 0; } +qd_error_t qd_error_impl(qd_error_t code, const char *file, int line, const char *fmt, ...) { + va_list ap; + va_start(ap, fmt); + qd_error_t err = qd_error_vimpl(code, file, line, fmt, ap); + va_end(ap); + return err; +} + qd_error_t qd_error_clear() { ts.error_code = 0; snprintf(ts.error_message, ERROR_MAX, "No Error"); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/http-none.c ---------------------------------------------------------------------- diff --git a/src/http-none.c b/src/http-none.c index b9af9e1..a8953e5 100644 --- a/src/http-none.c +++ b/src/http-none.c @@ -18,7 +18,6 @@ */ #include <qpid/dispatch/log.h> -#include <qpid/dispatch/driver.h> #include "http.h" /* No HTTP implementation available. */ http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/policy.c ---------------------------------------------------------------------- diff --git a/src/policy.c b/src/policy.c index 39ead3c..a55f245 100644 --- a/src/policy.c +++ b/src/policy.c @@ -184,9 +184,8 @@ qd_error_t qd_entity_refresh_policy(qd_entity_t* entity, void *unused) { // error conditions. 
// -bool qd_policy_socket_accept(void *context, const char *hostname) +bool qd_policy_socket_accept(qd_policy_t *policy, const char *hostname) { - qd_policy_t *policy = (qd_policy_t *)context; bool result = true; if (n_connections < policy->max_connection_limit) { // connection counted and allowed @@ -205,10 +204,8 @@ bool qd_policy_socket_accept(void *context, const char *hostname) // // -void qd_policy_socket_close(void *context, const qd_connection_t *conn) +void qd_policy_socket_close(qd_policy_t *policy, const qd_connection_t *conn) { - qd_policy_t *policy = (qd_policy_t *)context; - n_connections -= 1; assert (n_connections >= 0); if (policy->enableVhostPolicy) { @@ -567,8 +564,7 @@ bool _qd_policy_approve_link_name(const char *username, const char *allowed, con return result; } -// -// + bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_conn) { const char *hostip = qd_connection_hostip(qd_conn); @@ -678,55 +674,48 @@ bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *q } -// -// -void qd_policy_amqp_open(void *context, bool discard) -{ - qd_connection_t *qd_conn = (qd_connection_t *)context; - if (!discard) { - pn_connection_t *conn = qd_connection_pn(qd_conn); - qd_dispatch_t *qd = qd_server_dispatch(qd_conn->server); - qd_policy_t *policy = qd->policy; - bool connection_allowed = true; - - if (policy->enableVhostPolicy) { - // Open connection or not based on policy. - pn_transport_t *pn_trans = pn_connection_transport(conn); - const char *hostip = qd_connection_hostip(qd_conn); - const char *pcrh = pn_connection_remote_hostname(conn); - const char *vhost = (pcrh ? 
pcrh : ""); - const char *conn_name = qd_connection_name(qd_conn); +void qd_policy_amqp_open(qd_connection_t *qd_conn) { + pn_connection_t *conn = qd_connection_pn(qd_conn); + qd_dispatch_t *qd = qd_server_dispatch(qd_conn->server); + qd_policy_t *policy = qd->policy; + bool connection_allowed = true; + + if (policy->enableVhostPolicy) { + // Open connection or not based on policy. + pn_transport_t *pn_trans = pn_connection_transport(conn); + const char *hostip = qd_connection_hostip(qd_conn); + const char *pcrh = pn_connection_remote_hostname(conn); + const char *vhost = (pcrh ? pcrh : ""); + const char *conn_name = qd_connection_name(qd_conn); #define SETTINGS_NAME_SIZE 256 - char settings_name[SETTINGS_NAME_SIZE]; - uint32_t conn_id = qd_conn->connection_id; - qd_conn->policy_settings = NEW(qd_policy_settings_t); // TODO: memory pool for settings - memset(qd_conn->policy_settings, 0, sizeof(qd_policy_settings_t)); - - if (qd_policy_open_lookup_user(policy, qd_conn->user_id, hostip, vhost, conn_name, - settings_name, SETTINGS_NAME_SIZE, conn_id, - qd_conn->policy_settings) && - settings_name[0]) { - // This connection is allowed by policy. - // Apply transport policy settings - if (qd_conn->policy_settings->maxFrameSize > 0) - pn_transport_set_max_frame(pn_trans, qd_conn->policy_settings->maxFrameSize); - if (qd_conn->policy_settings->maxSessions > 0) - pn_transport_set_channel_max(pn_trans, qd_conn->policy_settings->maxSessions - 1); - } else { - // This connection is denied by policy. 
- connection_allowed = false; - } + char settings_name[SETTINGS_NAME_SIZE]; + uint32_t conn_id = qd_conn->connection_id; + qd_conn->policy_settings = NEW(qd_policy_settings_t); // TODO: memory pool for settings + memset(qd_conn->policy_settings, 0, sizeof(qd_policy_settings_t)); + + if (qd_policy_open_lookup_user(policy, qd_conn->user_id, hostip, vhost, conn_name, + settings_name, SETTINGS_NAME_SIZE, conn_id, + qd_conn->policy_settings) && + settings_name[0]) { + // This connection is allowed by policy. + // Apply transport policy settings + if (qd_conn->policy_settings->maxFrameSize > 0) + pn_transport_set_max_frame(pn_trans, qd_conn->policy_settings->maxFrameSize); + if (qd_conn->policy_settings->maxSessions > 0) + pn_transport_set_channel_max(pn_trans, qd_conn->policy_settings->maxSessions - 1); } else { - // No policy implies automatic policy allow - // Note that connections not governed by policy have no policy_settings. - } - if (connection_allowed) { - if (pn_connection_state(conn) & PN_LOCAL_UNINIT) - pn_connection_open(conn); - policy_notify_opened(qd_conn->open_container, qd_conn, qd_conn->context); - } else { - qd_policy_private_deny_amqp_connection(conn, QD_AMQP_COND_RESOURCE_LIMIT_EXCEEDED, CONNECTION_DISALLOWED); + // This connection is denied by policy. + connection_allowed = false; } + } else { + // No policy implies automatic policy allow + // Note that connections not governed by policy have no policy_settings. 
+ } + if (connection_allowed) { + if (pn_connection_state(conn) & PN_LOCAL_UNINIT) + pn_connection_open(conn); + policy_notify_opened(qd_conn->open_container, qd_conn, qd_conn->context); + } else { + qd_policy_private_deny_amqp_connection(conn, QD_AMQP_COND_RESOURCE_LIMIT_EXCEEDED, CONNECTION_DISALLOWED); } - qd_connection_set_event_stall(qd_conn, false); } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/policy.h ---------------------------------------------------------------------- diff --git a/src/policy.h b/src/policy.h index c89ca2b..23f3610 100644 --- a/src/policy.h +++ b/src/policy.h @@ -99,7 +99,7 @@ qd_error_t qd_policy_c_counts_refresh(long ccounts, qd_entity_t*entity); * @param[in] name the connector name * @return the connection is allowed or not **/ -bool qd_policy_socket_accept(void *context, const char *hostname); +bool qd_policy_socket_accept(qd_policy_t *context, const char *hostname); /** Record a closing connection. @@ -109,7 +109,7 @@ bool qd_policy_socket_accept(void *context, const char *hostname); * @param[in] context the current policy * @param[in] conn qd_connection **/ -void qd_policy_socket_close(void *context, const qd_connection_t *conn); +void qd_policy_socket_close(qd_policy_t *context, const qd_connection_t *conn); /** Approve a new session based on connection's policy. @@ -153,10 +153,7 @@ bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *q * allowed to make this connection. * Denied pn_connections are closed with a condition. * Allowed connections are signaled through qd_connection_manager. - * This function is called from the deferred queue. 
- * @param[in] context a qd_connection_t object - * @param[in] discard callback switch **/ -void qd_policy_amqp_open(void *context, bool discard); +void qd_policy_amqp_open(qd_connection_t *conn); #endif --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
