Repository: qpid-dispatch
Updated Branches:
  refs/heads/crolke-DISPATCH-188-1 6a6b95176 -> 919b481f0


Add a policy stats class to track connection limits and state.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/919b481f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/919b481f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/919b481f

Branch: refs/heads/crolke-DISPATCH-188-1
Commit: 919b481f00092b3da0747e1881832c1adf7bed49
Parents: 6a6b951
Author: Chuck Rolke <[email protected]>
Authored: Mon Dec 14 17:13:17 2015 -0500
Committer: Chuck Rolke <[email protected]>
Committed: Mon Dec 14 17:13:17 2015 -0500

----------------------------------------------------------------------
 .../qpid_dispatch_internal/management/policy.py | 191 +++++++++++++++++--
 tests/system_tests_policy.py                    |  92 +++++++--
 2 files changed, 248 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/919b481f/python/qpid_dispatch_internal/management/policy.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/management/policy.py b/python/qpid_dispatch_internal/management/policy.py
index cd4d80e..17b323a 100644
--- a/python/qpid_dispatch_internal/management/policy.py
+++ b/python/qpid_dispatch_internal/management/policy.py
@@ -291,7 +291,9 @@ class PolicyKeys():
     # Internal policy key words
     KW_POLICY_VERSION = "policyVersion"
     KW_SCHEMA_VERSION = "schemaVersion"
-
+    KW_SCHEMA_MAXCONN        = "maximumConnections"
+    KW_SCHEMA_MAXCONNPERHOST = "maximumConnectionsPerHost"
+    KW_SCHEMA_MAXCONNPERUSER = "maximumConnectionsPerUser"
 
 
 #
@@ -310,9 +312,9 @@ class PolicyCompiler():
         'connectionAllowUnrestricted',
         'connectionOrigins',
         'connectionPolicy',
-        'maximumConnections',
-        'maximumConnectionsPerHost',
-        'maximumConnectionsPerUser',
+        PolicyKeys.KW_SCHEMA_MAXCONN,
+        PolicyKeys.KW_SCHEMA_MAXCONNPERHOST,
+        PolicyKeys.KW_SCHEMA_MAXCONNPERUSER,
         'policies',
         PolicyKeys.KW_POLICY_VERSION,
         'roles',
@@ -488,9 +490,9 @@ class PolicyCompiler():
                                     (name, key, cerror[0]))
                     return False
                 policy_out[key] = val
