Repository: qpid-dispatch Updated Branches: refs/heads/crolke-DISPATCH-188-1 53b1043f7 -> 795bea38f
Add a policy entity and a policy section to the config .json file. Implement absolute connection counting with denial and logging. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/795bea38 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/795bea38 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/795bea38 Branch: refs/heads/crolke-DISPATCH-188-1 Commit: 795bea38fc7292c059adfedc9d405ebfbb91e02a Parents: 53b1043 Author: Chuck Rolke <[email protected]> Authored: Fri Nov 6 17:15:21 2015 -0500 Committer: Chuck Rolke <[email protected]> Committed: Fri Nov 6 17:15:21 2015 -0500 ---------------------------------------------------------------------- include/qpid/dispatch/driver.h | 4 +- python/qpid_dispatch/management/qdrouter.json | 16 ++ python/qpid_dispatch_internal/dispatch.py | 1 + .../qpid_dispatch_internal/management/agent.py | 9 ++ .../qpid_dispatch_internal/management/config.py | 2 +- src/container.c | 21 +-- src/dispatch.c | 12 ++ src/dispatch_private.h | 7 + src/policy.c | 152 +++++++++++++++++-- src/policy_private.h | 30 +++- src/posix/driver.c | 7 +- src/server.c | 8 +- 12 files changed, 236 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/795bea38/include/qpid/dispatch/driver.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/driver.h b/include/qpid/dispatch/driver.h index c286c44..1a69a27 100644 --- a/include/qpid/dispatch/driver.h +++ b/include/qpid/dispatch/driver.h @@ -174,9 +174,11 @@ void qdpn_listener_trace(qdpn_listener_t *listener, pn_trace_t trace); /** Accept a connection that is pending on the listener. * * @param[in] listener the listener to accept the connection on + * @param[in] policy function that accepts remote host name and returns + * decision to allow or deny this connection * @return a new connector for the remote, or NULL on error */ -qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *listener); +qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *listener, void *policy, bool (*policy_fn)(void *, const char *)); /** Access the application context that is associated with the listener. * http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/795bea38/python/qpid_dispatch/management/qdrouter.json ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json index d9e6343..1aa1e7d 100644 --- a/python/qpid_dispatch/management/qdrouter.json +++ b/python/qpid_dispatch/management/qdrouter.json @@ -713,6 +713,7 @@ "CONFIG", "ERROR", "DISPATCH", + "POLICY", "DEFAULT" ], "required": true, @@ -999,6 +1000,21 @@ } }, + "policy": { + "description": "Defines user connection and access policy", + "extends": "configurationEntity", + "singleton": true, + "attributes": { + "maximumConnections": { + "type": "integer", + "default": 0, + "description": "The maximum number of client connections allowed. Zero implies no limit.", + "required": false, + "create": true + } + } + }, + "dummy": { "description": "Dummy entity for test purposes.", "extends": "entity", http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/795bea38/python/qpid_dispatch_internal/dispatch.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/dispatch.py b/python/qpid_dispatch_internal/dispatch.py index 7c70a75..f294e0f 100644 --- a/python/qpid_dispatch_internal/dispatch.py +++ b/python/qpid_dispatch_internal/dispatch.py @@ -64,6 +64,7 @@ class QdDll(ctypes.PyDLL): self._prototype(self.qd_dispatch_configure_address, None, [self.qd_dispatch_p, py_object]) self._prototype(self.qd_dispatch_configure_waypoint, None, [self.qd_dispatch_p, py_object]) self._prototype(self.qd_dispatch_configure_lrp, None, [self.qd_dispatch_p, py_object]) + self._prototype(self.qd_dispatch_configure_policy, None, [self.qd_dispatch_p, py_object]) self._prototype(self.qd_dispatch_set_agent, None, [self.qd_dispatch_p, py_object]) self._prototype(self.qd_router_setup_late, None, [self.qd_dispatch_p]) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/795bea38/python/qpid_dispatch_internal/management/agent.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/management/agent.py b/python/qpid_dispatch_internal/management/agent.py index 1436024..3efe1ff 100644 --- a/python/qpid_dispatch_internal/management/agent.py +++ b/python/qpid_dispatch_internal/management/agent.py @@ -269,6 +269,15 @@ class LogEntity(EntityAdapter): """Can't actually delete a log source but return it to the default state""" self._qd.qd_log_source_reset(self.attributes['module']) +class PolicyEntity(EntityAdapter): + + def create(self): + self._qd.qd_dispatch_configure_policy(self._dispatch, self) + + def _identifier(self): + return self.attributes.get('module') + + def _addr_port_identifier(entity): for attr in ['addr', 'port']: # Set default values if need be entity.attributes.setdefault( http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/795bea38/python/qpid_dispatch_internal/management/config.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/management/config.py b/python/qpid_dispatch_internal/management/config.py index 73725c1..65ab05b 100644 --- a/python/qpid_dispatch_internal/management/config.py +++ b/python/qpid_dispatch_internal/management/config.py @@ -149,7 +149,7 @@ def configure_dispatch(dispatch, lib_handle, filename): qd.qd_router_setup_late(dispatch) # Actions requiring active management agent. # Remaining configuration - for t in "fixedAddress", "listener", "connector", "waypoint", "linkRoutePattern": + for t in "fixedAddress", "listener", "connector", "waypoint", "linkRoutePattern", "policy": for a in config.by_type(t): configure(a) for e in config.entities: configure(e) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/795bea38/src/container.c ---------------------------------------------------------------------- diff --git a/src/container.c b/src/container.c index c52ed15..5f9d161 100644 --- a/src/container.c +++ b/src/container.c @@ -319,22 +319,17 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even qd_link_t *qd_link; pn_delivery_t *delivery; +// qd_log(container->log_source, QD_LOG_CRITICAL, "pn_event_handler: %s", pn_event_type_name(pn_event_type(event))); + switch (pn_event_type(event)) { case PN_CONNECTION_REMOTE_OPEN : - if (true) { // TODO: detect if a policy engine is present - // Let policy engine decide about this connection - if (pn_connection_state(conn) & PN_LOCAL_UNINIT) { - // This Open is an externally initiated connection - qd_connection_set_event_stall(qd_conn, true); - qd_connection_invoke_deferred(qd_conn, qd_policy_handle_open, qd_conn); - } else { - // This Open is in response to an internally initiated connection - qd_connection_manager_connection_opened(qd_conn); - } + // Let policy engine decide about all connections + if (pn_connection_state(conn) & PN_LOCAL_UNINIT) { + // This Open is an externally initiated connection + qd_connection_set_event_stall(qd_conn, true); + qd_connection_invoke_deferred(qd_conn, qd_policy_amqp_open, qd_conn); } else { - // No policy engine; allow the connection - if (pn_connection_state(conn) & PN_LOCAL_UNINIT) - pn_connection_open(conn); + // This Open is in response to an internally initiated connection qd_connection_manager_connection_opened(qd_conn); } break; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/795bea38/src/dispatch.c ---------------------------------------------------------------------- diff --git a/src/dispatch.c b/src/dispatch.c index 0a0d776..d9575e2 100644 --- a/src/dispatch.c +++ b/src/dispatch.c @@ -31,6 +31,7 @@ #include "router_private.h" #include "waypoint_private.h" #include "message_private.h" +#include "policy_private.h" #include "entity.h" #include "entity_cache.h" #include <dlfcn.h> @@ -43,6 +44,8 @@ qd_server_t *qd_server(qd_dispatch_t *qd, int tc, const char *container_name, void qd_server_free(qd_server_t *server); qd_container_t *qd_container(qd_dispatch_t *qd); void qd_container_free(qd_container_t *container); +qd_policy_t *qd_policy(qd_dispatch_t *qd); +void qd_policy_free(qd_policy_t *policy); qd_router_t *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, const char *id); void qd_router_setup_late(qd_dispatch_t *qd); void qd_router_free(qd_router_t *router); @@ -138,12 +141,20 @@ qd_error_t qd_dispatch_configure_lrp(qd_dispatch_t *qd, qd_entity_t *entity) { return qd_error_code(); } +qd_error_t qd_dispatch_configure_policy(qd_dispatch_t *qd, qd_entity_t *entity) +{ + qd_router_configure_policy(qd->policy, entity); + return QD_ERROR_NONE; +} + + qd_error_t qd_dispatch_prepare(qd_dispatch_t *qd) { qd->server = qd_server(qd, qd->thread_count, qd->container_name, qd->sasl_config_path, qd->sasl_config_name); qd->container = qd_container(qd); qd->router = qd_router(qd, qd->router_mode, qd->router_area, qd->router_id); qd->connection_manager = qd_connection_manager(qd); + qd->policy = qd_policy(qd); return qd_error_code(); } @@ -160,6 +171,7 @@ void qd_dispatch_free(qd_dispatch_t *qd) free(qd->container_name); free(qd->router_area); qd_connection_manager_free(qd->connection_manager); + qd_policy_free(qd->policy); Py_XDECREF((PyObject*) qd->agent); qd_router_free(qd->router); qd_container_free(qd->container); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/795bea38/src/dispatch_private.h ---------------------------------------------------------------------- diff --git a/src/dispatch_private.h b/src/dispatch_private.h index cc7df08..45f293c 100644 --- a/src/dispatch_private.h +++ b/src/dispatch_private.h @@ -40,6 +40,7 @@ typedef struct qd_config_address_t qd_config_address_t; #include <qpid/dispatch/container.h> #include <qpid/dispatch/router.h> #include <qpid/dispatch/connection_manager.h> +#include "policy_private.h" #include "server_private.h" #include "router_private.h" @@ -49,6 +50,7 @@ struct qd_dispatch_t { qd_router_t *router; void *agent; qd_connection_manager_t *connection_manager; + qd_policy_t *policy; int thread_count; char *container_name; @@ -102,6 +104,11 @@ qd_error_t qd_dispatch_configure_waypoint(qd_dispatch_t *qd, qd_entity_t *entity qd_error_t qd_dispatch_configure_lrp(qd_dispatch_t *qd, qd_entity_t *entity); /** + * Configure security policy, must be called after qd_dispatch_prepare + */ +qd_error_t qd_dispatch_configure_policy(qd_dispatch_t *qd, qd_entity_t *entity); + +/** * \brief Configure the logging module from the * parsed configuration file. This must be called after the * call to qd_dispatch_prepare completes. http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/795bea38/src/policy.c ---------------------------------------------------------------------- diff --git a/src/policy.c b/src/policy.c index 8f95854..a08bff8 100644 --- a/src/policy.c +++ b/src/policy.c @@ -43,44 +43,166 @@ // static bool allow_this = true; +bool policy_engine() +{ + return allow_this; +} + +void policy_engine_step() +{ + // allow_this = !allow_this; +} + + // -// error conditions +// TODO: when policy dev is more complete lower the log level +// +#define POLICY_LOG_LEVEL QD_LOG_CRITICAL + +// +// The current statistics maintained globally through multiple +// reconfiguration of policy settings. +// +static int n_connections = 0; + +// +// error conditions signaled to effect denial // static char* RESOURCE_LIMIT_EXCEEDED = "amqp:resource-limit-exceeded"; //static char* UNAUTHORIZED_ACCESS = "amqp:unauthorized-access"; //static char* CONNECTION_FORCED = "amqp:connection:forced"; // -// error descriptions +// error descriptions signaled to effect denial // static char* CONNECTION_DISALLOWED = "connection disallowed by local policy"; -void qd_policy_handle_open(void *context, bool discard) +// +// Policy configuration/statistics management interface +// +struct qd_policy_t { + qd_dispatch_t *qd; + qd_log_source_t *log_source; + + int max_connections; + + int current_connections; +}; + + +qd_policy_t *qd_policy(qd_dispatch_t *qd) +{ + qd_policy_t *policy = NEW(qd_policy_t); + + policy->qd = qd; + policy->log_source = qd_log_source("POLICY"); + policy->max_connections = 0; + policy->current_connections = 0; + + qd_log(policy->log_source, QD_LOG_TRACE, "Policy Initialized"); + return policy; +} + + +void qd_policy_free(qd_policy_t *policy) +{ + free(policy); +} + +// +// +qd_error_t qd_router_configure_policy(qd_policy_t *policy, qd_entity_t *entity) +{ + policy->max_connections = qd_entity_opt_long(entity, "maximumConnections", 0); QD_ERROR_RET(); + if (policy->max_connections < 0) + return qd_error(QD_ERROR_CONFIG, "maximumConnections must be >= 0"); + return QD_ERROR_NONE; +} + + +// +// Functions related to absolute connection counts. +// These handle connections at the socket level with +// no regard to user identity. Simple yes/no decisions +// are made and there is no AMQP channel for returning +// error conditions. +// +bool qd_policy_socket_accept(void *context, const char *hostname) +{ + qd_policy_t *policy = (qd_policy_t *)context; + bool result = true; + + if (policy->max_connections == 0) { + // Policy not in force; connection counted and allowed + n_connections += 1; + qd_log(policy->log_source, POLICY_LOG_LEVEL, "Connection '%s' allowed. N= %d", hostname, n_connections); // HACK EXTRA + } else { + // Policy in force + if (n_connections < policy->max_connections) { + // connection counted and allowed + n_connections += 1; + qd_log(policy->log_source, POLICY_LOG_LEVEL, "Connection '%s' allowed. N= %d", hostname, n_connections); + } else { + // connection denied + result = false; + qd_log(policy->log_source, POLICY_LOG_LEVEL, "Connection '%s' denied, N=%d", hostname, n_connections); + } + } + return result; +} + + +void qd_policy_socket_close(void *context, const char *hostname) +{ + qd_policy_t *policy = (qd_policy_t *)context; + + n_connections -= 1; + if (policy->max_connections > 0) { + assert (n_connections >= 0); + qd_log(policy->log_source, POLICY_LOG_LEVEL, "Connection '%s' closed, N=%d", hostname, n_connections); + } + qd_log(policy->log_source, POLICY_LOG_LEVEL, "Connection '%s' closed, N=%d", hostname, n_connections); // HACK EXTRA +} + + +// +// Functions related to authenticated connection denial. +// An AMQP Open has been received over some connection. +// Evaluate the connection auth and the Open fields to +// allow or deny the Open. Denied Open attempts are +// effected with a returned Open-Close_with_condition. +// +void qd_policy_private_deny_amqp_connection(pn_connection_t *conn, const char *cond_name, const char *cond_descr) +{ + // Set the error condition and close the connection. + // Over the wire this will send an open frame followed + // immediately by a close frame with the error condition. + pn_condition_t * cond = pn_connection_condition(conn); + (void) pn_condition_set_name( cond, cond_name); + (void) pn_condition_set_description(cond, cond_descr); + pn_connection_close(conn); +} + +void qd_policy_amqp_open(void *context, bool discard) { qd_connection_t *qd_conn = (qd_connection_t *)context; - + if (!discard) { pn_connection_t *conn = qd_connection_pn(qd_conn); - if (allow_this) { // TODO: Consult actual policy engine + // Consult policy engine for this connection attempt + if ( policy_engine() ) { // TODO: get rid of this phony policy engine // This connection is allowed. if (pn_connection_state(conn) & PN_LOCAL_UNINIT) pn_connection_open(conn); qd_connection_manager_connection_opened(qd_conn); } else { // This connection is denied. - // Set the error condition and close the connection. - // Over the wire this will send an open frame followed - // immediately by a close frame with the error condition. - pn_condition_t * cond = pn_connection_condition(conn); - (void) pn_condition_set_name( cond, RESOURCE_LIMIT_EXCEEDED); - (void) pn_condition_set_description(cond, CONNECTION_DISALLOWED); - pn_connection_close(conn); + qd_policy_private_deny_amqp_connection(conn, RESOURCE_LIMIT_EXCEEDED, CONNECTION_DISALLOWED); } - - // update the policy - //allow_this = !allow_this; + // update the phony policy engine + policy_engine_step(); } qd_connection_set_event_stall(qd_conn, false); } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/795bea38/src/policy_private.h ---------------------------------------------------------------------- diff --git a/src/policy_private.h b/src/policy_private.h index 20e0382..2647b85 100644 --- a/src/policy_private.h +++ b/src/policy_private.h @@ -30,14 +30,42 @@ #include "entity_cache.h" #include <dlfcn.h> +typedef struct qd_policy_t qd_policy_t; + +qd_error_t qd_router_configure_policy(qd_policy_t *policy, qd_entity_t *entity); + + +/** Allow or deny an incoming connection based on connection count(s). + * A server listener has just accepted a socket. + * Allow or deny this connection based on the absolute number + * of allowed connections. + * The identity of the connecting user has not been negotiated yet. + * @param[in] context the current policy + * @param[in] name the connector name + * @return the connection is allowed or not + **/ +bool qd_policy_socket_accept(void *context, const char *hostname); + + +/** Record a closing connection. + * A server listener is closing a socket. + * Release the counted connection against provisioned limits + * + * @param[in] context the current policy + * @param[in] name the connector name + **/ +void qd_policy_socket_close(void *context, const char *hostname); + + /** Allow or deny an incoming connection. * An Open performative was received over a new connection. * Consult local policy to determine if this host/user is * allow to make this connection. The underlying proton * connection is either opened or closed. + * This function is called from the deferred queue. * @param[in] context a qd_connection_t object * @param[in] discard callback switch **/ -void qd_policy_handle_open(void *context, bool discard); +void qd_policy_amqp_open(void *context, bool discard); #endif http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/795bea38/src/posix/driver.c ---------------------------------------------------------------------- diff --git a/src/posix/driver.c b/src/posix/driver.c index 5eb7c55..ca7d48c 100644 --- a/src/posix/driver.c +++ b/src/posix/driver.c @@ -346,7 +346,7 @@ void qdpn_listener_set_context(qdpn_listener_t *listener, void *context) listener->context = context; } -qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *l) +qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *l, void *policy, bool (*policy_fn)(void *, const char *name)) { if (!l || !l->pending) return NULL; char name[PN_NAME_MAX]; @@ -373,6 +373,11 @@ qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *l) } } + if (policy_fn && !(*policy_fn)(policy, name)) { + close(sock); + return 0; + } + if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) fprintf(stderr, "Accepted from %s\n", name); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/795bea38/src/server.c ---------------------------------------------------------------------- diff --git a/src/server.c b/src/server.c index 512bcc0..8203c33 100644 --- a/src/server.c +++ b/src/server.c @@ -25,6 +25,7 @@ #include "entity.h" #include "entity_cache.h" #include "dispatch_private.h" +#include "policy_private.h" #include "server_private.h" #include "timer_private.h" #include "alloc.h" @@ -239,7 +240,7 @@ static void thread_process_listeners_LH(qd_server_t *qd_server) qd_connection_t *ctx; for (listener = qdpn_driver_listener(driver); listener; listener = qdpn_driver_listener(driver)) { - cxtr = qdpn_listener_accept(listener); + cxtr = qdpn_listener_accept(listener, qd_server->qd->policy, &qd_policy_socket_accept); if (!cxtr) continue; @@ -247,6 +248,7 @@ static void thread_process_listeners_LH(qd_server_t *qd_server) qd_log(qd_server->log_source, QD_LOG_DEBUG, "Accepting %s", log_incoming(logbuf, sizeof(logbuf), cxtr)); + ctx = new_qd_connection_t(); DEQ_ITEM_INIT(ctx); ctx->server = qd_server; @@ -714,6 +716,10 @@ static void *thread_run(void *arg) sys_mutex_lock(qd_server->lock); DEQ_REMOVE(qd_server->connections, ctx); + if (!ctx->connector) { + qd_policy_socket_close(qd_server->qd->policy, qdpn_connector_name(cxtr)); + } + qdpn_connector_free(cxtr); if (conn) pn_connection_free(conn); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
