A few months ago we enabled Python optimizations in the OLPC builds causing python's "assert" statement to do nothing. Martin Langhoff pointed out that we're dropping some important parts of bitfrost's code with this change - we do want to be making these checks. Here's a patch to restore the previous behaviour, review appreciated.
diff --git a/bitfrost/__init__.py b/bitfrost/__init__.py index e69de29..d3e691f 100644 --- a/bitfrost/__init__.py +++ b/bitfrost/__init__.py @@ -0,0 +1,10 @@ +def _assert(expr): + """ + When compiled with -O, assertions are dropped. However, bitfrost uses + assertions in various places as part of the security-sensitive code flow. + All assertions have been switched to use this function so that the previous + "unoptimized" behaviour is kept. + """ + if not expr: + raise AssertionError + diff --git a/bitfrost/leases/core.py b/bitfrost/leases/core.py index e8fbbcb..3818bd6 100644 --- a/bitfrost/leases/core.py +++ b/bitfrost/leases/core.py @@ -29,6 +29,7 @@ http://wiki.laptop.org/go/Firmware_Key_and_Signature_Formats#Antitheft.2FActivat """ import bitfrost.util.json as json +from bitfrost import _assert from bitfrost.leases.crypto import verify_lease from bitfrost.leases.errors import * from bitfrost.leases.keys import LEASE_KEYS @@ -50,13 +51,13 @@ def find_lease (this_sn, this_uuid, lease_contents): # perhaps this is a singleton lease version, leases = 1, { this_sn: lease_contents } try: - assert isinstance(version, int) + _assert(isinstance(version, int)) if version != 1: # version of the multiple-lease marshalling, *not* the version # of the lease itself. raise UnsupportedLeaseFormat(version) - assert isinstance(leases, dict) - assert len(leases.keys()) > 0 + _assert(isinstance(leases, dict)) + _assert(len(leases.keys()) > 0) except: raise InvalidLeaseData() diff --git a/bitfrost/leases/crypto.py b/bitfrost/leases/crypto.py index de38f50..f28434e 100644 --- a/bitfrost/leases/crypto.py +++ b/bitfrost/leases/crypto.py @@ -26,6 +26,7 @@ for use when checking security tokens. """ import time, re, sys +from bitfrost import _assert from bitfrost.leases.errors import * import bitfrost.util.pyverify as pyverify # libtomcrypt binding @@ -45,8 +46,8 @@ def _find_matching_key(keyid, valid_keys): # key01: data\n # 3 2 2 N 1 try: - assert k[0:7] == 'key01: ' - assert k[-1] == '\n' + _assert(k[0:7] == 'key01: ') + _assert(k[-1] == '\n') key = k[7:-1].decode('hex') except: pass # our keylist has an invalid key =( @@ -77,7 +78,7 @@ def date_cmp(a, b): """ a, b = str(a), str(b) - assert len(a) == 16 and len(b) == 16 + _assert(len(a) == 16 and len(b) == 16) if a == b: return 0 # special-case infinity. if a == "00000000T000000Z": return 1 @@ -108,28 +109,28 @@ def check_expiration_func(): def _verify_sig01(certified_data, sig, valid_keys, __): """Decode and verify a signature in the sig01 format.""" try: - assert isinstance(sig, str) - assert len(sig) > (3+2+2+6+1+64+1+1) + _assert(isinstance(sig, str)) + _assert(len(sig) > (3+2+2+6+1+64+1+1)) # minus 38 - assert sig[0:7] == 'sig01: ' + _assert(sig[0:7] == 'sig01: ') hashname = sig[7:13] - assert hashname == 'sha256' - assert sig[13] == ' ' + _assert(hashname == 'sha256') + _assert(sig[13] == ' ') keyid = sig[14:78] - assert len(keyid) == 64 - assert sig[78] == ' ' - assert sig[-1] == '\n' + _assert(len(keyid) == 64) + _assert(sig[78] == ' ') + _assert(sig[-1] == '\n') sig = sig[79:-1].decode('hex') except: raise InvalidSignatureData() # find a matching key in valid_keys, which will be a parsed key list. key = _find_matching_key(keyid, valid_keys) - assert key is not None + _assert(key is not None) # Crypto check try: - assert hashname == 'sha256' + _assert(hashname == 'sha256') pyverify.verify_buffer(key, certified_data, sig) except: raise VerificationFailure() @@ -139,13 +140,13 @@ def _verify_sig01(certified_data, sig, valid_keys, __): def _verify_sig02(certified_data, sig, valid_keys, sn): """Decode and verify a signature in the sig01 format.""" try: - assert isinstance(sig, str) - assert len(sig) > (3+2+2+6+1+64+1+16+1+1) - assert sig[0:7] == 'sig02: ' - assert sig[-1] == '\n' + _assert(isinstance(sig, str)) + _assert(len(sig) > (3+2+2+6+1+64+1+16+1+1)) + _assert(sig[0:7] == 'sig02: ') + _assert(sig[-1] == '\n') sgs = sig[7:].split() - assert len(sgs) > 0 - assert (len(sgs) % 4) == 0 + _assert(len(sgs) > 0) + _assert((len(sgs) % 4) == 0) except: raise InvalidSignatureData() @@ -184,7 +185,7 @@ def _verify_sig02(certified_data, sig, valid_keys, sn): data += ('key01: %s\n' % next_key) # okay, validate signature. try: - assert hashname == 'sha256' + _assert(hashname == 'sha256') pyverify.verify_buffer(key, data, sg_sig.decode('hex')) except: raise VerificationFailure() @@ -281,10 +282,10 @@ def verify_sig(certified_data, sig, valid_keys, sn): """ try: - assert isinstance(sig, str) - assert len(sig) > 6 - assert sig[0:3] == 'sig' - assert sig[5] == ':' + _assert(isinstance(sig, str)) + _assert(len(sig) > 6) + _assert(sig[0:3] == 'sig') + _assert(sig[5] == ':') version = int(sig[3:5]) except: raise InvalidSignatureData() @@ -296,17 +297,17 @@ def verify_sig(certified_data, sig, valid_keys, sn): raise InvalidSignatureData() # unknown signature version def parse_act01(lease): - assert isinstance(lease, str) - assert len(lease) > (3+2+2+11+3+16+1+ 3+2+2+1) - assert lease[0:7] == 'act01: ' + _assert(isinstance(lease, str)) + _assert(len(lease) > (3+2+2+11+3+16+1+ 3+2+2+1)) + _assert(lease[0:7] == 'act01: ') sn = lease[7:18] - assert lease[18] == ' ' + _assert(lease[18] == ' ') disposition = lease[19] - assert lease[20] == ' ' + _assert(lease[20] == ' ') expiration = lease[21:37] - assert lease[37:41] == ' sig' + _assert(lease[37:41] == ' sig') sig = lease[38:] - assert lease[-1] == '\n' + _assert(lease[-1] == '\n') return sn, disposition, expiration, sig def _verify_act01(sn, uuid, lease, valid_keys): @@ -327,7 +328,7 @@ def _verify_act01(sn, uuid, lease, valid_keys): # 3 2 2 11 111 16 1 3 2 2 6 1 64 1 N 1 try: lease_sn, disposition, expiration, sig = parse_act01(lease) - assert lease_sn == str(sn) + _assert(lease_sn == str(sn)) except: raise InvalidLeaseData() @@ -421,10 +422,10 @@ def verify_act(sn, uuid, lease, valid_keys): LeaseExpired """ try: - assert isinstance(lease, str) - assert len(lease) > 6 - assert lease[0:3] == 'act' - assert lease[5] == ':' + _assert(isinstance(lease, str)) + _assert(len(lease) > 6) + _assert(lease[0:3] == 'act') + _assert(lease[5] == ':') version = int(lease[3:5]) except: raise InvalidLeaseData() @@ -450,19 +451,19 @@ def _verify_dev01(sn, uuid, devkey, valid_keys): # dev01: serial-number d expiration sig01: sha256 keyid data\n # 3 2 2 11 111 16 1 3 2 2 6 1 64 1 N 1 try: - assert isinstance(devkey, str) - assert len(devkey) > (3+2+2+11+3+16+1+ 3+2+2+1) - assert devkey[0:7] == 'dev01: ' - assert devkey[7:18] == str(sn) - assert devkey[18] == ' ' + _assert(isinstance(devkey, str)) + _assert(len(devkey) > (3+2+2+11+3+16+1+ 3+2+2+1)) + _assert(devkey[0:7] == 'dev01: ') + _assert(devkey[7:18] == str(sn)) + _assert(devkey[18] == ' ') disposition = devkey[19] - assert disposition == 'A' - assert devkey[20] == ' ' + _assert(disposition == 'A') + _assert(devkey[20] == ' ') expiration = devkey[21:37] - assert expiration == '00000000T000000Z' - assert devkey[37:41] == ' sig' + _assert(expiration == '00000000T000000Z') + _assert(devkey[37:41] == ' sig') sig = devkey[38:] - assert devkey[-1] == '\n' + _assert(devkey[-1] == '\n') except: raise InvalidDevKeyData() @@ -552,11 +553,11 @@ def verify_dev(sn, uuid, devkey, valid_keys): LeaseExpired """ try: - assert isinstance(devkey, str) - assert len(devkey) > 6 - assert devkey[0:3] == 'dev' - assert devkey[5] == ':' - version = int(devkey[3:5]) + _assert(isinstance(devkey, str)) + _assert(len(devkey) > 6) + _assert(devkey[0:3] == 'dev') + _assert(devkey[5] == ':') + version = int(devkey[3:5])) except: raise InvalidDevKeyData() diff --git a/bitfrost/update/irsync.py b/bitfrost/update/irsync.py index d7628a4..102b274 100644 --- a/bitfrost/update/irsync.py +++ b/bitfrost/update/irsync.py @@ -7,6 +7,7 @@ directories which are out-of-date.""" from __future__ import with_statement import os, os.path, shutil, stat, sys from bitfrost.contents.utils import mkhashes, open_envel, UnifiedContents +from bitfrost import _assert from subprocess import check_call HASH, MODE, UID, GID, XATTR = 'h', 'm', 'u#', 'g#', 'x' @@ -25,7 +26,7 @@ def rsync (src, dst, path, recursive, verbosity=0, link_dest=None): link_path = os.path.join(link_dest, path) # rsync will complain if we give it a link-dest which doesn't actually # exist, isn't a directory, or if the link-dest is relative. - assert os.path.isabs(link_path) + _assert(os.path.isabs(link_path)) if os.path.isdir(link_path): args[2:2] = [ '--link-dest', link_path ] if verbosity > 1: args[2:2] = ['-v'] diff --git a/bitfrost/update/microformat.py b/bitfrost/update/microformat.py index b3806b2..5b4d50c 100644 --- a/bitfrost/update/microformat.py +++ b/bitfrost/update/microformat.py @@ -64,6 +64,8 @@ from __future__ import division from HTMLParser import HTMLParser, HTMLParseError import urlparse +from bitfrost import _assert + _DEBUG_PARSER = False """Check that the HTML input is well-formed. Otherwise, silently "do our best" with what we're given.""" @@ -154,9 +156,9 @@ class _UpdateHTMLParser(HTMLParser): self.in_activity -= 1 if self.in_activity == 0: if _DEBUG_PARSER: - assert self.in_activity_id == 0 - assert self.in_activity_version == 0 - assert self.in_activity_url == 0 + _assert(self.in_activity_id == 0) + _assert(self.in_activity_version == 0) + _assert(self.in_activity_url == 0) if self.last_id is not None and self.last_id.strip() == '': self.last_id = None if len(self.last_urls)>0 and (self.last_id is None or diff --git a/bitfrost/update/setup.py b/bitfrost/update/setup.py index f8f4b05..9800a14 100644 --- a/bitfrost/update/setup.py +++ b/bitfrost/update/setup.py @@ -9,6 +9,7 @@ from subprocess import call, check_call, Popen, PIPE import os, os.path, re, struct, sys, threading, time from bitfrost.leases.crypto import verify_sig, verify_dev, check_expiration_func from bitfrost.leases.keys import OS_KEYS, DEVELOPER_KEYS +from bitfrost import _assert def _is_partitioned(): # see if / is a different device from /bootpart @@ -210,38 +211,38 @@ def clean_versions(report, partitioned, cur_hash, new_hash): keep.extend(os.listdir('/versions/sticky')) except: pass # if /versions/sticky doesn't exist, don't worry about it. for f in os.listdir('/versions/pristine'): - assert len(f) > 0 and '.' not in f # defensive programming. + _assert(len(f) > 0 and '.' not in f) # defensive programming. if f in keep: continue report(0, 'Deleting old pristine version %s' % f) rmrf('/versions/pristine/%s' % f, ignore_errors=True) for f in os.listdir('/versions/run'): - assert len(f) > 0 and '.' not in f # defensive programming. + _assert(len(f) > 0 and '.' not in f) # defensive programming. if f in keep: continue report(0, 'Deleting old run version %s' % f) rmrf('/versions/run/%s' % f, ignore_errors=True) for f in os.listdir('/versions/contents'): - assert len(f) > 0 and '.' not in f # defensive programming. + _assert(len(f) > 0 and '.' not in f) # defensive programming. if f in keep: continue report(0, 'Deleting old contents for version %s' % f) rmrf('/versions/contents/%s' % f, ignore_errors=True) # 3b. Delete corresponding members of /boot-versions if using a boot partition if partitioned: for f in os.listdir('/bootpart/boot-versions'): - assert len(f) > 0 and '.' not in f + _assert(len(f) > 0 and '.' not in f) if f in keep: continue report(0, 'Deleting old boot version %s' % f) rmrf('/bootpart/boot-versions/%s' % f, ignore_errors=True) else: # 4. Clean up old configs. for f in os.listdir('/versions/configs'): - assert len(f) > 0 and not f.startswith('.') # defensive programming. + _assert(len(f) > 0 and not f.startswith('.')) # defensive programming. if f == os.path.basename(os.readlink('/versions/boot').rstrip('/')): continue report(0, 'Deleting old boot configuration %s' % f) rmrf('/versions/configs/%s' % f, ignore_errors=True) # 5. Clean up updates directory. for f in os.listdir('/versions/updates'): - assert len(f) > 0 and not f.startswith('.') # defensive programming. + _assert(len(f) > 0 and not f.startswith('.')) # defensive programming. if f.startswith(shorten_hash(new_hash)): continue report(0, 'Deleting old incomplete update %s' % f) rmrf('/versions/updates/%s' % f, ignore_errors=True) diff --git a/bitfrost/util/json.py b/bitfrost/util/json.py index 87e1065..c60c6f8 100644 --- a/bitfrost/util/json.py +++ b/bitfrost/util/json.py @@ -265,7 +265,7 @@ if __name__ == '__main__': except WriteException: pass # this is what should happen def expectWrite(obj, expect_str): - assert write(obj) == expect_str + _assert(write(obj) == expect_str) def expectRead(s, result): expectWrite(read(s), write(result)) diff --git a/bitfrost/util/urlrange.py b/bitfrost/util/urlrange.py index 9ef56ad..a5fcbf1 100755 --- a/bitfrost/util/urlrange.py +++ b/bitfrost/util/urlrange.py @@ -20,6 +20,8 @@ import urllib2 import urlparse import zipfile +from bitfrost import _assert + _cache = {} """Cache of temporary files used to download URLs.""" _redirect_cache = {} @@ -274,7 +276,7 @@ class _InnerWrapper(object): self._setup() size = self.length - self.offset self._setup() - assert self.offset <= self.length + _assert(self.offset <= self.length) if (self.offset + size) > self.length: size = self.length - self.offset # fixup overlong requests if size == 0: return [] # we're at the end of our rope. @@ -299,7 +301,7 @@ class _InnerWrapper(object): self.close() raise IOError('could not read: %s' % res.reason) data = res.read(size) - assert len(data) == size + _assert(len(data) == size) self.offset += size res.close() return data @@ -448,13 +450,13 @@ def _main(): # test redirects url = 'http://etoys.laptop.org/xo' if True: - assert urlopen(url).url == (url+'/') + _assert(urlopen(url).url == (url+'/')) # check that cached copy is stored with the canonical url saved = urlretrieve(url) print saved os.unlink(saved) - assert url in _redirect_cache - assert _redirect_cache[url] in _cache + _assert(url in _redirect_cache) + _assert(_redirect_cache[url] in _cache) # test the progress callbacks. import sys
_______________________________________________ Devel mailing list Devel@lists.laptop.org http://lists.laptop.org/listinfo/devel