-            elif key in ['maximumConnections',
-                         'maximumConnectionsPerHost',
-                         'maximumConnectionsPerUser'
+            elif key in [PolicyKeys.KW_SCHEMA_MAXCONN,
+                         PolicyKeys.KW_SCHEMA_MAXCONNPERHOST,
+                         PolicyKeys.KW_SCHEMA_MAXCONNPERUSER
                          ]:
                 if not self.validateNumber(val, 0, 65535, cerror):
                     msg = ("Application '%s' option '%s' has error '%s'." % 
@@ -527,17 +529,116 @@ class PolicyCompiler():
                     return False
         return True
 
+class PolicyConnStatsPerApp():
+    """
+    Track policy user/host connection statistics for one app
+    connections : 5
+    max_t : 20
+    max_u : 5
+    max_h : 10
+    host_aggr : { 'host1' : [conn1, conn2, conn3],
+                  'host2' : [conn4, conn5] }
+    user_aggr : { 'user1' : [conn1, conn2, conn3],
+                  'user2' : [conn4, conn5] }
+    """
+    def __init__(self, maxconn, maxconnperuser, maxconnperhost):
+        """
+        The object is constructed with the policy limits
+        for total, total per user, and total per host counts.
+        As connections are allowed they are tracked in 
+        aggregation maps.
+        """
+        self.connections = 0
+        if maxconn < 0 or maxconnperuser < 0 or maxconnperhost < 0:
+            raise PolicyError("PolicyConnStatsPerApp settings must be >= 0")
+        self.max_t = maxconn
+        self.max_u = maxconnperuser
+        self.max_h = maxconnperhost
+        self.user_aggr = {}
+        self.host_aggr = {}
+
+    def __str__(self):
+        pdb.set_trace()
+        res = ("Connection Limits: Total: %s, Per User: %s, Per Host: %s\n" %
+            (self.max_t, self.max_u, self.max_h))
+        res += ("User counts: %s\n" % self.user_aggr)
+        res += ("Host counts: %s" % self.host_aggr)
+        return res
+
+    def __repr__(self):
+        return self.__str__()
+
+    def update(self, maxconn, maxconnperuser, maxconnperhost):
+        if maxconn < 0 or maxconnperuser < 0 or maxconnperhost < 0:
+            raise PolicyError("PolicyConnStatsPerApp settings must be >= 0")
+        self.max_t = maxconn
+        self.max_u = maxconnperuser
+        self.max_h = maxconnperhost
+
+    def can_connect(self, conn_id, user, host, diags):
+        """
+        Register a connection attempt.
+        If all the connection rules pass then add the
+        user/host to the connection tables
+        """
+        n_user = 0
+        if user in self.user_aggr:
+            n_user = len(self.user_aggr[user])
+        n_host = 0
+        if host in self.host_aggr:
+            n_host = len(self.host_aggr[host])
+
+        allowbytotal = self.max_t == 0 or self.connections < self.max_t
+        allowbyuser  = self.max_u == 0 or n_user < self.max_u
+        allowbyhost  = self.max_h == 0 or n_host < self.max_h
+
+        if allowbytotal and allowbyuser and allowbyhost:
+            self.connections += 1
+            if not user in self.user_aggr:
+                self.user_aggr[user] = []
+            self.user_aggr[user].append(conn_id)
+            if not host in self.host_aggr:
+                self.host_aggr[host] = []
+            self.host_aggr[host].append(conn_id)
+            return True
+        else:
+            if not allowbytotal:
+                diags.append("LogMe: INFO user '%s' from host '%s' denied connection by total connection limit" %
+                             (user, host))
+            if not allowbyuser:
+                diags.append("LogMe: INFO user '%s' from host '%s' denied connection by per user limit" %
+                             (user, host))
+            if not allowbyhost:
+                diags.append("LogMe: INFO user '%s' from host '%s' denied connection by per host limit" %
+                             (user, host))
+            return False
+
+    def disconnect(self, conn_id, user, host):
+        """
+        Unregister a connection
+        """
+        assert(self.connections > 0)
+        assert(user in self.user_aggr)
+        assert(conn_id in self.user_aggr[user])
+        assert(host in self.host_aggr)
+        assert(conn_id in self.host_aggr[host])
+        self.connections -= 1
+        self.user_aggr[user].remove(conn_id)
+        self.host_aggr[host].remove(conn_id)
 
 class Policy():
     """
     The policy database.
     """
+
     def __init__(self, folder="", schema_version=1):
         """
         Create instance
         @params folder: relative path from __file__ to conf file folder
         """
         self.data = {}
+        self.lookup_cache = {}
+        self.stats = {}
         self.folder = folder
         self.schema_version = schema_version
         self.policy_compiler = PolicyCompiler(schema_version)
@@ -586,7 +687,7 @@ class Policy():
                        (fn, policy, warnings))
             newpolicies[policy] = candidate
         # Log a warning if policy from one config file replaces another.
-        # TODO: Should this throw?
+        # TODO: Should this throw? Do we increment the policy version per load?
         for c in newpolicies:
             c_ver = 0
             e_ver = 0
@@ -606,6 +707,21 @@ class Policy():
                 msg = ("LogMe: WARNING Policy file '%s' application '%s' policy version '%s' %s existing policy version '%s'." %
                     (fn, c, c_ver, kw, e_ver))
                 print msg
+        for c in newpolicies:
+            c_pol = newpolicies[c]
+            c_max = 0
+            c_max_u = 0
+            c_max_h = 0
+            if PolicyKeys.KW_SCHEMA_MAXCONN in c_pol:
+                c_max = c_pol[PolicyKeys.KW_SCHEMA_MAXCONN]
+            if PolicyKeys.KW_SCHEMA_MAXCONNPERUSER in c_pol:
+                c_max_u = c_pol[PolicyKeys.KW_SCHEMA_MAXCONNPERUSER]
+            if PolicyKeys.KW_SCHEMA_MAXCONNPERHOST in c_pol:
+                c_max_h = c_pol[PolicyKeys.KW_SCHEMA_MAXCONNPERHOST]
+            if c in self.stats:
+                self.stats[c].update(c_max, c_max_u, c_max_h)
+            else:
+                self.stats[c] = PolicyConnStatsPerApp(c_max, c_max_u, c_max_h)
         self.data.update(newpolicies)
 
 
@@ -628,6 +744,7 @@ class Policy():
             print ("LogMe: Application '%s' has warnings: %s" %
                    (name, warnings))
         self.data[name] = candidate
+        # TODO: Create stats
 
     def policy_read(self, name):
         """
@@ -764,19 +881,25 @@ class Policy():
                 # no policy for this role
                 pass
 
-    def policy_lookup(self, user, host, app, upolicy):
+    def policy_lookup_settings(self, user, host, app, upolicy):
         """
-        Determine if a user on host accessing app is allowed.
+        Determine if a user on host accessing app through AMQP Open is allowed
+        according to the policy access rules. 
+        If allowed then return the policy settings.
         @param[in] user connection authId
         @param[in] host connection remote host numeric IP address
         @param[in] app application user is accessing
         @param[out] upolicy dict holding connection and policy values
         @return if allowed by policy
-        # TODO: use lookaside list for precomputed (user, host, app) policy
         # Note: the upolicy output is a non-nested dict with settings of interest
         # TODO: figure out decent defaults for upolicy settings that are undefined
         """
         try:
+            lookup_id = user + "|" + host + "|" + app
+            if lookup_id in self.lookup_cache:
+                upolicy.update( self.lookup_cache[lookup_id] )
+                return True
+
             settings = self.data[app]
             # User allowed to connect from host?
             allowed = False
@@ -788,19 +911,16 @@ class Policy():
                     if user in settings['roles'][r]:
                         restricted = True
                         uroles.append(r)
-                        #print "XXX user %s has roles %s " % (user, uroles)
             uorigins = []
             if 'connectionPolicy' in settings:
                 for ur in uroles:
                     if ur in settings['connectionPolicy']:
                         uorigins.extend(settings['connectionPolicy'][ur])
-                        #print "XXX user %s has origins %s" % (user, uorigins)
             if 'connectionOrigins' in settings:
                 for co in settings['connectionOrigins']:
                     if co in uorigins:
                         for cohost in settings['connectionOrigins'][co]:
                             if cohost.match_bin(uhs):
-                                #print "XXX user %s passes origin test at %s" % (user, uhs.dump())
                                 allowed = True
                                 break
                     if allowed:
@@ -813,9 +933,6 @@ class Policy():
             # Return connection limits and aggregation of role settings
             uroles.append(user) # user roles also includes username directly
             self.policy_aggregate_limits     (upolicy, settings, "policyVersion")
-            self.policy_aggregate_limits     (upolicy, settings, "maximumConnections")
-            self.policy_aggregate_limits     (upolicy, settings, "maximumConnectionsPerUser")
-            self.policy_aggregate_limits     (upolicy, settings, "maximumConnectionsPerHost")
             self.policy_aggregate_policy_int (upolicy, settings, uroles, "max_frame_size")
             self.policy_aggregate_policy_int (upolicy, settings, uroles, "max_message_size")
             self.policy_aggregate_policy_int (upolicy, settings, uroles, "max_session_window")
@@ -826,12 +943,40 @@ class Policy():
             self.policy_aggregate_policy_bool(upolicy, settings, uroles, "allow_anonymous_sender")
             self.policy_aggregate_policy_list(upolicy, settings, uroles, "sources")
             self.policy_aggregate_policy_list(upolicy, settings, uroles, "targets")
+            c_upolicy = {}
+            c_upolicy.update(upolicy)
+            self.lookup_cache[lookup_id] = c_upolicy
             return True
         except Exception, e:
             #print str(e)
             #pdb.set_trace()
             return False
 
+    def policy_lookup(self, conn_id, user, host, app, upolicy):
+        """
+        Determine if a user on host accessing app through AMQP Open is allowed:
+        - verify to the policy access rules. 
+        - track user/host connection limits
+        If allowed then return the policy settings.
+        @param[in] conn_id unique connection identifier
+        @param[in] user connection authId
+        @param[in] host connection remote host numeric IP address
+        @param[in] app application user is accessing
+        @param[out] upolicy dict holding connection and policy values
+        @return if allowed by policy
+        # Note: the upolicy output is a non-nested dict with settings of interest
+        # TODO: figure out decent defaults for upolicy settings that are undefined
+        """
+        if not self.policy_lookup_settings(user, host, app, upolicy):
+            # TODO: print ("LogMe: connection denied by connection access rules")
+            return False
+        assert(app in self.stats)
+        diags = []
+        if not self.stats[app].can_connect(conn_id, user, host, diags):
+            # TODO: print ("LogMe: connection denied by connection count limits: %s" % diags)
+            return False
+        return True
+
 
 #
 # HACK ALERT: Temporary
@@ -872,17 +1017,23 @@ def main_except(argv):
 
     # Lookups
     upolicy = {}
-    res = policy.policy_lookup('zeke', '192.168.100.5', 'photoserver', upolicy)
+    pdb.set_trace()
+    res = policy.policy_lookup('192.168.100.5:33332', 'zeke', '192.168.100.5', 'photoserver', upolicy)
     print "Lookup zeke from 192.168.100.5. Expect true and max_frame_size 44444. Result is %s" % res
     print "Resulting policy is: %s" % upolicy
+    # Hit the cache
+    upolicy2 = {}
+    res2  = policy.policy_lookup('192.168.100.5:33335', 'zeke', '192.168.100.5', 'photoserver', upolicy2)
+    # Print the stats
+    print "policy stats: %s" % policy.stats
 
     upolicy = {}
-    res = policy.policy_lookup('ellen', '72.135.2.9', 'photoserver', upolicy)
+    res = policy.policy_lookup('72.135.2.9:33333', 'ellen', '72.135.2.9', 'photoserver', upolicy)
     print "Lookup ellen from 72.135.2.9. Expect true and max_frame_size 666666. Result is %s" % res
     print "Resulting policy is: %s" % upolicy
 
     upolicy = {}
-    res = policy2.policy_lookup('ellen', '72.135.2.9', 'photoserver', upolicy)
+    res = policy2.policy_lookup('72.135.2.9:33334', 'ellen', '72.135.2.9', 'photoserver', upolicy)
     print "Lookup policy2 ellen from 72.135.2.9. Expect false. Result is %s" % res
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/919b481f/tests/system_tests_policy.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_policy.py b/tests/system_tests_policy.py
index 1775328..da9a217 100644
--- a/tests/system_tests_policy.py
+++ b/tests/system_tests_policy.py
@@ -30,7 +30,8 @@ from proton.utils import BlockingConnection, LinkDetached
 from qpid_dispatch.management.client import Node
 from system_test import TIMEOUT
 
-from qpid_dispatch_internal.management.policy import Policy, HostAddr, PolicyError, HostStruct
+from qpid_dispatch_internal.management.policy import \
+    Policy, HostAddr, PolicyError, HostStruct, PolicyConnStatsPerApp
 
 class AbsoluteConnectionCountLimit(TestCase):
     """
@@ -175,11 +176,8 @@ class PolicyFile(TestCase):
     def test_policy1_test_zeke_ok(self):
         upolicy = {}
         self.assertTrue( 
-            PolicyFile.policy.policy_lookup('zeke', '192.168.100.5', 'photoserver', upolicy) )
+            PolicyFile.policy.policy_lookup('192.168.100.5:33333', 'zeke', '192.168.100.5', 'photoserver', upolicy) )
         self.assertTrue(upolicy['policyVersion']             == '1')
