Author: johannes Date: 2006-04-18 08:04:25 -0500 (Tue, 18 Apr 2006) New Revision: 8414
Modified: trunk/gnue-common/src/utils/uuid.py Log: Changed to make pylint happy with it. More pep8-ification. Modified: trunk/gnue-common/src/utils/uuid.py =================================================================== --- trunk/gnue-common/src/utils/uuid.py 2006-04-18 11:27:11 UTC (rev 8413) +++ trunk/gnue-common/src/utils/uuid.py 2006-04-18 13:04:25 UTC (rev 8414) @@ -20,6 +20,9 @@ # - Suite 330, Boston, MA 02111-1307, USA. # # $Id$ +# +# pylint: disable-msg=W0703 +# pylint: disable-msg=F0401 """ This module implements an UUID generator as described in the internet draft at 'http://www.ietf.org/internet-drafts/draft-mealling-uuid-urn-05.txt' or also @@ -40,32 +43,57 @@ from gnue.common.apps import errors -try: - if sys.platform == 'win32': - from win32com.client import GetObject - else: - import fcntl +if sys.platform == 'win32': + from win32com.client import GetObject +else: + import fcntl -except ImportError: - pass +# Namespace available if imported all (from uuid import *) +__all__ = ['InvalidVersionError', 'InvalidNamespaceError', + 'MissingNamespaceError', 'get_hardware_addresses', 'Generator', + 'UUID', 'TIME', 'MD5', 'RANDOM', 'SHA1'] + # ============================================================================= # Exceptions # ============================================================================= class InvalidVersionError(errors.SystemError): + """ + The version '<version>' is not a valid UUID version. + + Raised when L{Generator.generate}() gets called with an invalid + UUID-version. + """ def __init__(self, version): msg = u_("The version '%s' is not a valid UUID version") \ % repr(version) errors.SystemError.__init__(self, msg) -class InvalidNamespaceError (errors.SystemError): - def __init__ (self, ns): +# ============================================================================= + +class InvalidNamespaceError(errors.SystemError): + """ + 'namespace' is not recognized as valid namespace argument. + + Raised whenever an invalid namespace is given to L{Generator.set_namespace}. + A valid namespace for name-based UUID generation is made up of either 36 or + 32 characters. + """ + def __init__(self, namespace): msg = u_("'%s' is not recognized as valid namespace argument") \ - % repr(ns) + % repr(namespace) errors.SystemError.__init__(self, msg) +# ============================================================================= + class MissingNamespaceError(errors.ApplicationError): + """ + No namespace given for SHA1-/MD5-based UUIDs. + + MD5- and SHA1-based UUIDs must have a namespace defined. Use + L{set_namespace} to define this value. + """ def __init__(self): msg = u_("No namespace given for namebased UUID generation") errors.ApplicationError.__init__(self, msg) @@ -81,24 +109,22 @@ manualLock = sys.version_info[:2] < (2, 3) if os.path.exists('/dev/urandom'): - try: - dev_random = os.open('/dev/urandom', os.O_RDONLY) + dev_random = os.open('/dev/urandom', os.O_RDONLY) - except IOError: - pass - # ----------------------------------------------------------------------------- # Get a sequence of random bytes # ----------------------------------------------------------------------------- -def getRandomBytes(count): +def get_random_bytes(count): """ Return a sequence of 'count' random bytes from the best random number generator available. @param count: number of bytes to return + @type count: integer @return: sequence of 'count' random bytes + @rtype: list """ result = [] @@ -115,11 +141,15 @@ # otherwise use the random module (which is threadsafe for python 2.3+) else: - for i in xrange(count): - if manualLock: lock.acquire() - result.append (pseudoRng.randrange(0, 256)) - if manualLock: lock.release() + for dummy in xrange(count): + if manualLock: + lock.acquire() + result.append(pseudoRng.randrange(0, 256)) + + if manualLock: + lock.release() + return result @@ -127,7 +157,7 @@ # Get the hardware addresses of the installed network interfaces # ============================================================================= -class NetworkInterfaces: +def get_hardware_addresses(): """ Retrieve a list of all available network interfaces and their hardware addresses. After creating an instance of this class the attribute nics is a @@ -145,171 +175,206 @@ If no interface could be detected at all, an interface will be generated using the random number source. It will have the name 'generated'. + + @return: list of tuples (ifname, hwaddr, hwstr) as described above + @rtype: list of tuples """ - # ------------------------------------------------------------------------- - # Constructor - # ------------------------------------------------------------------------- + trys = {'linux2': [_load_from_proc, _load_via_ifconf], + 'darwin': [_load_from_plist], + 'win32' : [_load_from_winmgmts]} - def __init__(self): + nics = [] + if sys.platform in trys: + for method in trys[sys.platform]: + nics = [item for item in method() if sum(item[1])] + if nics: + break - trys = {'linux2': [self.__load_from_proc, self.__load_via_ifconf], - 'darwin': [self.__load_from_plist], - 'win32' : [self.__load_from_winmgmts]} + if not nics: + nics = [('generated', get_random_bytes(6))] - nics = [] - if sys.platform in trys: - for method in trys[sys.platform]: - nics = self.__strip_empty(method()) - if nics: - break + return [(name, hwaddr, ("%02x:" * 6)[:-1] % tuple(hwaddr)) \ + for (name, hwaddr) in nics] - if not nics: - nics = [('generated', getRandomBytes(6))] - self.nics = [(n, h, ("%02x:" * 6)[:-1] % tuple(h)) for (n, h) in nics] +# ------------------------------------------------------------------------- +# Load a list of interfaces from the proc-filesystem +# ------------------------------------------------------------------------- +def _load_from_proc(): + """ + Load a list of network-interfaces from the proc-filesystem (/proc/net/dev). + For each interface a tuple with it's name and hardware address will be + returned. The hardware address is a list of decimal numbers. - # ------------------------------------------------------------------------- - # Load a list of interfaces from the proc-filesystem - # ------------------------------------------------------------------------- + @return: list of tuples (ifname, hwaddr) + @rtype: list of tuples + """ - def __load_from_proc (self): + result = [] + try: + nfhd = open('/proc/net/dev', 'r') - result = [] try: - nfhd = open('/proc/net/dev', 'r') + for line in [raw.strip() for raw in nfhd.readlines() \ + if ':' in raw]: + result.append(line.split(':', 1)[0]) - try: - for line in [l.strip() for l in nfhd.readlines() if ':' in l]: - result.append(line.split(':', 1)[0]) + finally: + nfhd.close() - finally: - nfhd.close() + result = [(name, _hw_addr_from_socket(name)) \ + for name in result] - result = [(r, self.__hw_addr_from_socket(r)) for r in result] + except IOError: + result = [] - except: - pass + return result - return result +# ------------------------------------------------------------------------- +# Load a list of interfaces via IFCONF socket call +# ------------------------------------------------------------------------- - # ------------------------------------------------------------------------- - # Load a list of interfaces via IFCONF socket call - # ------------------------------------------------------------------------- +def _load_via_ifconf(): + """ + Load a list of network-interfaces using a SIOCGIFCONF (0x8912) socket + IO-call. For each interface a tuple with it's name and hardware address + will be returned. The hardware address is a list of decimal numbers. - def __load_via_ifconf(self): + @return: list of tuples (ifname, hwaddr) + @rtype: list of tuples + """ - SIOCGIFCONF = 0x8912 - result = [] + SIOCGIFCONF = 0x8912 + result = [] + try: + sfhd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: - sfhd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + dbuf = array.array('c', '\0' * 1024) + (addr, length) = dbuf.buffer_info() + ifconf = struct.pack("iP", length, addr) + data = fcntl.ioctl(sfhd.fileno(), SIOCGIFCONF, ifconf) - try: - buffer = array.array('c', '\0' * 1024) - (addr, length) = buffer.buffer_info() - ifconf = struct.pack ("iP", length, addr) - data = fcntl.ioctl(sfhd.fileno(), SIOCGIFCONF, ifconf) + size = struct.unpack("iP", data)[0] + for idx in range(0, size, 32): + ifconf = dbuf.tostring()[idx:idx+32] + name = struct.unpack("16s16s", ifconf)[0].split('\0', 1)[0] + result.append(name) - size, ptr = struct.unpack("iP", data) - for idx in range (0, size, 32): - ifconf = buffer.tostring()[idx:idx+32] - name = struct.unpack("16s16s", ifconf)[0].split('\0', 1)[0] - result.append(name) + finally: + sfhd.close() - finally: - sfhd.close() + result = [(name, _hw_addr_from_socket(name)) for name in result] - result = [(r, self.__hw_addr_from_socket(r)) for r in result] + except Exception: + result = [] - except: - pass + return result - return result +# ------------------------------------------------------------------------- +# Get a hardware address for an interface using a socket +# ------------------------------------------------------------------------- - # ------------------------------------------------------------------------- - # Get a hardware address for an interface using a socket - # ------------------------------------------------------------------------- +def _hw_addr_from_socket(iff): + """ + Get the hardware address for a given interface name using the socket + IO-call SIOCGIFHWADDR (0x8927). The hardware address will be returned as a + list of bytes. - def __hw_addr_from_socket(self, iff): + @param iff: name of the interface to get the hardware address for + @type iff: string + @return: hardware address of the requested interface + @rtype: list of 6 bytes + """ - SIOCGIFHWADDR = 0x8927 - sfhd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - ifreq = (iff + '\0' * 32)[:32] + SIOCGIFHWADDR = 0x8927 + sfhd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + ifreq = (iff + '\0' * 32)[:32] - try: - data = fcntl.ioctl(sfhd.fileno(), SIOCGIFHWADDR, ifreq) - result = array.array('B', data[18:24]).tolist() + try: + data = fcntl.ioctl(sfhd.fileno(), SIOCGIFHWADDR, ifreq) + result = array.array('B', data[18:24]).tolist() - finally: - sfhd.close() + finally: + sfhd.close() - return result + return result - # ------------------------------------------------------------------------- - # Get a list of interfaces using the windows management instrumentation - # ------------------------------------------------------------------------- +# ------------------------------------------------------------------------- +# Get a list of interfaces using the windows management instrumentation +# ------------------------------------------------------------------------- - def __load_from_winmgmts(self): +def _load_from_winmgmts(): + """ + Load a list of network-interfaces using the windows management + instrumentation. This assumes to have the win32 modules installed. For + each interface a tuple with it's name and hardware address will be + returned. The hardware address is a list of bytes. - result = [] - try: - cmd = "SELECT * FROM Win32_NetworkAdapterConfiguration " \ - "WHERE IPEnabled=True" - for iff in [i for i in GetObject('winmgmts:').ExecQuery(cmd)]: - parts = iff.MACAddress.split(':') - args = [16] * len(parts) - result.append((iff.Caption, map(int, parts, args))) + @return: list of tuples (ifname, hwaddr) + @rtype: list of tuples + """ - except: - pass + result = [] + try: + cmd = "SELECT * FROM Win32_NetworkAdapterConfiguration " \ + "WHERE IPEnabled=True" + for iff in [intf for intf in GetObject('winmgmts:').ExecQuery(cmd)]: + parts = iff.MACAddress.split(':') + result.append((iff.Caption, \ + [int(value, 16) for value in parts])) - return result + except Exception: + result = [] + return result - # ------------------------------------------------------------------------- - # Load the network information from the NetworkInterfaces.plist XML file - # ------------------------------------------------------------------------- - def __load_from_plist(self): +# ------------------------------------------------------------------------- +# Load the network information from the NetworkInterfaces.plist XML file +# ------------------------------------------------------------------------- - result = [] - try: - import plistlib - path = "/Library/Preferences/SystemConfiguration/" \ - "NetworkInterfaces.plist" +def _load_from_plist(): + """ + Load a list of network-interfaces from the NetworkInterfaces preference + file (/Library/Preferences/SystemConfiguration/NetworkInterfaces.plist). + For each interface a tuple with it's name and hardware address will be + returned. The hardware address is a list of bytes. - pl = plistlib.Plist.fromFile(path) - for iff in pl.Interfaces: - name = iff ['BSD Name'] - hwaddr = [ord(c) for c in iff.IOMACAddress.data] + @return: list of tuples (ifname, hwaddr) + @rtype: list of tuples + """ - # FireWire devices seem to have longer hardware addresses which - # are theirfore not usable for UUID generation. - if len(hwaddr) == 6: - result.append ((name, hwaddr)) + result = [] + try: + import plistlib + path = "/Library/Preferences/SystemConfiguration/" \ + "NetworkInterfaces.plist" - except: - pass + pref = plistlib.Plist.fromFile(path) + for iff in pref.Interfaces: + name = iff ['BSD Name'] + hwaddr = [ord(char) for char in iff.IOMACAddress.data] - return result + # FireWire devices seem to have longer hardware addresses which + # are theirfore not usable for UUID generation. + if len(hwaddr) == 6: + result.append((name, hwaddr)) + except Exception: + result = [] - # ------------------------------------------------------------------------- - # Throw away all entries having an 'empty' hardware address - # ------------------------------------------------------------------------- + return result - def __strip_empty (self, nics): - return [item for item in nics if sum (item[1])] - - # ============================================================================= # This class implements an UUID generator # ============================================================================= @@ -326,8 +391,6 @@ also described in the RPC-RFC. """ - _NIC = NetworkInterfaces () - # ------------------------------------------------------------------------- # Constructor # ------------------------------------------------------------------------- @@ -336,7 +399,7 @@ self.version = version self.__lastTime = None - self.__clockSeq = int("%02x%02x" % tuple(getRandomBytes(2)), 16) + self.__clockSeq = int("%02x%02x" % tuple(get_random_bytes(2)), 16) self.__clockField = self.__clockSeq & 0x3FFF | 0x8000 self.__namespace = None @@ -346,7 +409,8 @@ self.__timeFormat = u"%016x%04x%s" self.__randFormat = u"%02x" * 16 self.__hashFormat = u"%08x%04x%04x" + "%02x" * 8 - self.__currNode = "%02x" * 6 % tuple(self._NIC.nics[0][1]) + self.__currNode = "%02x" * 6 % tuple(get_hardware_addresses()[0][1]) + self.__uuids_per_tick = 0 # ------------------------------------------------------------------------- @@ -378,10 +442,10 @@ return self.generate_random() elif vers == MD5: - return self.generate_MD5(name, namespace) + return self.generate_md5(name, namespace) elif vers == SHA1: - return self.generate_SHA1(name, namespace) + return self.generate_sha1(name, namespace) else: raise InvalidVersionError, version @@ -402,21 +466,21 @@ timeStamp = long(time.time() * 10000000) + 122192928000000000L if timeStamp != self.__lastTime: - self.__uuidsPerTick = 0 + self.__uuids_per_tick = 0 - # If the time has been set backward, we change the clock sequence - if timeStamp < self.__lastTime: - self.__clockSeq += 1 - if self.__clockSeq > 0xFFFF: - self.__clockSeq = 0 + # If the time has been set backward, we change the clock sequence + if timeStamp < self.__lastTime: + self.__clockSeq += 1 + if self.__clockSeq > 0xFFFF: + self.__clockSeq = 0 - self.__clockField = self.__clockSeq & 0x3FFF | 0x8000 + self.__clockField = self.__clockSeq & 0x3FFF | 0x8000 - self.__lastTime = timeStamp + self.__lastTime = timeStamp else: - self.__uuidsPerTick += 1 - timeStamp += self.__uuidsPerTick + self.__uuids_per_tick += 1 + timeStamp += self.__uuids_per_tick lock.release() @@ -437,7 +501,7 @@ @return: UUID (as 32 character unicode string) """ - data = getRandomBytes(16) + data = get_random_bytes(16) # Set the two most significant bits (bits 6, 7) of the # clock_seq_hi_and_reserved to zero and one data[8] = data[8] & 0x3F | 0x80 @@ -452,7 +516,7 @@ # Generate a name-based UUID using an MD5 hash # ------------------------------------------------------------------------- - def generate_MD5(self, name, namespace=None): + def generate_md5(self, name, namespace=None): """ Generate a name based UUID using a MD5 hash. @@ -476,7 +540,7 @@ # Generate an UUID using a SHA1 hash # ------------------------------------------------------------------------- - def generate_SHA1(self, name, namespace=None): + def generate_sha1(self, name, namespace=None): """ Generate a name based UUID using a SHA1 hash. @@ -500,30 +564,31 @@ # Set the namespace to be used for name based UUIDs # ------------------------------------------------------------------------- - def set_namespace (self, newNS): + def set_namespace(self, new_ns): """ Set the namespace used for name based UUID generation. """ - if isinstance(newNS, basestring): - if len (newNS) == 36: - newNS = "".join(newNS.split('-')) + if isinstance(new_ns, basestring): + if len(new_ns) == 36: + new_ns = "".join(new_ns.split('-')) - elif len (newNS) != 32: - raise "Invalid namespace argument" + elif len(new_ns) != 32: + raise InvalidNamespaceError(new_ns) - parts = [newNS[:8], newNS[8:12], newNS[12:16], newNS[16:]] + parts = [new_ns[:8], new_ns[8:12], new_ns[12:16], new_ns[16:]] timeLow = socket.htonl(long(parts[0], 16)) timeMid = socket.htons(int(parts[1], 16)) timeHi = socket.htons(int(parts[2], 16)) - rest = [int(parts[3][i:i+2], 16) for i in range(0, 16, 2)] + rest = [int(parts[3][inx:inx+2], 16) for inx in range(0, 16, 2)] self.__namespace = struct.pack('>LHH8B', timeLow, timeMid, timeHi, - *rest) + rest[0], rest[1], rest[2],rest[3],rest[4],rest[5], + rest[6], rest[7]) else: - raise InvalidNamespaceError(newNS) + raise InvalidNamespaceError(new_ns) # Create a ready-to-use instance of the UUID generator @@ -545,8 +610,8 @@ UUID.set_namespace('6ba7b810-9dad-11d1-80b4-00c04fd430c8') print "Namespace: '6ba7b810-9dad-11d1-80b4-00c04fd430c8'" print "Encoding : 'www.gnuenterprise.org'" - print "MD5 :", repr(UUID.generate_MD5('www.gnuenterprise.org')) - print "SHA1 :", repr(UUID.generate_SHA1('www.gnuenterprise.org')) + print "MD5 :", repr(UUID.generate_md5('www.gnuenterprise.org')) + print "SHA1 :", repr(UUID.generate_sha1('www.gnuenterprise.org')) print "T:", repr(UUID.generate(version = TIME)) print "R:", repr(UUID.generate(version = RANDOM)) _______________________________________________ commit-gnue mailing list [email protected] http://lists.gnu.org/mailman/listinfo/commit-gnue
