Hi, On Fri, Oct 23, 2015 at 6:30 PM, Christian Boltz <[email protected]> wrote:
> Hello, > > this patch adds the SignalRule and SignalRuleset classes > > Those classes will be used to parse and handle signal rules. > They understand the (surprisingly complex) syntax of signal rules. > > Note that get_clean() doesn't output superfluos things, so > signal ( send ) set = ( int ), > will become > signal send set=int, > > Also add a set of tests (100% coverage :-) to make sure everything works > as expected. > > > Note: realtime signals (rtmin+0..rtmin+32) are not documented in > apparmor.d.pod yet - patches welcome! > > > [ 07-add-SignalRule-and-SignalRuleset.diff ] > > === modified file ./utils/apparmor/rule/signal.py > --- utils/apparmor/rule/signal.py 2015-10-23 01:17:21.579245521 +0200 > +++ utils/apparmor/rule/signal.py 2015-10-23 01:08:01.149132984 +0200 > @@ -0,0 +1,300 @@ > +# ---------------------------------------------------------------------- > +# Copyright (C) 2015 Christian Boltz <[email protected]> > +# > +# This program is free software; you can redistribute it and/or > +# modify it under the terms of version 2 of the GNU General Public > +# License as published by the Free Software Foundation. > +# > +# This program is distributed in the hope that it will be useful, > +# but WITHOUT ANY WARRANTY; without even the implied warranty of > +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > +# GNU General Public License for more details. 
> +# > +# ---------------------------------------------------------------------- > + > +import re > + > +from apparmor.regex import RE_PROFILE_SIGNAL, RE_PROFILE_NAME > +from apparmor.common import AppArmorBug, AppArmorException > +from apparmor.rule import BaseRule, BaseRuleset, parse_modifiers, > quote_if_needed > + > +# setup module translations > +from apparmor.translations import init_translation > +_ = init_translation() > + > + > +access_keywords_read = ['receive', 'r', 'read'] > +access_keywords_write = ['send', 'w', 'write'] > +access_keywords_rw = ['rw', 'wr'] > +access_keywords = access_keywords_read + access_keywords_write + > access_keywords_rw > + > +signal_keywords = ['hup', 'int', 'quit', 'ill', 'trap', 'abrt', 'bus', > 'fpe', 'kill', 'usr1', > + 'segv', 'usr2', 'pipe', 'alrm', 'term', 'stkflt', > 'chld', 'cont', 'stop', > + 'stp', 'ttin', 'ttou', 'urg', 'xcpu', 'xfsz', > 'vtalrm', 'prof', 'winch', > + 'io', 'pwr', 'sys', 'emt', 'exists'] > +RE_SIGNAL_REALTIME = re.compile('^rtmin\+0*([0-9]|[12][0-9]|3[0-2])$') # > rtmin+0..rtmin+32, number may have leading zeros > I do not like this regex. It's far too complicated when all it is saying is: rtmin+x, where x may be a two-digit number in [0,32], possibly returning x. Plus, it's confusing whether rtmin+032 is allowed or not (the regex suggests it is). Maybe it's easier (and more readable) done in a function with some int() and boundary tests. + > +joint_access_keyword = '\s*(' + '|'.join(access_keywords) + ')\s*' > Better[tm] written as: joint_access_keyword = '\s*(%s)\s*' % ('|'.join(access_keywords)) > +RE_ACCESS_KEYWORDS = ( joint_access_keyword + # one of the > access_keyword or > + '|' + # > or > + '\(' + joint_access_keyword + '(' + '(\s|,)+' + > joint_access_keyword + ')*' + '\)' # one or more signal_keyword in (...) > + ) > That's some beast!
> + > +signal_keyword = '\s*([a-z0-9+]+|"[a-z0-9+]+")\s*' # don't check against > the signal keyword list in the regex to allow a more helpful error message > +RE_SIGNAL_KEYWORDS = ( > + 'set\s*=\s*' + signal_keyword + # one of the > signal_keyword or > + '|' + # > or > + 'set\s*=\s*\(' + signal_keyword + '(' + '(\s|,)+' > + signal_keyword + ')*' + '\)' # one or more signal_keyword in (...) > + ) > + > + > +RE_SIGNAL_DETAILS = re.compile( > + '^' + > + '(\s+(?P<access>' + RE_ACCESS_KEYWORDS + '))?' + # optional access > keyword(s) > +# '(\s+(?P<signal>' + RE_SIGNAL_KEYWORDS + '(\s+' + RE_SIGNAL_KEYWORDS > + ')*' + '))*' + # optional signal set(s) > + '(?P<signal>' + '(\s+(' + RE_SIGNAL_KEYWORDS + '))+' + ')?' + # > optional signal set(s) > + '(\s+(peer=' + RE_PROFILE_NAME % 'peer' + '))?' + > + '\s*$') > + > + > +RE_FILTER_SET_1 = re.compile('set\s*=\s*\(([^)]*)\)') > +RE_FILTER_SET_2 = re.compile('set\s*=') > +RE_FILTER_PARENTHESIS = re.compile('\((.*)\)') > +RE_FILTER_QUOTES = re.compile('"([a-z0-9]+)"') # used to strip quotes > around signal keywords - don't use for peer! > + > +class SignalRule(BaseRule): > + '''Class to handle and store a single signal rule''' > + > + # Nothing external should reference this class, all external users > + # should reference the class field SignalRule.ALL > + class __SignalAll(object): > + pass > + > + ALL = __SignalAll > + > + def __init__(self, access, signal, peer, audit=False, deny=False, > allow_keyword=False, > + comment='', log_event=None): > + > + super(SignalRule, self).__init__(audit=audit, deny=deny, > + allow_keyword=allow_keyword, > + comment=comment, > + log_event=log_event) > + > + self.access, self.all_accesss, unknown_items = > check_and_split_list(access, access_keywords, SignalRule.ALL, 'SignalRule', > 'access') > all_accesss (three s's) ;-) I hope that was intentional. 
+ if unknown_items: > + raise AppArmorException('Passed unknown access keyword to > SignalRule: %s' % ' '.join(unknown_items)) > + > + self.signal, self.all_signals, unknown_items = > check_and_split_list(signal, signal_keywords, SignalRule.ALL, 'SignalRule', > 'signal') > + if unknown_items: > + for item in unknown_items: > + if RE_SIGNAL_REALTIME.match(item): > + self.signal.add(item) > + else: > + raise AppArmorException('Passed unknown signal > keyword to SignalRule: %s' % item) > Missing _(). AppArmorException messages are expected to be translated, while AppArmorBug messages are not, right? > + > + self.peer = None > + self.all_peers = False > + if peer == SignalRule.ALL: > + self.all_peers = True > + elif type(peer) == str: > + if len(peer.strip()) == 0: > + raise AppArmorBug('Passed empty peer to SignalRule: %s' % > str(peer)) > + self.peer = peer # XXX use AARE > + else: > + raise AppArmorBug('Passed unknown object to SignalRule: %s' % > str(peer)) > + > + > + @classmethod > + def _match(cls, raw_rule): > + return RE_PROFILE_SIGNAL.search(raw_rule) > + > + @classmethod > + def _parse(cls, raw_rule): > + '''parse raw_rule and return SignalRule''' > + > + matches = cls._match(raw_rule) > + if not matches: > + raise AppArmorException(_("Invalid signal rule '%s'") % > raw_rule) > + > + audit, deny, allow_keyword, comment = parse_modifiers(matches) > + > + rule_details = '' > + if matches.group('details'): > + rule_details = matches.group('details') > + > + if rule_details: > + details = RE_SIGNAL_DETAILS.search(rule_details) > + if not details: > + raise AppArmorException(_("Invalid or unknown keywords in > 'signal %s" % rule_details)) > + > + if details.group('access'): > + access = details.group('access') > + access = ' '.join(access.split(',')) # split by ',' or > whitespace > Is this expected to split strings separated by a comma or by whitespace? This part will only split strings separated by a comma.
It can't do both (which the comment led me to believe, until I read on and saw the whitespace split ;-) ). + if access.startswith('(') and access.endswith(')'): > + access = access[1:-1] > + access = access.split() > There's the whitespace-separated split. > + else: > + access = SignalRule.ALL > + > + if details.group('signal'): > + signal = details.group('signal') > + signal = RE_FILTER_SET_1.sub(r'\1', signal) # filter out > 'set=' > + signal = RE_FILTER_SET_2.sub('', signal) # filter out > 'set=' > + #signal = RE_FILTER_PARENTHESIS.sub(r' \1 ', signal) # > filter out '(' and ')' pairs > + signal = RE_FILTER_QUOTES.sub(r' \1 ', signal) # filter > out quote pairs > + signal = signal.replace(',', ' ').split() # split at ',' > or whitespace > + else: > + signal = SignalRule.ALL > + > + if details.group('peer'): > + peer = details.group('peer') > + else: > + peer = SignalRule.ALL > + else: > + access = SignalRule.ALL > + signal = SignalRule.ALL > + peer = SignalRule.ALL > + > + return SignalRule(access, signal, peer, > + audit=audit, deny=deny, > allow_keyword=allow_keyword, comment=comment) > + > + def get_clean(self, depth=0): > + '''return rule (in clean/default formatting)''' > + > + space = ' ' * depth > + > + if self.all_accesss: > + access = '' > + elif len(self.access) == 1: > + access = ' %s' % ' '.join(self.access) > + elif self.access: > + access = ' (%s)' % ' '.join(sorted(self.access)) > + else: > + raise AppArmorBug('Empty access in signal rule') > + > + if self.all_signals: > + signal = '' > + elif len(self.signal) == 1: > + signal = ' set=%s' % ' '.join(self.signal) > + elif self.signal: > + signal = ' set=(%s)' % ' '.join(sorted(self.signal)) > + else: > + raise AppArmorBug('Empty signal in signal rule') > + > + if self.all_peers: > + peer = '' > + elif self.peer: > + peer = ' peer=%s' % quote_if_needed(self.peer) # XXX use AARE > + else: > + raise AppArmorBug('Empty signal in signal rule') > + > + return('%s%ssignal%s%s%s,%s' % (space,
self.modifiers_str(), > access, signal, peer, self.comment)) > + > + def is_covered_localvars(self, other_rule): > + '''check if other_rule is covered by this rule object''' > + > + if not other_rule.access and not other_rule.all_accesss: > + raise AppArmorBug('No access specified in other signal rule') > + > + if not other_rule.signal and not other_rule.all_signals: > + raise AppArmorBug('No signal specified in other signal rule') > + > + if not other_rule.peer and not other_rule.all_peers: # XXX use > AARE > + raise AppArmorBug('No peer specified in other signal rule') > + > + if not self.all_accesss: > + if other_rule.all_accesss: > + return False > + if other_rule.access != self.access: > + return False > + > + if not self.all_signals: > + if other_rule.all_signals: > + return False > + if other_rule.signal != self.signal: > + return False > + > + if not self.all_peers: > + if other_rule.all_peers: > + return False > + if other_rule.peer != self.peer: # XXX use AARE > + return False > + > + # still here? 
-> then it is covered > code seems surprised seeing one here ;-) > + return True > + > + def is_equal_localvars(self, rule_obj): > + '''compare if rule-specific variables are equal''' > + > + if not type(rule_obj) == SignalRule: > + raise AppArmorBug('Passed non-signal rule: %s' % > str(rule_obj)) > + > + if (self.access != rule_obj.access > + or self.all_accesss != rule_obj.all_accesss): > + return False > + > + if (self.signal != rule_obj.signal > + or self.all_signals != rule_obj.all_signals): > + return False > + > + if (self.peer != rule_obj.peer # XXX switch to AARE > + or self.all_peers != rule_obj.all_peers): > + return False > + > + return True > + > + def logprof_header_localvars(self): > + if self.all_accesss: > + access = _('ALL') > + else: > + access = ' '.join(sorted(self.access)) > + > + if self.all_signals: > + signal = _('ALL') > + else: > + signal = ' '.join(sorted(self.signal)) > + > + if self.all_peers: > + peer = _('ALL') > + else: > + peer = self.peer # XXX use AARE > + > + return [ > + _('Access mode'), access, > + _('Signal'), signal, > + _('Peer'), peer > + ] > + > + > +class SignalRuleset(BaseRuleset): > + '''Class to handle and store a collection of signal rules''' > + > + def get_glob(self, path_or_rule): > + '''Return the next possible glob. 
For signal rules, that means > removing access, signal or peer''' > + # XXX only remove one part, not all > + return 'signal,' > + > + > +def check_and_split_list(lst, allowed_keywords, all_obj, classname, > keyword_name): > + '''check if lst is all_obj or contains only items listed in > allowed_keywords''' > + > + if lst == all_obj: > + return None, True, None > + elif type(lst) == str: > + result_list = {lst} > + elif (type(lst) == list or type(lst) == tuple) and len(lst) > 0: > + result_list = set(lst) > + else: > + raise AppArmorBug('Passed unknown %(type)s object to > %(classname)s: %(unknown_object)s' % > + {'type': type(lst), 'classname': classname, > 'unknown_object': str(lst)}) > + > + unknown_items = set() > + for item in result_list: > + if not item.strip(): > + raise AppArmorBug('Passed empty %(keyword_name)s to > %(classname)s' % > + {'keyword_name': keyword_name, 'classname': > classname}) > + if item not in allowed_keywords: > + unknown_items.add(item) > + > + return result_list, False, unknown_items > + > === modified file ./utils/test/test-signal.py > --- utils/test/test-signal.py 2015-10-23 01:17:35.102452075 +0200 > +++ utils/test/test-signal.py 2015-10-23 13:29:07.054183515 +0200 > @@ -0,0 +1,576 @@ > +#!/usr/bin/env python > +# ---------------------------------------------------------------------- > +# Copyright (C) 2015 Christian Boltz <[email protected]> > +# > +# This program is free software; you can redistribute it and/or > +# modify it under the terms of version 2 of the GNU General Public > +# License as published by the Free Software Foundation. > +# > +# This program is distributed in the hope that it will be useful, > +# but WITHOUT ANY WARRANTY; without even the implied warranty of > +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > +# GNU General Public License for more details. 
> +# > +# ---------------------------------------------------------------------- > + > +import unittest > +from collections import namedtuple > +from common_test import AATest, setup_all_loops > + > +from apparmor.rule.signal import SignalRule, SignalRuleset > +from apparmor.rule import BaseRule > +from apparmor.common import AppArmorException, AppArmorBug > +#from apparmor.logparser import ReadLog > +from apparmor.translations import init_translation > +_ = init_translation() > + > +exp = namedtuple('exp', ['audit', 'allow_keyword', 'deny', 'comment', > + 'access', 'all_accesss', 'signal', 'all_signals', 'peer', > 'all_peers']) > + > +# --- tests for single SignalRule --- # > + > +class SignalTest(AATest): > + def _compare_obj(self, obj, expected): > + self.assertEqual(expected.allow_keyword, obj.allow_keyword) > + self.assertEqual(expected.audit, obj.audit) > + self.assertEqual(expected.access, obj.access) > + self.assertEqual(expected.signal, obj.signal) > + self.assertEqual(expected.all_accesss, obj.all_accesss) > + self.assertEqual(expected.all_signals, obj.all_signals) > + self.assertEqual(expected.deny, obj.deny) > + self.assertEqual(expected.comment, obj.comment) > + > +class SignalTestParse(SignalTest): > + tests = [ > + # SignalRule object audit allow deny > comment access all? signal all? peer > all? 
> + ('signal,' , exp(False, False, > False, '', None , True , None, True, None, > True )), > + ('signal send,' , exp(False, False, > False, '', {'send'}, False, None, True, None, > True )), > + ('signal (send, receive),' , exp(False, False, > False, '', {'send', 'receive'}, False, None, True, None, > True )), > + ('signal send set=quit,' , exp(False, False, > False, '', {'send'}, False, {'quit'}, False, None, > True )), > + ('deny signal send set=quit, # cmt' , exp(False, False, True > , ' # cmt', {'send'}, False, {'quit'}, False, None, > True )), > + ('audit allow signal set=int,' , exp(True , True , > False, '', None , True , {'int'}, False, None, > True )), > + ('signal set=quit peer=unconfined,' , exp(False, False, > False, '', None , True , {'quit'}, False, > 'unconfined', False )), > + ('signal send set=(quit),' , exp(False, False, > False, '', {'send'}, False, {'quit'}, False, None, > True )), > + ('signal send set=(quit, int),' , exp(False, False, > False, '', {'send'}, False, {'quit', 'int'}, False, None, > True )), > + ('signal set=(quit, int),' , exp(False, False, > False, '', None, True, {'quit', 'int'}, False, None, > True )), > + ('signal send set = ( quit , int ) ,' , exp(False, False, > False, '', {'send'}, False, {'quit', 'int'}, False, None, > True )), > + ('signal peer=/foo,' , exp(False, False, > False, '', None , True , None, True, '/foo', > False )), > + ('signal r set=quit set=int peer=/foo,' , exp(False, False, > False, '', {'r'}, False, {'quit', 'int'}, False, '/foo', > False )), > + ] > + > + def _run_test(self, rawrule, expected): > + self.assertTrue(SignalRule.match(rawrule)) > + obj = SignalRule.parse(rawrule) > + self.assertEqual(rawrule.strip(), obj.raw_rule) > + self._compare_obj(obj, expected) > + > +class SignalTestParseInvalid(SignalTest): > + tests = [ > + ('signal foo,' , AppArmorException), > + ('signal foo bar,' , AppArmorException), > + ('signal foo int,' , AppArmorException), > + ('signal send bar,' , AppArmorException), > + 
('signal send receive,' , AppArmorException), > + ('signal set=,' , AppArmorException), > + ('signal set=int set=,' , AppArmorException), > + ('signal set=invalid,' , AppArmorException), > + ('signal peer=,' , AppArmorException), > + ] > + > + def _run_test(self, rawrule, expected): > + self.assertTrue(SignalRule.match(rawrule)) # the above invalid > rules still match the main regex! > + with self.assertRaises(expected): > + SignalRule.parse(rawrule) > + > +#class SignalTestParseFromLog(SignalTest): > +# def test_net_from_log(self): > +# parser = ReadLog('', '', '', '', '') > +# event = 'type=AVC msg=audit(1428699242.551:386): > apparmor="DENIED" operation="create" profile="/bin/ping" pid=10589 > comm="ping" family="send" sock_type="raw" protocol=1' > + > +# parsed_event = parser.parse_event(event) > + > +# self.assertEqual(parsed_event, { > +# 'request_mask': None, > +# 'denied_mask': None, > +# 'error_code': 0, > +# 'family': 'send', > +# 'magic_token': 0, > +# 'parent': 0, > +# 'profile': '/bin/ping', > +# 'protocol': 'icmp', > +# 'sock_type': 'raw', > +# 'operation': 'create', > +# 'resource': None, > +# 'info': None, > +# 'aamode': 'REJECTING', > +# 'time': 1428699242, > +# 'active_hat': None, > +# 'pid': 10589, > +# 'task': 0, > +# 'attr': None, > +# 'name2': None, > +# 'name': None, > +# }) > + > +# obj = SignalRule(parsed_event['family'], > parsed_event['sock_type'], log_event=parsed_event) > + > +# # audit allow deny comment access > all? type/proto all? > +# expected = exp(False, False, False, '' , 'send', > False, 'raw' , False) > + > +# self._compare_obj(obj, expected) > + > +# self.assertEqual(obj.get_raw(1), ' signal send raw,') > + > + > +class SignalFromInit(SignalTest): > + tests = [ > + # SignalRule object > audit allow deny comment access all? signal > all? peer all? 
> + (SignalRule('r', 'hup', 'unconfined', deny=True) , > exp(False, False, True , '' , {'r'}, False, {'hup'}, > False, 'unconfined', False)), > + (SignalRule(('r', 'send'), ('hup', 'int'), '/bin/foo') , > exp(False, False, False, '' , {'r', 'send'},False, {'hup', > 'int'}, False, '/bin/foo', False)), > + (SignalRule(SignalRule.ALL, 'int', '/bin/foo') , > exp(False, False, False, '' , None, True, {'int'}, > False, '/bin/foo', False )), > + (SignalRule('rw', SignalRule.ALL, '/bin/foo') , > exp(False, False, False, '' , {'rw'}, False, None, > True, '/bin/foo', False )), > + (SignalRule('rw', ('int'), SignalRule.ALL) , > exp(False, False, False, '' , {'rw'}, False, {'int'}, > False, None, True )), > + (SignalRule(SignalRule.ALL, SignalRule.ALL, SignalRule.ALL) , > exp(False, False, False, '' , None , True, None, > True, None, True )), > + ] > + > + def _run_test(self, obj, expected): > + self._compare_obj(obj, expected) > + > + > +class InvalidSignalInit(AATest): > + tests = [ > + # init params expected exception > + (['send', '' , '/foo' ] , AppArmorBug), # empty signal > + (['' , 'int' , '/foo' ] , AppArmorBug), # empty access > + (['send', 'int' , '' ] , AppArmorBug), # empty peer > + ([' ', 'int' , '/foo' ] , AppArmorBug), # whitespace > access > + (['send', ' ' , '/foo' ] , AppArmorBug), # whitespace > signal > + (['send', 'int' , ' ' ] , AppArmorBug), # whitespace peer > + (['xyxy', 'int' , '/foo' ] , AppArmorException), # invalid > access > + (['send', 'xyxy', '/foo' ] , AppArmorException), # invalid > signal > + # XXX is 'invalid peer' possible at all? 
> + ([dict(), 'int' , '/foo' ] , AppArmorBug), # wrong type for > access > + ([None , 'int' , '/foo' ] , AppArmorBug), # wrong type for > access > + (['send', dict(), '/foo' ] , AppArmorBug), # wrong type for > signal > + (['send', None , '/foo' ] , AppArmorBug), # wrong type for > signal > + (['send', 'int' , dict() ] , AppArmorBug), # wrong type for > peer > + (['send', 'int' , None ] , AppArmorBug), # wrong type for > peer > + ] > + > + def _run_test(self, params, expected): > + with self.assertRaises(expected): > + SignalRule(params[0], params[1], params[2]) > + > + def test_missing_params_1(self): > + with self.assertRaises(TypeError): > + SignalRule() > + > + def test_missing_params_2(self): > + with self.assertRaises(TypeError): > + SignalRule('r') > + > + def test_missing_params_3(self): > + with self.assertRaises(TypeError): > + SignalRule('r', 'int') > + > + > +class InvalidSignalTest(AATest): > + def _check_invalid_rawrule(self, rawrule): > + obj = None > + self.assertFalse(SignalRule.match(rawrule)) > + with self.assertRaises(AppArmorException): > + obj = SignalRule(SignalRule.parse(rawrule)) > + > + self.assertIsNone(obj, 'SignalRule handed back an object > unexpectedly') > + > + def test_invalid_net_missing_comma(self): > + self._check_invalid_rawrule('signal') # missing comma > + > + def test_invalid_non_SignalRule(self): > + self._check_invalid_rawrule('dbus,') # not a signal rule > + > + def test_empty_data_1(self): > + obj = SignalRule('send', 'quit', '/foo') > + obj.access = '' > + # no access set, and ALL not set > + with self.assertRaises(AppArmorBug): > + obj.get_clean(1) > + > + def test_empty_data_2(self): > + obj = SignalRule('send', 'quit', '/foo') > + obj.signal = '' > + # no signal set, and ALL not set > + with self.assertRaises(AppArmorBug): > + obj.get_clean(1) > + > + def test_empty_data_3(self): > + obj = SignalRule('send', 'quit', '/foo') > + obj.peer = '' > + # no signal set, and ALL not set > + with self.assertRaises(AppArmorBug): 
> + obj.get_clean(1) > + > + > +class WriteSignalTestAATest(AATest): > + def _run_test(self, rawrule, expected): > + self.assertTrue(SignalRule.match(rawrule)) > + obj = SignalRule.parse(rawrule) > + clean = obj.get_clean() > + raw = obj.get_raw() > + > + self.assertEqual(expected.strip(), clean, 'unexpected clean rule') > + self.assertEqual(rawrule.strip(), raw, 'unexpected raw rule') > + > + tests = [ > + # raw rule clean > rule > + (' signal , # foo ' , 'signal, > # foo'), > + (' audit signal send,' , 'audit > signal send,'), > + (' audit signal (send ),' , 'audit > signal send,'), > + (' audit signal (send , receive ),' , > 'audit signal (receive send),'), > + (' deny signal send set=quit,# foo bar' , 'deny > signal send set=quit, # foo bar'), > + (' deny signal send set=(quit), ' , 'deny > signal send set=quit,'), > + (' deny signal send set=(int , quit),' , 'deny > signal send set=(int quit),'), > + (' deny signal send set=(quit, int ),' , 'deny > signal send set=(int quit),'), > + (' deny signal send ,# foo bar' , 'deny > signal send, # foo bar'), > + (' allow signal set=int ,# foo bar' , 'allow > signal set=int, # foo bar'), > + ] > + > + def test_write_manually(self): > + obj = SignalRule('send', 'quit', '/foo', allow_keyword=True) > + > + expected = ' allow signal send set=quit peer=/foo,' > + > + self.assertEqual(expected, obj.get_clean(2), 'unexpected clean > rule') > + self.assertEqual(expected, obj.get_raw(2), 'unexpected raw rule') > + > + > +class SignalCoveredTest(AATest): > + def _run_test(self, param, expected): > + obj = SignalRule.parse(self.rule) > + check_obj = SignalRule.parse(param) > + > + self.assertTrue(SignalRule.match(param)) > + > + self.assertEqual(obj.is_equal(check_obj), expected[0], 'Mismatch > in is_equal, expected %s' % expected[0]) > + self.assertEqual(obj.is_equal(check_obj, True), expected[1], > 'Mismatch in is_equal/strict, expected %s' % expected[1]) > + > + self.assertEqual(obj.is_covered(check_obj), expected[2], > 
'Mismatch in is_covered, expected %s' % expected[2]) > + self.assertEqual(obj.is_covered(check_obj, True, True), > expected[3], 'Mismatch in is_covered/exact, expected %s' % expected[3]) > + > +class SignalCoveredTest_01(SignalCoveredTest): > + rule = 'signal send,' > + > + tests = [ > + # rule equal strict equal > covered covered exact > + ('signal,' , [ False , False , > False , False ]), > + ('signal send,' , [ True , True , > True , True ]), > + ('signal send peer=unconfined,' , [ False , False , > True , True ]), > + ('signal send, # comment' , [ True , False , > True , True ]), > + ('allow signal send,' , [ True , False , > True , True ]), > + ('signal send,' , [ True , False , > True , True ]), > + ('signal send set=quit,' , [ False , False , > True , True ]), > + ('signal send set=int,' , [ False , False , > True , True ]), > + ('audit signal send,' , [ False , False , > False , False ]), > + ('audit signal,' , [ False , False , > False , False ]), > + ('signal receive,' , [ False , False , > False , False ]), > + ('signal set=int,' , [ False , False , > False , False ]), > + ('audit deny signal send,' , [ False , False , > False , False ]), > + ('deny signal send,' , [ False , False , > False , False ]), > + ] > + > +class SignalCoveredTest_02(SignalCoveredTest): > + rule = 'audit signal send,' > + > + tests = [ > + # rule equal strict equal > covered covered exact > + ( 'signal send,' , [ False , False , > True , False ]), > + ('audit signal send,' , [ True , True , > True , True ]), > + ( 'signal send set=quit,' , [ False , False , > True , False ]), > + ('audit signal send set=quit,' , [ False , False , > True , True ]), > + ( 'signal,' , [ False , False , > False , False ]), > + ('audit signal,' , [ False , False , > False , False ]), > + ('signal receive,' , [ False , False , > False , False ]), > + ] > + > + > +class SignalCoveredTest_03(SignalCoveredTest): > + rule = 'signal send set=quit,' > + > + tests = [ > + # rule equal strict equal > covered 
covered exact > + ( 'signal send set=quit,' , [ True , True , > True , True ]), > + ('allow signal send set=quit,' , [ True , False , > True , True ]), > + ( 'signal send,' , [ False , False , > False , False ]), > + ( 'signal,' , [ False , False , > False , False ]), > + ( 'signal send set=int,' , [ False , False , > False , False ]), > + ('audit signal,' , [ False , False , > False , False ]), > + ('audit signal send set=quit,' , [ False , False , > False , False ]), > + ('audit signal set=quit,' , [ False , False , > False , False ]), > + ( 'signal send,' , [ False , False , > False , False ]), > + ( 'signal,' , [ False , False , > False , False ]), > + ] > + > +class SignalCoveredTest_04(SignalCoveredTest): > + rule = 'signal,' > + > + tests = [ > + # rule equal strict equal > covered covered exact > + ( 'signal,' , [ True , True , > True , True ]), > + ('allow signal,' , [ True , False , > True , True ]), > + ( 'signal send,' , [ False , False , > True , True ]), > + ( 'signal w set=quit,' , [ False , False , > True , True ]), > + ( 'signal set=int,' , [ False , False , > True , True ]), > + ( 'signal send set=quit,' , [ False , False , > True , True ]), > + ('audit signal,' , [ False , False , > False , False ]), > + ('deny signal,' , [ False , False , > False , False ]), > + ] > + > +class SignalCoveredTest_05(SignalCoveredTest): > + rule = 'deny signal send,' > + > + tests = [ > + # rule equal strict equal > covered covered exact > + ( 'deny signal send,' , [ True , True , > True , True ]), > + ('audit deny signal send,' , [ False , False , > False , False ]), > + ( 'signal send,' , [ False , False , > False , False ]), # XXX should covered be true here? 
> + ( 'deny signal receive,' , [ False , False , > False , False ]), > + ( 'deny signal,' , [ False , False , > False , False ]), > + ] > + > +class SignalCoveredTest_06(SignalCoveredTest): > + rule = 'signal send peer=unconfined,' > + > + tests = [ > + # rule equal strict equal > covered covered exact > + ('signal,' , [ False , False > , False , False ]), > + ('signal send,' , [ False , False > , False , False ]), > + ('signal send peer=unconfined,' , [ True , True > , True , True ]), > + ('signal peer=unconfined,' , [ False , False > , False , False ]), > + ('signal send, # comment' , [ False , False > , False , False ]), > + ('allow signal send,' , [ False , False > , False , False ]), > + ('allow signal send peer=unconfined,' , [ True , False > , True , True ]), > + ('allow signal send peer=/foo/bar,' , [ False , False > , False , False ]), > + ('allow signal send peer=/**,' , [ False , False > , False , False ]), > + ('allow signal send peer=**,' , [ False , False > , False , False ]), > + ('signal send,' , [ False , False > , False , False ]), > + ('signal send peer=unconfined,' , [ True , False > , True , True ]), > + ('signal send set=quit,' , [ False , False > , False , False ]), > + ('signal send set=int peer=unconfined,',[ False , False > , True , True ]), > + ('audit signal send peer=unconfined,' , [ False , False > , False , False ]), > + ('audit signal,' , [ False , False > , False , False ]), > + ('signal receive,' , [ False , False > , False , False ]), > + ('signal set=int,' , [ False , False > , False , False ]), > + ('audit deny signal send,' , [ False , False > , False , False ]), > + ('deny signal send,' , [ False , False > , False , False ]), > + ] > + > +class SignalCoveredTest_07(SignalCoveredTest): > + rule = 'signal send peer=/foo/bar,' > + > + tests = [ > + # rule equal strict equal > covered covered exact > + ('signal,' , [ False , False > , False , False ]), > + ('signal send,' , [ False , False > , False , False ]), > + ('signal send 
peer=/foo/bar,' , [ True , True > , True , True ]), > + #('signal send peer=/foo/*,' , [ False , False > , True , True ]), # XXX > + #('signal send peer=/**,' , [ False , False > , True , True ]), # XXX > + ('signal send peer=/what/*,' , [ False , False > , False , False ]), > + ('signal peer=/foo/bar,' , [ False , False > , False , False ]), > + ('signal send, # comment' , [ False , False > , False , False ]), > + ('allow signal send,' , [ False , False > , False , False ]), > + ('allow signal send peer=/foo/bar,' , [ True , False > , True , True ]), > + ('signal send,' , [ False , False > , False , False ]), > + ('signal send peer=/foo/bar,' , [ True , False > , True , True ]), > + ('signal send peer=/what/ever,' , [ False , False > , False , False ]), > + ('signal send set=quit,' , [ False , False > , False , False ]), > + ('signal send set=int peer=/foo/bar,' , [ False , False > , True , True ]), > + ('audit signal send peer=/foo/bar,' , [ False , False > , False , False ]), > + ('audit signal,' , [ False , False > , False , False ]), > + ('signal receive,' , [ False , False > , False , False ]), > + ('signal set=int,' , [ False , False > , False , False ]), > + ('audit deny signal send,' , [ False , False > , False , False ]), > + ('deny signal send,' , [ False , False > , False , False ]), > + ] > + > +class SignalCoveredTest_08(SignalCoveredTest): > + rule = 'signal send peer=**,' > + > + tests = [ > + # rule equal strict equal > covered covered exact > + ('signal,' , [ False , False > , False , False ]), > + ('signal send,' , [ False , False > , False , False ]), > + #('signal send peer=/foo/bar,' , [ False , False > , True , True ]), # XXX several AARE tests > + #('signal send peer=/foo/*,' , [ False , False > , True , True ]), > + #('signal send peer=/**,' , [ False , False > , True , True ]), > + #('signal send peer=/what/*,' , [ False , False > , True , True ]), > + ('signal peer=/foo/bar,' , [ False , False > , False , False ]), > + ('signal send, # 
comment' , [ False , False > , False , False ]), > + ('allow signal send,' , [ False , False > , False , False ]), > + #('allow signal send peer=/foo/bar,' , [ False , False > , True , True ]), > + ('signal send,' , [ False , False > , False , False ]), > + #('signal send peer=/foo/bar,' , [ False , False > , True , True ]), > + #('signal send peer=/what/ever,' , [ False , False > , True , True ]), > + ('signal send set=quit,' , [ False , False > , False , False ]), > + #('signal send set=int peer=/foo/bar,' , [ False , False > , True , True ]), > + ('audit signal send peer=/foo/bar,' , [ False , False > , False , False ]), > + ('audit signal,' , [ False , False > , False , False ]), > + ('signal receive,' , [ False , False > , False , False ]), > + ('signal set=int,' , [ False , False > , False , False ]), > + ('audit deny signal send,' , [ False , False > , False , False ]), > + ('deny signal send,' , [ False , False > , False , False ]), > + ] > + > + > + > + > Too much white-space > +class SignalCoveredTest_Invalid(AATest): > + def test_borked_obj_is_covered_1(self): > + obj = SignalRule.parse('signal send peer=/foo,') > + > + testobj = SignalRule('send', 'quit', '/foo') > + testobj.access = '' > + > + with self.assertRaises(AppArmorBug): > + obj.is_covered(testobj) > + > + def test_borked_obj_is_covered_2(self): > + obj = SignalRule.parse('signal send set=quit peer=/foo,') > + > + testobj = SignalRule('send', 'quit', '/foo') > + testobj.signal = '' > + > + with self.assertRaises(AppArmorBug): > + obj.is_covered(testobj) > + > + def test_borked_obj_is_covered_3(self): > + obj = SignalRule.parse('signal send set=quit peer=/foo,') > + > + testobj = SignalRule('send', 'quit', '/foo') > + testobj.peer = '' > + > + with self.assertRaises(AppArmorBug): > + obj.is_covered(testobj) > + > + def test_invalid_is_covered(self): > + obj = SignalRule.parse('signal send,') > + > + testobj = BaseRule() # different type > + > + with self.assertRaises(AppArmorBug): > + 
obj.is_covered(testobj) > + > + def test_invalid_is_equal(self): > + obj = SignalRule.parse('signal send,') > + > + testobj = BaseRule() # different type > + > + with self.assertRaises(AppArmorBug): > + obj.is_equal(testobj) > + > +class SignalLogprofHeaderTest(AATest): > + tests = [ > + ('signal,', [ > _('Access mode'), _('ALL'), _('Signal'), _('ALL'), _('Peer'), > _('ALL'), ]), > + ('signal send,', [ > _('Access mode'), 'send', _('Signal'), _('ALL'), _('Peer'), > _('ALL'), ]), > + ('signal send set=quit,', [ > _('Access mode'), 'send', _('Signal'), 'quit', _('Peer'), > _('ALL'), ]), > + ('deny signal,', [_('Qualifier'), 'deny', > _('Access mode'), _('ALL'), _('Signal'), _('ALL'), _('Peer'), > _('ALL'), ]), > + ('allow signal send,', [_('Qualifier'), 'allow', > _('Access mode'), 'send', _('Signal'), _('ALL'), _('Peer'), > _('ALL'), ]), > + ('audit signal send set=quit,', [_('Qualifier'), 'audit', > _('Access mode'), 'send', _('Signal'), 'quit', _('Peer'), > _('ALL'), ]), > + ('audit deny signal send,', [_('Qualifier'), 'audit deny', > _('Access mode'), 'send', _('Signal'), _('ALL'), _('Peer'), > _('ALL'), ]), > + ('signal set=(int, quit),', [ > _('Access mode'), _('ALL'), _('Signal'), 'int quit', _('Peer'), > _('ALL'), ]), > + ('signal set=( quit, int),', [ > _('Access mode'), _('ALL'), _('Signal'), 'int quit', _('Peer'), > _('ALL'), ]), > + ('signal (send, receive) set=( quit, int) peer=/foo,', [ > _('Access mode'), 'receive send', _('Signal'), 'int quit', _('Peer'), > '/foo', ]), > + ] > + > + def _run_test(self, params, expected): > + obj = SignalRule._parse(params) > + self.assertEqual(obj.logprof_header(), expected) > + > +## --- tests for SignalRuleset --- # > + > +class SignalRulesTest(AATest): > + def test_empty_ruleset(self): > + ruleset = SignalRuleset() > + ruleset_2 = SignalRuleset() > + self.assertEqual([], ruleset.get_raw(2)) > + self.assertEqual([], ruleset.get_clean(2)) > + self.assertEqual([], ruleset_2.get_raw(2)) > + self.assertEqual([], 
ruleset_2.get_clean(2)) > + > + def test_ruleset_1(self): > + ruleset = SignalRuleset() > + rules = [ > + 'signal set=int,', > + 'signal send,', > + ] > + > + expected_raw = [ > + 'signal set=int,', > + 'signal send,', > + '', > + ] > + > + expected_clean = [ > + 'signal send,', > + 'signal set=int,', > + '', > + ] > + > + for rule in rules: > + ruleset.add(SignalRule.parse(rule)) > + > + self.assertEqual(expected_raw, ruleset.get_raw()) > + self.assertEqual(expected_clean, ruleset.get_clean()) > + > + def test_ruleset_2(self): > + ruleset = SignalRuleset() > + rules = [ > + 'signal send set=int,', > + 'allow signal send,', > + 'deny signal set=quit, # example comment', > + ] > + > + expected_raw = [ > + ' signal send set=int,', > + ' allow signal send,', > + ' deny signal set=quit, # example comment', > + '', > + ] > + > + expected_clean = [ > + ' deny signal set=quit, # example comment', > + '', > + ' allow signal send,', > + ' signal send set=int,', > + '', > + ] > + > + for rule in rules: > + ruleset.add(SignalRule.parse(rule)) > + > + self.assertEqual(expected_raw, ruleset.get_raw(1)) > + self.assertEqual(expected_clean, ruleset.get_clean(1)) > + > + > +class SignalGlobTestAATest(AATest): > + def setUp(self): > + self.maxDiff = None > + self.ruleset = SignalRuleset() > + > + def test_glob_1(self): > + self.assertEqual(self.ruleset.get_glob('signal send,'), 'signal,') > + > + # not supported or used yet > + # def test_glob_2(self): > + # self.assertEqual(self.ruleset.get_glob('signal send raw,'), > 'signal send,') > + > + def test_glob_ext(self): > + with self.assertRaises(AppArmorBug): > + # get_glob_ext is not available for signal rules > + self.ruleset.get_glob_ext('signal send set=int,') > + > +#class SignalDeleteTestAATest(AATest): > +# pass > + > +setup_all_loops(__name__) > +if __name__ == '__main__': > + unittest.main(verbosity=2) > > > The tests mostly look nice and comprehensive(I've superficially looked at them). Yay for the awesome coverage! 
Looks good. I can understand the patch being pending for so long due to its sheer size. I'll check out the follow-up patches individually. Sorry for the spam and trouble. > > Regards, > > Christian Boltz > -- > Meine Katze hat zu der Maus auch gesagt: "Kannst ganz beruhigt sein, > ich tu Dir nichts!" Und vom Fressen hat die Katze kein Ton gesagt. > [Rolf-Hubert Pobloth in suse-linux] > > > > -- > AppArmor mailing list > [email protected] > Modify settings or unsubscribe at: > https://lists.ubuntu.com/mailman/listinfo/apparmor > -- Regards, Kshitij Gupta
-- AppArmor mailing list [email protected] Modify settings or unsubscribe at: https://lists.ubuntu.com/mailman/listinfo/apparmor
