Repository: qpid-dispatch Updated Branches: refs/heads/crolke-DISPATCH-188-1 1a8628a05 -> d1f764e3f
In management, tie policy ruleset to policy stats. Implement per-application connection count limit check. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/d1f764e3 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/d1f764e3 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/d1f764e3 Branch: refs/heads/crolke-DISPATCH-188-1 Commit: d1f764e3f27db22d4473897fc8b20664378d6720 Parents: 1a8628a Author: Chuck Rolke <[email protected]> Authored: Mon Feb 1 13:13:34 2016 -0500 Committer: Chuck Rolke <[email protected]> Committed: Mon Feb 1 13:13:34 2016 -0500 ---------------------------------------------------------------------- .../qpid_dispatch_internal/management/agent.py | 4 + .../policy/policy_local.py | 84 +++++++++++++++++--- .../policy/policy_manager.py | 3 + .../policy/policy_util.py | 9 +-- tests/router_policy_test.py | 10 +++ 5 files changed, 94 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d1f764e3/python/qpid_dispatch_internal/management/agent.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/management/agent.py b/python/qpid_dispatch_internal/management/agent.py index d22179d..b342098 100644 --- a/python/qpid_dispatch_internal/management/agent.py +++ b/python/qpid_dispatch_internal/management/agent.py @@ -292,6 +292,10 @@ class PolicyRulesetEntity(EntityAdapter): def _identifier(self): return self.attributes.get('applicationName') +class PolicyStatsEntity(EntityAdapter): + def _identifier(self): + return self.attributes.get('applicationName') + def _addr_port_identifier(entity): for attr in ['addr', 'port']: # Set default values if need be entity.attributes.setdefault( http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d1f764e3/python/qpid_dispatch_internal/policy/policy_local.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/policy/policy_local.py b/python/qpid_dispatch_internal/policy/policy_local.py index 8518381..9a3ce4d 100644 --- a/python/qpid_dispatch_internal/policy/policy_local.py +++ b/python/qpid_dispatch_internal/policy/policy_local.py @@ -22,7 +22,7 @@ """ import json -from policy_util import PolicyError, HostStruct, HostAddr +from policy_util import PolicyError, HostStruct, HostAddr, PolicyAppConnectionMgr """ Entity implementing the business logic of user connection/access policy. @@ -31,6 +31,9 @@ Entity implementing the business logic of user connection/access policy. # # class PolicyKeys(object): + """ + String constants + """ # Common key words KW_IGNORED_NAME = "name" KW_IGNORED_IDENTITY = "identity" @@ -60,6 +63,13 @@ class PolicyKeys(object): KW_SOURCES = "sources" KW_TARGETS = "targets" + # Policy stats key words + KW_CONNECTIONS_APPROVED = "connectionsApproved" + KW_CONNECTIONS_DENIED = "connectionsDenied" + KW_CONNECTIONS_CURRENT = "connectionsCurrent" + KW_PER_USER_STATE = "perUserState" + KW_PER_HOST_STATE = "perHostState" + # What settings does a user get when allowed to connect but # not restricted by a user group? KW_DEFAULT_SETTINGS = "default" @@ -72,6 +82,7 @@ class PolicyKeys(object): # user-to-group computed map in compiled ruleset RULESET_U2G_MAP = "U2G" + # # class PolicyCompiler(object): @@ -79,7 +90,8 @@ class PolicyCompiler(object): Validate incoming configuration for legal schema. - Warn about section options that go unused. - Disallow negative max connection numbers. - - Check that connectionOrigins resolve to IP hosts + - Check that connectionOrigins resolve to IP hosts. + - Enforce internal consistency, """ allowed_ruleset_options = [ @@ -352,9 +364,41 @@ class PolicyCompiler(object): return True + +# +# +class AppStats(object): + """ + Maintain live state and statistics for an application. + """ + def __init__(self, id, manager, ruleset): + self.my_id = id + self._manager = manager + self._ruleset = ruleset + self.conn_mgr = PolicyAppConnectionMgr( + ruleset[PolicyKeys.KW_MAXCONN], + ruleset[PolicyKeys.KW_MAXCONNPERHOST], + ruleset[PolicyKeys.KW_MAXCONNPERUSER]) + self._manager.get_agent().add_implementation(self, "policyStats") + + def refresh_entity(self, attributes): + """Refresh management attributes""" + attributes.update({ + PolicyKeys.KW_APPLICATION_NAME: self.my_id, + PolicyKeys.KW_CONNECTIONS_APPROVED: self.conn_mgr.connections_approved, + PolicyKeys.KW_CONNECTIONS_DENIED: self.conn_mgr.connections_denied, + PolicyKeys.KW_CONNECTIONS_CURRENT: self.conn_mgr.connections_active, + PolicyKeys.KW_PER_USER_STATE: self.conn_mgr.per_user_state, + PolicyKeys.KW_PER_HOST_STATE: self.conn_mgr.per_host_state}) + + def can_connect(self, conn_id, user, host, diags): + return self.conn_mgr.can_connect(conn_id, user, host, diags) + +# +# class PolicyLocal(object): """ - The policy database. + The local policy database. """ def __init__(self, manager): @@ -381,6 +425,13 @@ class PolicyLocal(object): # created by configuration self.settingsdb = {} + # statsdb is a map + # key : <application name> + # val : a map + # key : stat name + # val : stat value + self.statsdb = {} + # _policy_compiler is a function # validates incoming policy and readies it for internal use self._policy_compiler = PolicyCompiler() @@ -406,7 +457,7 @@ class PolicyLocal(object): self._manager.log_debug(warning) self.rulesetdb[name] = {} self.rulesetdb[name].update(candidate) - # TODO: Create stats + self.statsdb[name] = AppStats(name, self._manager, candidate) self._manager.log_info("Created policy rules for application %s" % name) def policy_read(self, name): @@ -454,22 +505,28 @@ class PolicyLocal(object): Lookup function called from C. Determine if a user on host accessing app through AMQP Open is allowed according to the policy access rules. - If allowed then return the policy settings name + If allowed then return the policy settings name. If stats.can_connect + returns true then it has registered and counted the connection. @param[in] user connection authId @param[in] host connection remote host numeric IP address as string @param[in] app application user is accessing + @param[in] conn_name connection name used for tracking reports @return settings user-group name if allowed; "" if not allowed - # Note: the upolicy[0] output is list of group names joined with '|'. - TODO: handle the AccessStats """ try: if not app in self.rulesetdb: self._manager.log_trace( - "lookup_user failed for user '%s', host '%s', application '%s': " - "No policy defined for application" % (user, host, app)) + "lookup_user failed for user '%s', host '%s', application '%s': " + "No policy defined for application" % (user, host, app)) return "" ruleset = self.rulesetdb[app] + if not app in self.statsdb: + msg = ( + "lookup_user failed for user '%s', host '%s', application '%s': " + "Policy is defined but stats are missing" % (user, host, app)) + raise PolicyError(msg) + stats = self.statsdb[app] # User in a group or default? if user in ruleset[PolicyKeys.RULESET_U2G_MAP]: usergroup = ruleset[PolicyKeys.RULESET_U2G_MAP][user] @@ -505,7 +562,14 @@ class PolicyLocal(object): return "" # This user passes administrative approval. - # TODO: Count connection limits and possibly deny + # Now check live connection counts + diags = [] + if not stats.can_connect(conn_name, user, host, diags): + for diag in diags: + self._manager.log_trace( + "lookup_user failed for user '%s', host '%s', application '%s': " + "%s" % (user, host, app, diag)) + return "" # Return success return usergroup http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d1f764e3/python/qpid_dispatch_internal/policy/policy_manager.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/policy/policy_manager.py b/python/qpid_dispatch_internal/policy/policy_manager.py index ce6a683..afb3c50 100644 --- a/python/qpid_dispatch_internal/policy/policy_manager.py +++ b/python/qpid_dispatch_internal/policy/policy_manager.py @@ -64,6 +64,9 @@ class PolicyManager(object): def log_error(self, text): self._log(LOG_ERROR, text) + def get_agent(self): + return self._agent + # # Management interface to create a ruleset # http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d1f764e3/python/qpid_dispatch_internal/policy/policy_util.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/policy/policy_util.py b/python/qpid_dispatch_internal/policy/policy_util.py index 0b1a719..e9b94d9 100644 --- a/python/qpid_dispatch_internal/policy/policy_util.py +++ b/python/qpid_dispatch_internal/policy/policy_util.py @@ -309,14 +309,11 @@ class PolicyAppConnectionMgr(object): return True else: if not allowbytotal: - diags.append("LogMe: INFO user '%s' from host '%s' denied connection by total connection limit" % - (user, host)) + diags.append("Connection denied by total connection limit") if not allowbyuser: - diags.append("LogMe: INFO user '%s' from host '%s' denied connection by per user limit" % - (user, host)) + diags.append("Connection denied by per user limit") if not allowbyhost: - diags.append("LogMe: INFO user '%s' from host '%s' denied connection by per host limit" % - (user, host)) + diags.append("Connection denied by per host limit") self.connections_denied += 1 return False http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d1f764e3/tests/router_policy_test.py ---------------------------------------------------------------------- diff --git a/tests/router_policy_test.py b/tests/router_policy_test.py index e2b0f87..5c767c9 100644 --- a/tests/router_policy_test.py +++ b/tests/router_policy_test.py @@ -107,7 +107,14 @@ class PolicyHostAddrTest(TestCase): self.expect_deny( "::1,::2,::3", "arg count") self.expect_deny( "0:ff:0,0:fe:ffff:ffff::0", "a > b") +class MockAgent(object): + def add_implementation(self, entity, cfg_obj_name): + pass + class MockPolicyManager(object): + def __init__(self): + self.agent = MockAgent() + def log_debug(self, text): print("DEBUG: %s" % text) def log_info(self, text): @@ -117,6 +124,9 @@ class MockPolicyManager(object): def log_error(self, text): print("ERROR: %s" % text) + def get_agent(self): + return self.agent + class PolicyFile(TestCase): manager = MockPolicyManager() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
