Repository: qpid-dispatch
Updated Branches:
  refs/heads/crolke-DISPATCH-188-1 53b1043f7 -> 795bea38f


Add a policy entity and a policy section to the config .json file.
Implement absolute connection counting with denial and logging.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/795bea38
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/795bea38
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/795bea38

Branch: refs/heads/crolke-DISPATCH-188-1
Commit: 795bea38fc7292c059adfedc9d405ebfbb91e02a
Parents: 53b1043
Author: Chuck Rolke <[email protected]>
Authored: Fri Nov 6 17:15:21 2015 -0500
Committer: Chuck Rolke <[email protected]>
Committed: Fri Nov 6 17:15:21 2015 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/driver.h                  |   4 +-
 python/qpid_dispatch/management/qdrouter.json   |  16 ++
 python/qpid_dispatch_internal/dispatch.py       |   1 +
 .../qpid_dispatch_internal/management/agent.py  |   9 ++
 .../qpid_dispatch_internal/management/config.py |   2 +-
 src/container.c                                 |  21 +--
 src/dispatch.c                                  |  12 ++
 src/dispatch_private.h                          |   7 +
 src/policy.c                                    | 152 +++++++++++++++++--
 src/policy_private.h                            |  30 +++-
 src/posix/driver.c                              |   7 +-
 src/server.c                                    |   8 +-
 12 files changed, 236 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/795bea38/include/qpid/dispatch/driver.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/driver.h b/include/qpid/dispatch/driver.h
index c286c44..1a69a27 100644
--- a/include/qpid/dispatch/driver.h
+++ b/include/qpid/dispatch/driver.h
@@ -174,9 +174,11 @@ void qdpn_listener_trace(qdpn_listener_t *listener, 
pn_trace_t trace);
 /** Accept a connection that is pending on the listener.
  *
  * @param[in] listener the listener to accept the connection on
+ * @param[in] policy opaque context passed as the first argument of policy_fn
+ * @param[in] policy_fn called with (policy, remote host name); returns false to deny the connection
  * @return a new connector for the remote, or NULL on error
  */
-qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *listener);
+qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *listener, void 
*policy, bool (*policy_fn)(void *, const char *));
 
 /** Access the application context that is associated with the listener.
  *

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/795bea38/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json 
b/python/qpid_dispatch/management/qdrouter.json
index d9e6343..1aa1e7d 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -713,6 +713,7 @@
                         "CONFIG",
                         "ERROR",
                         "DISPATCH",
+                        "POLICY",
                         "DEFAULT"
                     ],
                     "required": true,
@@ -999,6 +1000,21 @@
             }
         },
 
+        "policy": {
+            "description": "Defines user connection and access policy",
+            "extends": "configurationEntity",
+            "singleton": true,
+            "attributes": {
+                "maximumConnections": {
+                    "type": "integer",
+                    "default": 0,
+                    "description": "The maximum number of client connections 
allowed. Zero implies no limit.",
+                    "required": false,
+                    "create": true
+                }
+            }
+        },
+
         "dummy": {
             "description": "Dummy entity for test purposes.",
             "extends": "entity",

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/795bea38/python/qpid_dispatch_internal/dispatch.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/dispatch.py 
b/python/qpid_dispatch_internal/dispatch.py
index 7c70a75..f294e0f 100644
--- a/python/qpid_dispatch_internal/dispatch.py
+++ b/python/qpid_dispatch_internal/dispatch.py
@@ -64,6 +64,7 @@ class QdDll(ctypes.PyDLL):
         self._prototype(self.qd_dispatch_configure_address, None, 
[self.qd_dispatch_p, py_object])
         self._prototype(self.qd_dispatch_configure_waypoint, None, 
[self.qd_dispatch_p, py_object])
         self._prototype(self.qd_dispatch_configure_lrp, None, 
[self.qd_dispatch_p, py_object])
+        self._prototype(self.qd_dispatch_configure_policy, None, 
[self.qd_dispatch_p, py_object])
         self._prototype(self.qd_dispatch_set_agent, None, [self.qd_dispatch_p, 
py_object])
 
         self._prototype(self.qd_router_setup_late, None, [self.qd_dispatch_p])

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/795bea38/python/qpid_dispatch_internal/management/agent.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/management/agent.py 
b/python/qpid_dispatch_internal/management/agent.py
index 1436024..3efe1ff 100644
--- a/python/qpid_dispatch_internal/management/agent.py
+++ b/python/qpid_dispatch_internal/management/agent.py
@@ -269,6 +269,15 @@ class LogEntity(EntityAdapter):
         """Can't actually delete a log source but return it to the default 
state"""
         self._qd.qd_log_source_reset(self.attributes['module'])
 
+class PolicyEntity(EntityAdapter):
+
+    def create(self):
+        self._qd.qd_dispatch_configure_policy(self._dispatch, self)
+
+    def _identifier(self):
+        return self.attributes.get('module')
+
+
 def _addr_port_identifier(entity):
     for attr in ['addr', 'port']: # Set default values if need be
         entity.attributes.setdefault(

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/795bea38/python/qpid_dispatch_internal/management/config.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/management/config.py 
b/python/qpid_dispatch_internal/management/config.py
index 73725c1..65ab05b 100644
--- a/python/qpid_dispatch_internal/management/config.py
+++ b/python/qpid_dispatch_internal/management/config.py
@@ -149,7 +149,7 @@ def configure_dispatch(dispatch, lib_handle, filename):
     qd.qd_router_setup_late(dispatch) # Actions requiring active management 
agent.
 
     # Remaining configuration
-    for t in "fixedAddress", "listener", "connector", "waypoint", 
"linkRoutePattern":
+    for t in "fixedAddress", "listener", "connector", "waypoint", 
"linkRoutePattern", "policy":
         for a in config.by_type(t): configure(a)
     for e in config.entities:
         configure(e)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/795bea38/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index c52ed15..5f9d161 100644
--- a/src/container.c
+++ b/src/container.c
@@ -319,22 +319,17 @@ int pn_event_handler(void *handler_context, void 
*conn_context, pn_event_t *even
     qd_link_t       *qd_link;
     pn_delivery_t   *delivery;
 
+//    qd_log(container->log_source, QD_LOG_CRITICAL, "pn_event_handler: %s", 
pn_event_type_name(pn_event_type(event)));
+
     switch (pn_event_type(event)) {
     case PN_CONNECTION_REMOTE_OPEN :
-        if (true) {  // TODO: detect if a policy engine is present
-            // Let policy engine decide about this connection
-            if (pn_connection_state(conn) & PN_LOCAL_UNINIT) {
-                // This Open is an externally initiated connection
-                qd_connection_set_event_stall(qd_conn, true);
-                qd_connection_invoke_deferred(qd_conn, qd_policy_handle_open, 
qd_conn);
-            } else {
-                // This Open is in response to an internally initiated 
connection
-                qd_connection_manager_connection_opened(qd_conn);
-            }
+        // Let policy engine decide about externally initiated connections
+        if (pn_connection_state(conn) & PN_LOCAL_UNINIT) {
+            // This Open is an externally initiated connection
+            qd_connection_set_event_stall(qd_conn, true);
+            qd_connection_invoke_deferred(qd_conn, qd_policy_amqp_open, 
qd_conn);
         } else {
-            // No policy engine; allow the connection
-            if (pn_connection_state(conn) & PN_LOCAL_UNINIT)
-                pn_connection_open(conn);
+            // This Open is in response to an internally initiated connection
             qd_connection_manager_connection_opened(qd_conn);
         }
         break;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/795bea38/src/dispatch.c
----------------------------------------------------------------------
diff --git a/src/dispatch.c b/src/dispatch.c
index 0a0d776..d9575e2 100644
--- a/src/dispatch.c
+++ b/src/dispatch.c
@@ -31,6 +31,7 @@
 #include "router_private.h"
 #include "waypoint_private.h"
 #include "message_private.h"
+#include "policy_private.h"
 #include "entity.h"
 #include "entity_cache.h"
 #include <dlfcn.h>
@@ -43,6 +44,8 @@ qd_server_t    *qd_server(qd_dispatch_t *qd, int tc, const 
char *container_name,
 void            qd_server_free(qd_server_t *server);
 qd_container_t *qd_container(qd_dispatch_t *qd);
 void            qd_container_free(qd_container_t *container);
+qd_policy_t    *qd_policy(qd_dispatch_t *qd);
+void            qd_policy_free(qd_policy_t *policy);
 qd_router_t    *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char 
*area, const char *id);
 void            qd_router_setup_late(qd_dispatch_t *qd);
 void            qd_router_free(qd_router_t *router);
@@ -138,12 +141,20 @@ qd_error_t qd_dispatch_configure_lrp(qd_dispatch_t *qd, 
qd_entity_t *entity) {
     return qd_error_code();
 }
 
+qd_error_t qd_dispatch_configure_policy(qd_dispatch_t *qd, qd_entity_t *entity)
+{
+    qd_router_configure_policy(qd->policy, entity);
+    return QD_ERROR_NONE;
+}
+
+
 qd_error_t qd_dispatch_prepare(qd_dispatch_t *qd)
 {
     qd->server             = qd_server(qd, qd->thread_count, 
qd->container_name, qd->sasl_config_path, qd->sasl_config_name);
     qd->container          = qd_container(qd);
     qd->router             = qd_router(qd, qd->router_mode, qd->router_area, 
qd->router_id);
     qd->connection_manager = qd_connection_manager(qd);
+    qd->policy             = qd_policy(qd);
     return qd_error_code();
 }
 
@@ -160,6 +171,7 @@ void qd_dispatch_free(qd_dispatch_t *qd)
     free(qd->container_name);
     free(qd->router_area);
     qd_connection_manager_free(qd->connection_manager);
+    qd_policy_free(qd->policy);
     Py_XDECREF((PyObject*) qd->agent);
     qd_router_free(qd->router);
     qd_container_free(qd->container);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/795bea38/src/dispatch_private.h
----------------------------------------------------------------------
diff --git a/src/dispatch_private.h b/src/dispatch_private.h
index cc7df08..45f293c 100644
--- a/src/dispatch_private.h
+++ b/src/dispatch_private.h
@@ -40,6 +40,7 @@ typedef struct qd_config_address_t   qd_config_address_t;
 #include <qpid/dispatch/container.h>
 #include <qpid/dispatch/router.h>
 #include <qpid/dispatch/connection_manager.h>
+#include "policy_private.h"
 #include "server_private.h"
 #include "router_private.h"
 
@@ -49,6 +50,7 @@ struct qd_dispatch_t {
     qd_router_t             *router;
     void                    *agent;
     qd_connection_manager_t *connection_manager;
+    qd_policy_t             *policy;
 
     int    thread_count;
     char  *container_name;
@@ -102,6 +104,11 @@ qd_error_t qd_dispatch_configure_waypoint(qd_dispatch_t 
*qd, qd_entity_t *entity
 qd_error_t qd_dispatch_configure_lrp(qd_dispatch_t *qd, qd_entity_t *entity);
 
 /**
+ * Configure security policy. Must be called after qd_dispatch_prepare.
+ */
+qd_error_t qd_dispatch_configure_policy(qd_dispatch_t *qd, qd_entity_t 
*entity);
+
+/**
  * \brief Configure the logging module from the
  *        parsed configuration file.  This must be called after the
  *        call to qd_dispatch_prepare completes.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/795bea38/src/policy.c
----------------------------------------------------------------------
diff --git a/src/policy.c b/src/policy.c
index 8f95854..a08bff8 100644
--- a/src/policy.c
+++ b/src/policy.c
@@ -43,44 +43,166 @@
 //
 static bool allow_this = true;
 
+bool policy_engine()
+{
+    return allow_this;
+}
+
+void policy_engine_step()
+{
+    // allow_this = !allow_this;
+}
+
+
 //
-// error conditions
+// TODO: when policy dev is more complete lower the log level
+//
+#define POLICY_LOG_LEVEL QD_LOG_CRITICAL
+
+//
+// Current statistics are maintained globally so that they persist
+// across multiple reconfigurations of policy settings.
+//
+static int n_connections = 0;
+
+//
+// error conditions signaled to effect denial
 //
 static char* RESOURCE_LIMIT_EXCEEDED     = "amqp:resource-limit-exceeded";
 //static char* UNAUTHORIZED_ACCESS         = "amqp:unauthorized-access";
 //static char* CONNECTION_FORCED           = "amqp:connection:forced";
 
 //
-// error descriptions
+// error descriptions signaled to effect denial
 //
 static char* CONNECTION_DISALLOWED         = "connection disallowed by local 
policy";
 
 
-void qd_policy_handle_open(void *context, bool discard)
+//
+// Policy configuration/statistics management interface
+//
+struct qd_policy_t {
+    qd_dispatch_t        *qd;
+    qd_log_source_t      *log_source;
+
+    int                   max_connections;
+
+    int                   current_connections;
+};
+
+
+qd_policy_t *qd_policy(qd_dispatch_t *qd)
+{
+    qd_policy_t *policy = NEW(qd_policy_t);
+
+    policy->qd                  = qd;
+    policy->log_source          = qd_log_source("POLICY");
+    policy->max_connections     = 0;
+    policy->current_connections = 0;
+
+    qd_log(policy->log_source, QD_LOG_TRACE, "Policy Initialized");
+    return policy;
+}
+
+
+void qd_policy_free(qd_policy_t *policy)
+{
+    free(policy);
+}
+
+//
+//
+qd_error_t qd_router_configure_policy(qd_policy_t *policy, qd_entity_t *entity)
+{
+    policy->max_connections = qd_entity_opt_long(entity, "maximumConnections", 
0); QD_ERROR_RET();
+    if (policy->max_connections < 0)
+        return qd_error(QD_ERROR_CONFIG, "maximumConnections must be >= 0");
+    return QD_ERROR_NONE;
+}
+
+
+//
+// Functions related to absolute connection counts.
+// These handle connections at the socket level with
+// no regard to user identity. Simple yes/no decisions
+// are made and there is no AMQP channel for returning
+// error conditions.
+//
+bool qd_policy_socket_accept(void *context, const char *hostname)
+{
+    qd_policy_t *policy = (qd_policy_t *)context;
+    bool result = true;
+
+    if (policy->max_connections == 0) {
+        // Policy not in force; connection counted and allowed
+        n_connections += 1;
+        qd_log(policy->log_source, POLICY_LOG_LEVEL, "Connection '%s' allowed. 
N= %d", hostname, n_connections); // HACK EXTRA
+    } else {
+        // Policy in force
+        if (n_connections < policy->max_connections) {
+            // connection counted and allowed
+            n_connections += 1;
+            qd_log(policy->log_source, POLICY_LOG_LEVEL, "Connection '%s' 
allowed. N= %d", hostname, n_connections);
+        } else {
+            // connection denied
+            result = false;
+            qd_log(policy->log_source, POLICY_LOG_LEVEL, "Connection '%s' 
denied, N=%d", hostname, n_connections);
+        }
+    }
+    return result;
+}
+
+
+void qd_policy_socket_close(void *context, const char *hostname)
+{
+    qd_policy_t *policy = (qd_policy_t *)context;
+
+    n_connections -= 1;
+    if (policy->max_connections > 0) {
+        assert (n_connections >= 0);
+        qd_log(policy->log_source, POLICY_LOG_LEVEL, "Connection '%s' closed, 
N=%d", hostname, n_connections);
+    }
+    qd_log(policy->log_source, POLICY_LOG_LEVEL, "Connection '%s' closed, 
N=%d", hostname, n_connections);  // HACK EXTRA
+}
+
+
+//
+// Functions related to authenticated connection denial.
+// An AMQP Open has been received over some connection.
+// Evaluate the connection auth and the Open fields to
+// allow or deny the Open. A denied Open is answered with an Open
+// followed immediately by a Close that carries the error condition.
+//
+void qd_policy_private_deny_amqp_connection(pn_connection_t *conn, const char 
*cond_name, const char *cond_descr)
+{
+    // Set the error condition and close the connection.
+    // Over the wire this will send an open frame followed
+    // immediately by a close frame with the error condition.
+    pn_condition_t * cond = pn_connection_condition(conn);
+    (void) pn_condition_set_name(       cond, cond_name);
+    (void) pn_condition_set_description(cond, cond_descr);
+    pn_connection_close(conn);
+}
+
+void qd_policy_amqp_open(void *context, bool discard)
 {
     qd_connection_t *qd_conn = (qd_connection_t *)context;
-    
+
     if (!discard) {
         pn_connection_t *conn = qd_connection_pn(qd_conn);
 
-        if (allow_this) { // TODO: Consult actual policy engine
+        // Consult policy engine for this connection attempt
+        if ( policy_engine() ) { // TODO: get rid of this phony policy engine
             // This connection is allowed.
             if (pn_connection_state(conn) & PN_LOCAL_UNINIT)
                 pn_connection_open(conn);
             qd_connection_manager_connection_opened(qd_conn);
         } else {
             // This connection is denied.
-            // Set the error condition and close the connection.
-            // Over the wire this will send an open frame followed
-            // immediately by a close frame with the error condition.
-            pn_condition_t * cond = pn_connection_condition(conn);
-            (void) pn_condition_set_name(       cond, RESOURCE_LIMIT_EXCEEDED);
-            (void) pn_condition_set_description(cond, CONNECTION_DISALLOWED);
-            pn_connection_close(conn);
+            qd_policy_private_deny_amqp_connection(conn, 
RESOURCE_LIMIT_EXCEEDED, CONNECTION_DISALLOWED);
         }
-    
-        // update the policy
-        //allow_this = !allow_this;
+        // update the phony policy engine
+        policy_engine_step();
     }
     qd_connection_set_event_stall(qd_conn, false);
 }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/795bea38/src/policy_private.h
----------------------------------------------------------------------
diff --git a/src/policy_private.h b/src/policy_private.h
index 20e0382..2647b85 100644
--- a/src/policy_private.h
+++ b/src/policy_private.h
@@ -30,14 +30,42 @@
 #include "entity_cache.h"
 #include <dlfcn.h>
 
+typedef struct qd_policy_t qd_policy_t;
+
+qd_error_t qd_router_configure_policy(qd_policy_t *policy, qd_entity_t 
*entity);
+
+
+/** Allow or deny an incoming connection based on connection count(s).
+ * A server listener has just accepted a socket.
+ * Allow or deny this connection based on the absolute number
+ *  of allowed connections.
+ * The identity of the connecting user has not been negotiated yet.
+ * @param[in] context the current policy
+ * @param[in] hostname the connector name
+ * @return the connection is allowed or not
+ **/
+bool qd_policy_socket_accept(void *context, const char *hostname);
+
+
+/** Record a closing connection.
+ * A server listener is closing a socket.
+ * Release the counted connection against provisioned limits
+ * 
+ * @param[in] context the current policy
+ * @param[in] hostname the connector name
+ **/
+void qd_policy_socket_close(void *context, const char *hostname);
+
+
 /** Allow or deny an incoming connection.
  * An Open performative was received over a new connection.
  * Consult local policy to determine if this host/user is
  * allow to make this connection. The underlying proton 
  * connection is either opened or closed.
+ * This function is called from the deferred queue.
  * @param[in] context a qd_connection_t object
  * @param[in] discard callback switch
  **/
