Repository: qpid-dispatch Updated Branches: refs/heads/crolke-DISPATCH-188-1 6a6b95176 -> 919b481f0
Add a policy stats class to track connection limits and state. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/919b481f Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/919b481f Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/919b481f Branch: refs/heads/crolke-DISPATCH-188-1 Commit: 919b481f00092b3da0747e1881832c1adf7bed49 Parents: 6a6b951 Author: Chuck Rolke <[email protected]> Authored: Mon Dec 14 17:13:17 2015 -0500 Committer: Chuck Rolke <[email protected]> Committed: Mon Dec 14 17:13:17 2015 -0500 ---------------------------------------------------------------------- .../qpid_dispatch_internal/management/policy.py | 191 +++++++++++++++++-- tests/system_tests_policy.py | 92 +++++++-- 2 files changed, 248 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/919b481f/python/qpid_dispatch_internal/management/policy.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/management/policy.py b/python/qpid_dispatch_internal/management/policy.py index cd4d80e..17b323a 100644 --- a/python/qpid_dispatch_internal/management/policy.py +++ b/python/qpid_dispatch_internal/management/policy.py @@ -291,7 +291,9 @@ class PolicyKeys(): # Internal policy key words KW_POLICY_VERSION = "policyVersion" KW_SCHEMA_VERSION = "schemaVersion" - + KW_SCHEMA_MAXCONN = "maximumConnections" + KW_SCHEMA_MAXCONNPERHOST = "maximumConnectionsPerHost" + KW_SCHEMA_MAXCONNPERUSER = "maximumConnectionsPerUser" # @@ -310,9 +312,9 @@ class PolicyCompiler(): 'connectionAllowUnrestricted', 'connectionOrigins', 'connectionPolicy', - 'maximumConnections', - 'maximumConnectionsPerHost', - 'maximumConnectionsPerUser', + PolicyKeys.KW_SCHEMA_MAXCONN, + PolicyKeys.KW_SCHEMA_MAXCONNPERHOST, + PolicyKeys.KW_SCHEMA_MAXCONNPERUSER, 'policies', PolicyKeys.KW_POLICY_VERSION, 'roles', @@ -488,9 +490,9 @@ class PolicyCompiler(): (name, key, cerror[0])) return False policy_out[key] = val - elif key in ['maximumConnections', - 'maximumConnectionsPerHost', - 'maximumConnectionsPerUser' + elif key in [PolicyKeys.KW_SCHEMA_MAXCONN, + PolicyKeys.KW_SCHEMA_MAXCONNPERHOST, + PolicyKeys.KW_SCHEMA_MAXCONNPERUSER ]: if not self.validateNumber(val, 0, 65535, cerror): msg = ("Application '%s' option '%s' has error '%s'." % @@ -527,17 +529,116 @@ class PolicyCompiler(): return False return True +class PolicyConnStatsPerApp(): + """ + Track policy user/host connection statistics for one app + connections : 5 + max_t : 20 + max_u : 5 + max_h : 10 + host_aggr : { 'host1' : [conn1, conn2, conn3], + 'host2' : [conn4, conn5] } + user_aggr : { 'user1' : [conn1, conn2, conn3], + 'user2' : [conn4, conn5] } + """ + def __init__(self, maxconn, maxconnperuser, maxconnperhost): + """ + The object is constructed with the policy limits + for total, total per user, and total per host counts. + As connections are allowed they are tracked in + aggregation maps. + """ + self.connections = 0 + if maxconn < 0 or maxconnperuser < 0 or maxconnperhost < 0: + raise PolicyError("PolicyConnStatsPerApp settings must be >= 0") + self.max_t = maxconn + self.max_u = maxconnperuser + self.max_h = maxconnperhost + self.user_aggr = {} + self.host_aggr = {} + + def __str__(self): + pdb.set_trace() + res = ("Connection Limits: Total: %s, Per User: %s, Per Host: %s\n" % + (self.max_t, self.max_u, self.max_h)) + res += ("User counts: %s\n" % self.user_aggr) + res += ("Host counts: %s" % self.host_aggr) + return res + + def __repr__(self): + return self.__str__() + + def update(self, maxconn, maxconnperuser, maxconnperhost): + if maxconn < 0 or maxconnperuser < 0 or maxconnperhost < 0: + raise PolicyError("PolicyConnStatsPerApp settings must be >= 0") + self.max_t = maxconn + self.max_u = maxconnperuser + self.max_h = maxconnperhost + + def can_connect(self, conn_id, user, host, diags): + """ + Register a connection attempt. + If all the connection rules pass then add the + user/host to the connection tables + """ + n_user = 0 + if user in self.user_aggr: + n_user = len(self.user_aggr[user]) + n_host = 0 + if host in self.host_aggr: + n_host = len(self.host_aggr[host]) + + allowbytotal = self.max_t == 0 or self.connections < self.max_t + allowbyuser = self.max_u == 0 or n_user < self.max_u + allowbyhost = self.max_h == 0 or n_host < self.max_h + + if allowbytotal and allowbyuser and allowbyhost: + self.connections += 1 + if not user in self.user_aggr: + self.user_aggr[user] = [] + self.user_aggr[user].append(conn_id) + if not host in self.host_aggr: + self.host_aggr[host] = [] + self.host_aggr[host].append(conn_id) + return True + else: + if not allowbytotal: + diags.append("LogMe: INFO user '%s' from host '%s' denied connection by total connection limit" % + (user, host)) + if not allowbyuser: + diags.append("LogMe: INFO user '%s' from host '%s' denied connection by per user limit" % + (user, host)) + if not allowbyhost: + diags.append("LogMe: INFO user '%s' from host '%s' denied connection by per host limit" % + (user, host)) + return False + + def disconnect(self, conn_id, user, host): + """ + Unregister a connection + """ + assert(self.connections > 0) + assert(user in self.user_aggr) + assert(conn_id in self.user_aggr[user]) + assert(host in self.host_aggr) + assert(conn_id in self.host_aggr[host]) + self.connections -= 1 + self.user_aggr[user].remove(conn_id) + self.host_aggr[host].remove(conn_id) class Policy(): """ The policy database. """ + def __init__(self, folder="", schema_version=1): """ Create instance @params folder: relative path from __file__ to conf file folder """ self.data = {} + self.lookup_cache = {} + self.stats = {} self.folder = folder self.schema_version = schema_version self.policy_compiler = PolicyCompiler(schema_version) @@ -586,7 +687,7 @@ class Policy(): (fn, policy, warnings)) newpolicies[policy] = candidate # Log a warning if policy from one config file replaces another. - # TODO: Should this throw? + # TODO: Should this throw? Do we increment the policy version per load? for c in newpolicies: c_ver = 0 e_ver = 0 @@ -606,6 +707,21 @@ class Policy(): msg = ("LogMe: WARNING Policy file '%s' application '%s' policy version '%s' %s existing policy version '%s'." % (fn, c, c_ver, kw, e_ver)) print msg + for c in newpolicies: + c_pol = newpolicies[c] + c_max = 0 + c_max_u = 0 + c_max_h = 0 + if PolicyKeys.KW_SCHEMA_MAXCONN in c_pol: + c_max = c_pol[PolicyKeys.KW_SCHEMA_MAXCONN] + if PolicyKeys.KW_SCHEMA_MAXCONNPERUSER in c_pol: + c_max_u = c_pol[PolicyKeys.KW_SCHEMA_MAXCONNPERUSER] + if PolicyKeys.KW_SCHEMA_MAXCONNPERHOST in c_pol: + c_max_h = c_pol[PolicyKeys.KW_SCHEMA_MAXCONNPERHOST] + if c in self.stats: + self.stats[c].update(c_max, c_max_u, c_max_h) + else: + self.stats[c] = PolicyConnStatsPerApp(c_max, c_max_u, c_max_h) self.data.update(newpolicies) @@ -628,6 +744,7 @@ class Policy(): print ("LogMe: Application '%s' has warnings: %s" % (name, warnings)) self.data[name] = candidate + # TODO: Create stats def policy_read(self, name): """ @@ -764,19 +881,25 @@ class Policy(): # no policy for this role pass - def policy_lookup(self, user, host, app, upolicy): + def policy_lookup_settings(self, user, host, app, upolicy): """ - Determine if a user on host accessing app is allowed. + Determine if a user on host accessing app through AMQP Open is allowed + according to the policy access rules. + If allowed then return the policy settings. @param[in] user connection authId @param[in] host connection remote host numeric IP address @param[in] app application user is accessing @param[out] upolicy dict holding connection and policy values @return if allowed by policy - # TODO: use lookaside list for precomputed (user, host, app) policy # Note: the upolicy output is a non-nested dict with settings of interest # TODO: figure out decent defaults for upolicy settings that are undefined """ try: + lookup_id = user + "|" + host + "|" + app + if lookup_id in self.lookup_cache: + upolicy.update( self.lookup_cache[lookup_id] ) + return True + settings = self.data[app] # User allowed to connect from host? allowed = False @@ -788,19 +911,16 @@ class Policy(): if user in settings['roles'][r]: restricted = True uroles.append(r) - #print "XXX user %s has roles %s " % (user, uroles) uorigins = [] if 'connectionPolicy' in settings: for ur in uroles: if ur in settings['connectionPolicy']: uorigins.extend(settings['connectionPolicy'][ur]) - #print "XXX user %s has origins %s" % (user, uorigins) if 'connectionOrigins' in settings: for co in settings['connectionOrigins']: if co in uorigins: for cohost in settings['connectionOrigins'][co]: if cohost.match_bin(uhs): - #print "XXX user %s passes origin test at %s" % (user, uhs.dump()) allowed = True break if allowed: @@ -813,9 +933,6 @@ class Policy(): # Return connection limits and aggregation of role settings uroles.append(user) # user roles also includes username directly self.policy_aggregate_limits (upolicy, settings, "policyVersion") - self.policy_aggregate_limits (upolicy, settings, "maximumConnections") - self.policy_aggregate_limits (upolicy, settings, "maximumConnectionsPerUser") - self.policy_aggregate_limits (upolicy, settings, "maximumConnectionsPerHost") self.policy_aggregate_policy_int (upolicy, settings, uroles, "max_frame_size") self.policy_aggregate_policy_int (upolicy, settings, uroles, "max_message_size") self.policy_aggregate_policy_int (upolicy, settings, uroles, "max_session_window") @@ -826,12 +943,40 @@ class Policy(): self.policy_aggregate_policy_bool(upolicy, settings, uroles, "allow_anonymous_sender") self.policy_aggregate_policy_list(upolicy, settings, uroles, "sources") self.policy_aggregate_policy_list(upolicy, settings, uroles, "targets") + c_upolicy = {} + c_upolicy.update(upolicy) + self.lookup_cache[lookup_id] = c_upolicy return True except Exception, e: #print str(e) #pdb.set_trace() return False + def policy_lookup(self, conn_id, user, host, app, upolicy): + """ + Determine if a user on host accessing app through AMQP Open is allowed: + - verify to the policy access rules. + - track user/host connection limits + If allowed then return the policy settings. + @param[in] conn_id unique connection identifier + @param[in] user connection authId + @param[in] host connection remote host numeric IP address + @param[in] app application user is accessing + @param[out] upolicy dict holding connection and policy values + @return if allowed by policy + # Note: the upolicy output is a non-nested dict with settings of interest + # TODO: figure out decent defaults for upolicy settings that are undefined + """ + if not self.policy_lookup_settings(user, host, app, upolicy): + # TODO: print ("LogMe: connection denied by connection access rules") + return False + assert(app in self.stats) + diags = [] + if not self.stats[app].can_connect(conn_id, user, host, diags): + # TODO: print ("LogMe: connection denied by connection count limits: %s" % diags) + return False + return True + # # HACK ALERT: Temporary @@ -872,17 +1017,23 @@ def main_except(argv): # Lookups upolicy = {} - res = policy.policy_lookup('zeke', '192.168.100.5', 'photoserver', upolicy) + pdb.set_trace() + res = policy.policy_lookup('192.168.100.5:33332', 'zeke', '192.168.100.5', 'photoserver', upolicy) print "Lookup zeke from 192.168.100.5. Expect true and max_frame_size 44444. Result is %s" % res print "Resulting policy is: %s" % upolicy + # Hit the cache + upolicy2 = {} + res2 = policy.policy_lookup('192.168.100.5:33335', 'zeke', '192.168.100.5', 'photoserver', upolicy2) + # Print the stats + print "policy stats: %s" % policy.stats upolicy = {} - res = policy.policy_lookup('ellen', '72.135.2.9', 'photoserver', upolicy) + res = policy.policy_lookup('72.135.2.9:33333', 'ellen', '72.135.2.9', 'photoserver', upolicy) print "Lookup ellen from 72.135.2.9. Expect true and max_frame_size 666666. Result is %s" % res print "Resulting policy is: %s" % upolicy upolicy = {} - res = policy2.policy_lookup('ellen', '72.135.2.9', 'photoserver', upolicy) + res = policy2.policy_lookup('72.135.2.9:33334', 'ellen', '72.135.2.9', 'photoserver', upolicy) print "Lookup policy2 ellen from 72.135.2.9. Expect false. Result is %s" % res http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/919b481f/tests/system_tests_policy.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_policy.py b/tests/system_tests_policy.py index 1775328..da9a217 100644 --- a/tests/system_tests_policy.py +++ b/tests/system_tests_policy.py @@ -30,7 +30,8 @@ from proton.utils import BlockingConnection, LinkDetached from qpid_dispatch.management.client import Node from system_test import TIMEOUT -from qpid_dispatch_internal.management.policy import Policy, HostAddr, PolicyError, HostStruct +from qpid_dispatch_internal.management.policy import \ + Policy, HostAddr, PolicyError, HostStruct, PolicyConnStatsPerApp class AbsoluteConnectionCountLimit(TestCase): """ @@ -175,11 +176,8 @@ class PolicyFile(TestCase): def test_policy1_test_zeke_ok(self): upolicy = {} self.assertTrue( - PolicyFile.policy.policy_lookup('zeke', '192.168.100.5', 'photoserver', upolicy) ) + PolicyFile.policy.policy_lookup('192.168.100.5:33333', 'zeke', '192.168.100.5', 'photoserver', upolicy) ) self.assertTrue(upolicy['policyVersion'] == '1') - self.assertTrue(upolicy['maximumConnections'] == '10') - self.assertTrue(upolicy['maximumConnectionsPerUser'] == '5') - self.assertTrue(upolicy['maximumConnectionsPerHost'] == '5') self.assertTrue(upolicy['max_frame_size'] == 444444) self.assertTrue(upolicy['max_message_size'] == 444444) self.assertTrue(upolicy['max_session_window'] == 444444) @@ -196,34 +194,31 @@ class PolicyFile(TestCase): def test_policy1_test_zeke_bad_IP(self): upolicy = {} self.assertFalse( - PolicyFile.policy.policy_lookup('zeke', '10.18.0.1', 'photoserver', upolicy) ) + PolicyFile.policy.policy_lookup('192.168.100.5:33333', 'zeke', '10.18.0.1', 'photoserver', upolicy) ) self.assertFalse( - PolicyFile.policy.policy_lookup('zeke', '72.135.2.9', 'photoserver', upolicy) ) + PolicyFile.policy.policy_lookup('192.168.100.5:33333', 'zeke', '72.135.2.9', 'photoserver', upolicy) ) self.assertFalse( - PolicyFile.policy.policy_lookup('zeke', '127.0.0.1', 'photoserver', upolicy) ) + PolicyFile.policy.policy_lookup('192.168.100.5:33333', 'zeke', '127.0.0.1', 'photoserver', upolicy) ) def test_policy1_test_zeke_bad_app(self): upolicy = {} self.assertFalse( - PolicyFile.policy.policy_lookup('zeke', '192.168.100.5','galleria', upolicy) ) + PolicyFile.policy.policy_lookup('192.168.100.5:33333', 'zeke', '192.168.100.5','galleria', upolicy) ) def test_policy1_test_users_same_permissions(self): zpolicy = {} self.assertTrue( - PolicyFile.policy.policy_lookup('zeke', '192.168.100.5', 'photoserver', zpolicy) ) + PolicyFile.policy.policy_lookup('192.168.100.5:33333', 'zeke', '192.168.100.5', 'photoserver', zpolicy) ) ypolicy = {} self.assertTrue( - PolicyFile.policy.policy_lookup('ynot', '10.48.255.254', 'photoserver', ypolicy) ) + PolicyFile.policy.policy_lookup('192.168.100.5:33334', 'ynot', '10.48.255.254', 'photoserver', ypolicy) ) self.assertTrue( self.dict_compare(zpolicy, ypolicy) ) def test_policy1_superuser_aggregation(self): upolicy = {} self.assertTrue( - PolicyFile.policy.policy_lookup('ellen', '72.135.2.9', 'photoserver', upolicy) ) + PolicyFile.policy.policy_lookup('192.168.100.5:33335', 'ellen', '72.135.2.9', 'photoserver', upolicy) ) self.assertTrue(upolicy['policyVersion'] == '1') - self.assertTrue(upolicy['maximumConnections'] == '10') - self.assertTrue(upolicy['maximumConnectionsPerUser'] == '5') - self.assertTrue(upolicy['maximumConnectionsPerHost'] == '5') self.assertTrue(upolicy['max_frame_size'] == 666666) self.assertTrue(upolicy['max_message_size'] == 666666) self.assertTrue(upolicy['max_session_window'] == 666666) @@ -238,5 +233,72 @@ class PolicyFile(TestCase): for s in addrs: self.assertTrue(s in upolicy['targets']) for s in addrs: self.assertTrue(s in upolicy['sources']) +class PolicyConnStatsPerAppTests(TestCase): + + def test_policy_app_conn_stats_fail_by_total(self): + stats = PolicyConnStatsPerApp(1, 2, 2) + diags = [] + self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags)) + self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags)) + self.assertTrue(len(diags) == 1) + self.assertTrue('by total' in diags[0]) + + def test_policy_app_conn_stats_fail_by_user(self): + stats = PolicyConnStatsPerApp(3, 1, 2) + diags = [] + self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags)) + self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags)) + self.assertTrue(len(diags) == 1) + self.assertTrue('per user' in diags[0]) + + def test_policy_app_conn_stats_fail_by_hosts(self): + stats = PolicyConnStatsPerApp(3, 2, 1) + diags = [] + self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags)) + self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags)) + self.assertTrue(len(diags) == 1) + self.assertTrue('per host' in diags[0]) + + def test_policy_app_conn_stats_fail_by_user_hosts(self): + stats = PolicyConnStatsPerApp(3, 1, 1) + diags = [] + self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags)) + self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags)) + self.assertTrue(len(diags) == 2) + self.assertTrue('per user' in diags[0] or 'per user' in diags[1]) + self.assertTrue('per host' in diags[0] or 'per host' in diags[1]) + + def test_policy_app_conn_stats_update(self): + stats = PolicyConnStatsPerApp(3, 1, 2) + diags = [] + self.assertTrue(stats.can_connect('10.10.10.10:10000', 'chuck', '10.10.10.10', diags)) + self.assertFalse(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags)) + self.assertTrue(len(diags) == 1) + self.assertTrue('per user' in diags[0]) + diags = [] + stats.update(3, 2, 2) + self.assertTrue(stats.can_connect('10.10.10.10:10001', 'chuck', '10.10.10.10', diags)) + + def test_policy_app_conn_stats_create_bad_settings(self): + denied = False + try: + stats = PolicyConnStatsPerApp(-3, 1, 2) + except PolicyError: + denied = True + self.assertTrue(denied, "Failed to detect negative setting value.") + + def test_policy_app_conn_stats_update_bad_settings(self): + denied = False + try: + stats = PolicyConnStatsPerApp(0, 0, 0) + except PolicyError: + denied = True + self.assertFalse(denied, "Should allow all zeros.") + try: + stats.update(0, -1, 0) + except PolicyError: + denied = True + self.assertTrue(denied, "Failed to detect negative setting value.") + if __name__ == '__main__': unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