-        self.assertTrue(upolicy['maximumConnections']        == '10')
-        self.assertTrue(upolicy['maximumConnectionsPerUser'] == '5')
-        self.assertTrue(upolicy['maximumConnectionsPerHost'] == '5')
         self.assertTrue(upolicy['max_frame_size']            == 444444)
         self.assertTrue(upolicy['max_message_size']          == 444444)
         self.assertTrue(upolicy['max_session_window']        == 444444)
@@ -196,34 +194,31 @@ class PolicyFile(TestCase):
     def test_policy1_test_zeke_bad_IP(self):
         upolicy = {}
         self.assertFalse(
-            PolicyFile.policy.policy_lookup('zeke', '10.18.0.1',    'photoserver', upolicy) )
+            PolicyFile.policy.policy_lookup('192.168.100.5:33333', 'zeke', '10.18.0.1',    'photoserver', upolicy) )
         self.assertFalse(
-            PolicyFile.policy.policy_lookup('zeke', '72.135.2.9',   'photoserver', upolicy) )
+            PolicyFile.policy.policy_lookup('192.168.100.5:33333', 'zeke', '72.135.2.9',   'photoserver', upolicy) )
         self.assertFalse(
-            PolicyFile.policy.policy_lookup('zeke', '127.0.0.1',    'photoserver', upolicy) )
+            PolicyFile.policy.policy_lookup('192.168.100.5:33333', 'zeke', '127.0.0.1',    'photoserver', upolicy) )
 
     def test_policy1_test_zeke_bad_app(self):
         upolicy = {}
         self.assertFalse(
-            PolicyFile.policy.policy_lookup('zeke', '192.168.100.5','galleria', upolicy) )
+            PolicyFile.policy.policy_lookup('192.168.100.5:33333', 'zeke', '192.168.100.5','galleria', upolicy) )
 
     def test_policy1_test_users_same_permissions(self):
         zpolicy = {}
         self.assertTrue(
-            PolicyFile.policy.policy_lookup('zeke', '192.168.100.5', 'photoserver', zpolicy) )
+            PolicyFile.policy.policy_lookup('192.168.100.5:33333', 'zeke', '192.168.100.5', 'photoserver', zpolicy) )
         ypolicy = {}
         self.assertTrue(
-            PolicyFile.policy.policy_lookup('ynot', '10.48.255.254', 'photoserver', ypolicy) )
+            PolicyFile.policy.policy_lookup('192.168.100.5:33334', 'ynot', '10.48.255.254', 'photoserver', ypolicy) )
         self.assertTrue( self.dict_compare(zpolicy, ypolicy) )
 
     def test_policy1_superuser_aggregation(self):
         upolicy = {}
         self.assertTrue( 
-            PolicyFile.policy.policy_lookup('ellen', '72.135.2.9', 'photoserver', upolicy) )
+            PolicyFile.policy.policy_lookup('192.168.100.5:33335', 'ellen', '72.135.2.9', 'photoserver', upolicy) )
         self.assertTrue(upolicy['policyVersion']             == '1')