-void qd_policy_handle_open(void *context, bool discard);
+void qd_policy_amqp_open(void *context, bool discard);
 
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/795bea38/src/posix/driver.c
----------------------------------------------------------------------
diff --git a/src/posix/driver.c b/src/posix/driver.c
index 5eb7c55..ca7d48c 100644
--- a/src/posix/driver.c
+++ b/src/posix/driver.c
@@ -346,7 +346,7 @@ void qdpn_listener_set_context(qdpn_listener_t *listener, 
void *context)
     listener->context = context;
 }
 
-qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *l)
+qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *l, void *policy, bool 
(*policy_fn)(void *, const char *name))
 {
     if (!l || !l->pending) return NULL;
     char name[PN_NAME_MAX];
@@ -373,6 +373,11 @@ qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *l)
         }
     }
 
+    if (policy_fn && !(*policy_fn)(policy, name)) {
+        close(sock);
+        return 0;
+    }
+    
     if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
         fprintf(stderr, "Accepted from %s\n", name);
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/795bea38/src/server.c
----------------------------------------------------------------------
diff --git a/src/server.c b/src/server.c
index 512bcc0..8203c33 100644
--- a/src/server.c
+++ b/src/server.c
@@ -25,6 +25,7 @@
 #include "entity.h"
 #include "entity_cache.h"
 #include "dispatch_private.h"
