commit: e4162f49357e04722e8db20493e15538e657c3bd Author: Devan Franchini <twitch153 <AT> gentoo <DOT> org> AuthorDate: Mon Jul 13 18:36:51 2015 +0000 Commit: Devan Franchini <twitch153 <AT> gentoo <DOT> org> CommitDate: Mon Jul 13 18:36:54 2015 +0000 URL: https://gitweb.gentoo.org/proj/layman.git/commit/?id=e4162f49
overlay.py: Adds json support for overlay definitions Also reorganizes functions alphabetically. layman/overlays/overlay.py | 658 +++++++++++++++++++++++++++------------------ 1 file changed, 403 insertions(+), 255 deletions(-) diff --git a/layman/overlays/overlay.py b/layman/overlays/overlay.py index 423b91e..de0a035 100755 --- a/layman/overlays/overlay.py +++ b/layman/overlays/overlay.py @@ -56,7 +56,7 @@ WHITESPACE_REGEX = re.compile('\s+') class Overlay(object): ''' Derive the real implementations from this.''' - def __init__(self, config, xml=None, ovl_dict=None, + def __init__(self, config, json=None, ovl_dict=None, xml=None, ignore = 0): self.config = config self.output = config['output'] @@ -69,6 +69,59 @@ class Overlay(object): self.from_xml(xml, ignore) elif ovl_dict is not None: self.from_dict(ovl_dict, ignore) + elif json is not None: + self.from_json(ovl_dict, ignore) + + + def __eq__(self, other): + for i in ('descriptions', 'homepage', 'name', 'owner_email', + 'owner_name', 'priority', 'status'): + if getattr(self, i) != getattr(other, i): + return False + for i in self.sources + other.sources: + if not i in self.sources: + return False + if not i in other.sources: + return False + return True + + + def __ne__(self, other): + return not self.__eq__(other) + + + def add(self, base): + res = 1 + first_s = True + + self.sources = self.filter_protocols(self.sources) + if not self.sources: + msg = 'Overlay.add() error: overlay "%(name)s" does not support '\ + ' the given\nprotocol(s) %(protocol)s and cannot be '\ + 'installed.'\ + % {'name': self.name, + 'protocol': str(self.config['protocol_filter'])} + self.output.error(msg) + return 1 + + for s in self.sources: + if not first_s: + self.output.info('\nTrying next source of listed sources...', 4) + try: + res = s.add(base) + if res == 0: + # Worked, throw other sources away + self.sources = [s] + break + except Exception as error: + self.output.warn(str(error), 4) + first_s = False + return res + + + def delete(self, base): + assert len(self.sources) == 1 + return self.sources[0].delete(base) def filter_protocols(self, sources): @@ -93,6 +146,226 @@ class Overlay(object): return _sources + def from_dict(self, overlay, ignore): + ''' + Process an overlay dictionary definition + ''' + msg = 'Overlay from_dict(); overlay %(ovl)s' % {'ovl': str(overlay)} + self.output.debug(msg, 6) + + _name = overlay['name'] + if _name != None: + self.name = encode(_name) + else: + msg = 'Overlay from dict(), "%(name)s" is missing a "name" entry!'\ + % {'name': self.name} + raise Exception(msg) + + _sources = overlay['source'] + + if _sources == None: + msg = 'Overlay from_dict(), "%(name)s" is missing a "source"'\ + 'entry!' % {'name': self.name} + raise Exception(msg) + + def create_dict_overlay_source(source_): + _src, _type, _sub = source_ + self.ovl_type = _type + try: + _class = self.module_controller.get_class(_type) + except InvalidModuleName: + _class = self.module_controller.get_class('stub') + + _location = encode(_src) + if _sub: + self.branch = encode(_sub) + else: + self.branch = None + + return _class(parent=self, config=self.config, + _location=_location, ignore=ignore) + + self.sources = [create_dict_overlay_source(e) for e in _sources] + + if 'owner_name' in overlay: + _owner = overlay['owner_name'] + self.owner_name = encode(_owner) + else: + self.owner_name = None + + if 'owner_email' in overlay: + _email = overlay['owner_email'] + self.owner_email = encode(_email) + else: + self.owner_email = None + msg = 'Overlay from_dict(), "%(name)s" is missing an "owner.email"'\ + ' entry!' % {'name': self.name} + if not ignore: + raise Exception(msg) + elif ignore == 1: + self.output.warn(msg, 4) + + if 'description' in overlay: + self.descriptions = [] + _descs = overlay['description'] + for d in _descs: + d = WHITESPACE_REGEX.sub(' ', d) + self.descriptions.append(encode(d)) + else: + self.descriptions = [''] + if not ignore: + raise Exception('Overlay from_dict(), "' + self.name + + '" is missing a "description" entry!') + elif ignore == 1: + self.output.warn('Overlay from_dict(), "' + self.name + + '" is missing a "description" entry!', 4) + + if 'status' in overlay: + self.status = encode(overlay['status']) + else: + self.status = None + + self.quality = 'experimental' + if 'quality' in overlay: + if overlay['quality'] in set(QUALITY_LEVELS): + self.quality = encode(overlay['quality']) + + if 'priority' in overlay: + self.priority = int(overlay['priority']) + else: + self.priority = 50 + + if 'homepage' in overlay: + self.homepage = encode(overlay['homepage']) + else: + self.homepage = None + + if 'feed' in overlay: + self.feeds = [encode(e) \ + for e in overlay['feed']] + else: + self.feeds = None + + if 'irc' in overlay: + self.irc = encode(overlay['irc']) + else: + self.irc = None + + # end of from_dict + + + def from_json(self, json, ignore): + ''' + Process a json overlay definition + ''' + msg = 'Overlay from_json(); overlay %(ovl)s' % {'ovl': str(overlay)} + self.output.debug(msg, 6) + + _name = overlay['name'] + if _name != None: + self.name = encode(_name) + else: + msg = 'Overlay from_json(), "name" entry missing from json!' + raise Exception(msg) + + _sources = overlay['source'] + + if _sources == None: + msg = 'Overlay from_json(), "%(name)s" is missing a "source"'\ + 'entry!' % {'name': self.name} + raise Exception(msg) + + def create_json_overlay_source(source_): + _src = source_['#text'] + _type = source_['@type'] + if '@branch' in source_: + _sub = source_['@branch'] + else: + _sub = '' + + self.ovl_type = _type + + try: + _class = self.module_controller.get_class(_type) + except InvalidModuleName: + _class = self.module_controller.get_class('stub') + + _location = encode(_src) + if _sub: + self.branch = encode(_sub) + else: + self.branch = None + + return _class(parent=self, config=self.config, + _location=_location, ignore=ignore) + + self.sources = [create_json_overlay_source(e) for e in _sources] + + if 'name' in overlay['owner']: + self.owner_name = encode(overlay['owner']['name']) + else: + self.owner_name = None + + if 'email' in overlay['owner']: + self.owner_email = encode(overlay['owner']['email']) + else: + self.owner_email = None + msg = 'Overlay from_json(), "%(name)s" is missing an "owner.email"'\ + 'entry!' % {'name': self.name} + if not ignore: + raise Exception(msg) + else ignore == 1: + self.output.warn(msg, 4) + + if 'description' in overlay: + self.descriptions = [] + _descs = overlay['description'] + for d in _descs: + d = WHITESPACE_REGEX.sub(' ', d['#text']) + self.descriptions.append(encode(d)) + else: + self.descriptions = [''] + if not ignore: + raise Exception('Overlay from_json(), "' + self.name + + '" is missing a "description" entry!') + elif ignore == 1: + self.output.warn('Overlay from_json(), "' + self.name + + '" is missing a "description" entry!', 4) + + if '@status' in overlay: + self.status = encode(overlay['@status']) + else: + self.status = None + + self.quality = 'experimental' + if '@quality' in overlay: + if overlay['@quality'] in set(QUALITY_LEVELS): + self.quality = encode(overlay['@quality']) + + if '@priority' in overlay: + self.priority = int(overlay['@priority']) + else: + self.priority = 50 + + if 'homepage' in overlay: + self.homepage = encode(overlay['homepage']) + else: + self.homepage = None + + if 'feed' in overlay: + self.feeds = [encode(e) \ + for e in overlay['feed']] + else: + self.feeds = None + + if 'irc' in overlay: + self.irc = encode(overlay['irc']) + else: + self.irc = None + + # end of from_json() + + def from_xml(self, xml, ignore): ''' Process an xml overlay definition @@ -228,136 +501,164 @@ class Overlay(object): self.irc = None - def from_dict(self, overlay, ignore): + def get_infostr(self): ''' - Process an overlay dictionary definition + Gives more detailed string of overlay information. + + @rtype str: encoded overlay information. ''' - msg = 'Overlay from_dict(); overlay %(ovl)s' % {'ovl': str(overlay)} - self.output.debug(msg, 6) - _name = overlay['name'] - if _name != None: - self.name = encode(_name) - else: - msg = 'Overlay from dict(), "%(name)s" is missing a "name" entry!'\ - % {'name': self.name} - raise Exception(msg) + result = '' - _sources = overlay['source'] + result += self.name + '\n' + (len(self.name) * '~') - if _sources == None: - msg = 'Overlay from_dict(), "%(name)s" is missing a "source"'\ - 'entry!' % {'name': self.name} - raise Exception(msg) + if len(self.sources) == 1: + result += '\nSource : ' + self.sources[0].src + else: + result += '\nSources:' + for i, v in enumerate(self.sources): + result += '\n %d. %s' % (i + 1, v.src) + result += '\n' - def create_dict_overlay_source(source_): - _src, _type, _sub = source_ - self.ovl_type = _type - try: - _class = self.module_controller.get_class(_type) - except InvalidModuleName: - _class = self.module_controller.get_class('stub') + if self.owner_name != None: + result += '\nContact : %s <%s>' \ + % (self.owner_name, self.owner_email) + else: + result += '\nContact : ' + self.owner_email + if len(self.sources) == 1: + result += '\nType : ' + self.sources[0].type + else: + result += '\nType : ' + '/'.join( + sorted(set(e.type for e in self.sources))) + result += '; Priority: ' + str(self.priority) + '\n' + result += 'Quality : ' + self.quality + '\n' - _location = encode(_src) - if _sub: - self.branch = encode(_sub) - else: - self.branch = None - return _class(parent=self, config=self.config, - _location=_location, ignore=ignore) + for description in self.descriptions: + description = re.compile(' +').sub(' ', description) + description = re.compile('\n ').sub('\n', description) + result += '\nDescription:' + result += '\n '.join(('\n' + description).split('\n')) + result += '\n' - self.sources = [create_dict_overlay_source(e) for e in _sources] + if self.homepage != None: + link = self.homepage + link = re.compile(' +').sub(' ', link) + link = re.compile('\n ').sub('\n', link) + result += '\nLink:' + result += '\n '.join(('\n' + link).split('\n')) + result += '\n' - if 'owner_name' in overlay: - _owner = overlay['owner_name'] - self.owner_name = encode(_owner) - else: - self.owner_name = None + if self.irc != None: + result += '\nIRC : ' + self.irc + '\n' - if 'owner_email' in overlay: - _email = overlay['owner_email'] - self.owner_email = encode(_email) - else: - self.owner_email = None - msg = 'Overlay from_dict(), "%(name)s" is missing an "owner.email"'\ - ' entry!' % {'name': self.name} - if not ignore: - raise Exception(msg) - elif ignore == 1: - self.output.warn(msg, 4) + if len(self.feeds): + result += '\n%s:' % ((len(self.feeds) == 1) and "Feed" or "Feeds") + for i in self.feeds: + result += '\n %s' % i + result += '\n' - if 'description' in overlay: - self.descriptions = [] - _descs = overlay['description'] - for d in _descs: - d = WHITESPACE_REGEX.sub(' ', d) - self.descriptions.append(encode(d)) - else: - self.descriptions = [''] - if not ignore: - raise Exception('Overlay from_dict(), "' + self.name + - '" is missing a "description" entry!') - elif ignore == 1: - self.output.warn('Overlay from_dict(), "' + self.name + - '" is missing a "description" entry!', 4) + return encoder(result, self._encoding_) - if 'status' in overlay: - self.status = encode(overlay['status']) - else: - self.status = None - self.quality = 'experimental' - if 'quality' in overlay: - if overlay['quality'] in set(QUALITY_LEVELS): - self.quality = encode(overlay['quality']) + def is_supported(self): + return any(e.is_supported() for e in self.sources) - if 'priority' in overlay: - self.priority = int(overlay['priority']) - else: - self.priority = 50 - if 'homepage' in overlay: - self.homepage = encode(overlay['homepage']) - else: - self.homepage = None + def set_priority(self, priority): + ''' + Set the priority of this overlay. + ''' + self.priority = int(priority) - if 'feed' in overlay: - self.feeds = [encode(e) \ - for e in overlay['feed']] + + def short_list(self, width = 0): + ''' + Return a shortened list of overlay information. + + @params width: int specifying terminal width. + @rtype str: string of overlay information. + ''' + if len(self.name) > 25: + name = self.name + " ###\n" + name += pad(" ", 25) else: - self.feeds = None + name = pad(self.name, 25) - if 'irc' in overlay: - self.irc = encode(overlay['irc']) + if len(set(e.type for e in self.sources)) == 1: + _type = self.sources[0].type else: - self.irc = None + _type = '%s/..' % self.sources[0].type - # end of from_dict + mtype = ' [' + pad(_type, 10) + ']' + if not width: + width = terminal_width()-1 + srclen = width - 43 + source = ', '.join(self.source_uris()) + if len(source) > srclen: + source = source.replace("overlays.gentoo.org", "o.g.o") + source = ' (' + pad(source, srclen) + ')' + return encoder(name + mtype + source, self._encoding_) - def __eq__(self, other): - for i in ('descriptions', 'homepage', 'name', 'owner_email', - 'owner_name', 'priority', 'status'): - if getattr(self, i) != getattr(other, i): - return False - for i in self.sources + other.sources: - if not i in self.sources: - return False - if not i in other.sources: - return False - return True + def source_types(self): + for i in self.sources: + yield i.type def is_official(self): + ''' + Is the overlay official? + ''' + return self.status == 'official' - def __ne__(self, other): - return not self.__eq__(other) + def source_uris(self): + for i in self.sources: + yield i.src - def set_priority(self, priority): + + def sync(self, base): + msg = 'Overlay.sync(); name = %(name)s' % {'name': self.name} + self.output.debug(msg, 4) + + assert len(self.sources) == 1 + return self.sources[0].sync(base) + + + def to_json(self): ''' - Set the priority of this overlay. + Convert to json. ''' - self.priority = int(priority) + repo = {} + + repo['@priority'] = str(self.priority) + repo['@quality'] = self.quality + if self.status != None: + repo['@status'] = self.status + repo['name'] = self.name + repo['description'] = [] + for i in self.descriptions: + repo['description'].append(i) + if self.homepage != None: + repo['homepage'] = self.homepage + if self.irc != None: + repo['irc'] = self.irc + repo['owner'] = {} + repo['owner']['email'] = self.owner_email + if self.owner_name != None: + repo['owner']['name'] = self.owner_name + repo['source'] = [] + for i in self.sources: + source = {'@type': i.__class__.type_key} + if i.branch: + source['@branch'] = i.branch + source['#text'] = i.src + repo['source'].append(source) + if self.feeds != None: + repo['feed'] = [] + for feed in self.feeds: + repo['feed'].append(feed) + + return repo def to_xml(self): @@ -415,35 +716,6 @@ class Overlay(object): return repo - def add(self, base): - res = 1 - first_s = True - - self.sources = self.filter_protocols(self.sources) - if not self.sources: - msg = 'Overlay.add() error: overlay "%(name)s" does not support '\ - ' the given\nprotocol(s) %(protocol)s and cannot be '\ - 'installed.'\ - % {'name': self.name, - 'protocol': str(self.config['protocol_filter'])} - self.output.error(msg) - return 1 - - for s in self.sources: - if not first_s: - self.output.info('\nTrying next source of listed sources...', 4) - try: - res = s.add(base) - if res == 0: - # Worked, throw other sources away - self.sources = [s] - break - except Exception as error: - self.output.warn(str(error), 4) - first_s = False - return res - - def update(self, base, available_srcs): res = 1 first_src = True @@ -488,127 +760,3 @@ class Overlay(object): self.sources[0].src = available_srcs.pop() result = True return (self.sources, result) - - - def sync(self, base): - msg = 'Overlay.sync(); name = %(name)s' % {'name': self.name} - self.output.debug(msg, 4) - - assert len(self.sources) == 1 - return self.sources[0].sync(base) - - - def delete(self, base): - assert len(self.sources) == 1 - return self.sources[0].delete(base) - - - def get_infostr(self): - ''' - Gives more detailed string of overlay information. - - @rtype str: encoded overlay information. - ''' - - result = '' - - result += self.name + '\n' + (len(self.name) * '~') - - if len(self.sources) == 1: - result += '\nSource : ' + self.sources[0].src - else: - result += '\nSources:' - for i, v in enumerate(self.sources): - result += '\n %d. %s' % (i + 1, v.src) - result += '\n' - - if self.owner_name != None: - result += '\nContact : %s <%s>' \ - % (self.owner_name, self.owner_email) - else: - result += '\nContact : ' + self.owner_email - if len(self.sources) == 1: - result += '\nType : ' + self.sources[0].type - else: - result += '\nType : ' + '/'.join( - sorted(set(e.type for e in self.sources))) - result += '; Priority: ' + str(self.priority) + '\n' - result += 'Quality : ' + self.quality + '\n' - - - for description in self.descriptions: - description = re.compile(' +').sub(' ', description) - description = re.compile('\n ').sub('\n', description) - result += '\nDescription:' - result += '\n '.join(('\n' + description).split('\n')) - result += '\n' - - if self.homepage != None: - link = self.homepage - link = re.compile(' +').sub(' ', link) - link = re.compile('\n ').sub('\n', link) - result += '\nLink:' - result += '\n '.join(('\n' + link).split('\n')) - result += '\n' - - if self.irc != None: - result += '\nIRC : ' + self.irc + '\n' - - if len(self.feeds): - result += '\n%s:' % ((len(self.feeds) == 1) and "Feed" or "Feeds") - for i in self.feeds: - result += '\n %s' % i - result += '\n' - - return encoder(result, self._encoding_) - - - def short_list(self, width = 0): - ''' - Return a shortened list of overlay information. - - @params width: int specifying terminal width. - @rtype str: string of overlay information. - ''' - if len(self.name) > 25: - name = self.name + " ###\n" - name += pad(" ", 25) - else: - name = pad(self.name, 25) - - if len(set(e.type for e in self.sources)) == 1: - _type = self.sources[0].type - else: - _type = '%s/..' % self.sources[0].type - - mtype = ' [' + pad(_type, 10) + ']' - if not width: - width = terminal_width()-1 - srclen = width - 43 - source = ', '.join(self.source_uris()) - if len(source) > srclen: - source = source.replace("overlays.gentoo.org", "o.g.o") - source = ' (' + pad(source, srclen) + ')' - - return encoder(name + mtype + source, self._encoding_) - - - def is_official(self): - ''' - Is the overlay official? - ''' - return self.status == 'official' - - - def is_supported(self): - return any(e.is_supported() for e in self.sources) - - - def source_uris(self): - for i in self.sources: - yield i.src - - - def source_types(self): - for i in self.sources: - yield i.type