-        self.assertTrue(upolicy['maximumConnections']        == '10')
-        self.assertTrue(upolicy['maximumConnectionsPerUser'] == '5')
-        self.assertTrue(upolicy['maximumConnectionsPerHost'] == '5')
         self.assertTrue(upolicy['max_frame_size']            == 666666)
         self.assertTrue(upolicy['max_message_size']          == 666666)
         self.assertTrue(upolicy['max_session_window']        == 666666)
@@ -238,5 +233,72 @@ class PolicyFile(TestCase):
         for s in addrs: self.assertTrue(s in upolicy['targets'])
         for s in addrs: self.assertTrue(s in upolicy['sources'])
 
+class PolicyConnStatsPerAppTests(TestCase):
+
+    def test_policy_app_conn_stats_fail_by_total(self):
+        stats = PolicyConnStatsPerApp(1, 2, 2)
+        diags = []
+        self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags))
+        self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags))
+        self.assertTrue(len(diags) == 1)
+        self.assertTrue('by total' in diags[0])
+
+    def test_policy_app_conn_stats_fail_by_user(self):
+        stats = PolicyConnStatsPerApp(3, 1, 2)
+        diags = []
+        self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags))
+        self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags))
+        self.assertTrue(len(diags) == 1)
+        self.assertTrue('per user' in diags[0])
+
+    def test_policy_app_conn_stats_fail_by_hosts(self):
+        stats = PolicyConnStatsPerApp(3, 2, 1)
+        diags = []
+        self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags))
+        self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags))
+        self.assertTrue(len(diags) == 1)
+        self.assertTrue('per host' in diags[0])
+
+    def test_policy_app_conn_stats_fail_by_user_hosts(self):
+        stats = PolicyConnStatsPerApp(3, 1, 1)
+        diags = []
+        self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags))
+        self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags))
+        self.assertTrue(len(diags) == 2)
+        self.assertTrue('per user' in diags[0] or 'per user' in diags[1])
+        self.assertTrue('per host' in diags[0] or 'per host' in diags[1])
+
+    def test_policy_app_conn_stats_update(self):
+        stats = PolicyConnStatsPerApp(3, 1, 2)
+        diags = []
+        self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags))
+        self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags))
+        self.assertTrue(len(diags) == 1)
+        self.assertTrue('per user' in diags[0])
+        diags = []
+        stats.update(3, 2, 2)
+        self.assertTrue(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags))
+
+    def test_policy_app_conn_stats_create_bad_settings(self):
+        denied = False
+        try:
+            stats = PolicyConnStatsPerApp(-3, 1, 2)
+        except PolicyError:
+            denied = True
+        self.assertTrue(denied, "Failed to detect negative setting value.")
+
+    def test_policy_app_conn_stats_update_bad_settings(self):
+        denied = False
+        try:
+            stats = PolicyConnStatsPerApp(0, 0, 0)
+        except PolicyError:
+            denied = True
+        self.assertFalse(denied, "Should allow all zeros.")
+        try:
+            stats.update(0, -1, 0)
+        except PolicyError:
+            denied = True
+        self.assertTrue(denied, "Failed to detect negative setting value.")
+
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to