+#include "policy_private.h"
 #include "server_private.h"
 #include "timer_private.h"
 #include "alloc.h"
@@ -239,7 +240,7 @@ static void thread_process_listeners_LH(qd_server_t 
*qd_server)
     qd_connection_t  *ctx;
 
     for (listener = qdpn_driver_listener(driver); listener; listener = 
qdpn_driver_listener(driver)) {
-        cxtr = qdpn_listener_accept(listener);
+        cxtr = qdpn_listener_accept(listener, qd_server->qd->policy, 
&qd_policy_socket_accept);
         if (!cxtr)
             continue;
 
@@ -247,6 +248,7 @@ static void thread_process_listeners_LH(qd_server_t 
*qd_server)
 
         qd_log(qd_server->log_source, QD_LOG_DEBUG, "Accepting %s",
                log_incoming(logbuf, sizeof(logbuf), cxtr));
+        
         ctx = new_qd_connection_t();
         DEQ_ITEM_INIT(ctx);
         ctx->server        = qd_server;
@@ -714,6 +716,10 @@ static void *thread_run(void *arg)
                 sys_mutex_lock(qd_server->lock);
                 DEQ_REMOVE(qd_server->connections, ctx);
 
+                if (!ctx->connector) {
+                    qd_policy_socket_close(qd_server->qd->policy, 
qdpn_connector_name(cxtr));
+                }
+
                 qdpn_connector_free(cxtr);
                 if (conn)
                     pn_connection_free(conn);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to