These changes make the hub use generators for some potentially large queries internally. The goal is to reduce memory usage.
When a large query result is returned via rpc, the marshaller has been extended to handle generators. The marshaller will still create a giant pile of xml, but at least we no longer hold the original result data in memory at the same time. I also have some work on an iterating marshaller that will not keep the entire xml return in memory, but that's a bit more complicated, so I'm going to leave that for another day. There are other cases (e.g. repo_init) where we run very large queries internally; there, this patch set results in significant memory savings.
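As a rough sketch of the memory claim (assuming the generator-aware Marshaller from the first patch below is installed; the data here is made up): with a plain list return, the rows and the marshalled xml are both alive at peak, while with a generator only the xml string accumulates.

    import xmlrpclib

    def rows():
        # stand-in for a QueryProcessor.iterate() generator: rows are
        # produced one at a time instead of accumulated in a list
        for i in xrange(100000):
            yield {'id': i, 'name': 'pkg-%d' % i}

    # eager: the full row list and the xml string coexist at peak
    xml_eager = xmlrpclib.dumps((list(rows()),))

    # lazy: with the generator-aware Marshaller the row list never exists
    xml_lazy = xmlrpclib.dumps((rows(),))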
>From 5436e9a87e48ed7f0ab95ff040b1961dd6f9fd04 Mon Sep 17 00:00:00 2001 From: Mike McLean <[email protected]> Date: Sat, 27 Aug 2011 16:32:36 -0400 Subject: [PATCH 01/11] Use iterators for potentially large queries to reduce memory use --- hub/kojihub.py | 121 ++++++++++++++++++++++++++++++++++++------------------ hub/kojixmlrpc.py | 17 ++++++++ 2 files changed, 97 insertions(+), 41 deletions(-) diff --git a/hub/kojihub.py b/hub/kojihub.py index e75c2e9..a2678c7 100644 --- a/hub/kojihub.py +++ b/hub/kojihub.py @@ -1115,7 +1115,7 @@ def list_tags(build=None, package=None, queryOpts=None): query = QueryProcessor(columns=fields, aliases=aliases, tables=tables, joins=joins, clauses=clauses, values=locals(), opts=queryOpts) - return query.execute() + return query.iterate() def readTaggedBuilds(tag,event=None,inherit=False,latest=False,package=None,owner=None,type=None): """Returns a list of builds for specified tag @@ -1244,32 +1244,31 @@ def readTaggedRPMS(tag, package=None, arch=None, event=None,inherit=False,latest ('rpminfo.buildtime', 'buildtime'), ('rpminfo.buildroot_id', 'buildroot_id'), ('rpminfo.build_id', 'build_id')] - if rpmsigs: - fields.append(('rpmsigs.sigkey', 'sigkey')) - q="""SELECT %s FROM rpminfo - JOIN tag_listing ON rpminfo.build_id = tag_listing.build_id - """ % ', '.join([pair[0] for pair in fields]) + fields, aliases = zip(*fields) + tables = ['rpminfo'] + joins = ['tag_listing ON rpminfo.build_id = tag_listing.build_id'] + clauses = [eventCondition(event), 'tag_id=%(tagid)s'] + data = {} #tagid added later if package: - q += """JOIN build ON rpminfo.build_id = build.id - JOIN package ON package.id = build.pkg_id - """ + joins.append('build ON rpminfo.build_id = build.id') + joins.append('package ON package.id = build.pkg_id') + clauses.append('package.name = %(package)s') + data['package'] = package if rpmsigs: - q += """LEFT OUTER JOIN rpmsigs on rpminfo.id = rpmsigs.rpm_id - """ - q += """WHERE %s AND tag_id=%%(tagid)s - """ % eventCondition(event) - if package: - q += """AND package.name = %(package)s - """ + fields.append(('rpmsigs.sigkey', 'sigkey')) + joins.append('LEFT OUTER JOIN rpmsigs on rpminfo.id = rpmsigs.rpm_id') if arch: + data['arch'] = arch if isinstance(arch, basestring): - q += """AND rpminfo.arch = %(arch)s - """ + clauses.append('rpminfo.arch = %(arch)s') elif isinstance(arch, (list, tuple)): - q += """AND rpminfo.arch IN %(arch)s\n""" + clauses.append('rpminfo.arch IN %(arch)s') else: raise koji.GenericError, 'invalid arch option: %s' % arch + query = QueryProcessor(tables=tables, joins=joins, clauses=clauses, + columns=fields, aliases=aliases, values=data) + # unique constraints ensure that each of these queries will not report # duplicate rpminfo entries, BUT since we make the query multiple times, # we can get duplicates if a package is multiply tagged. @@ -1285,7 +1284,8 @@ def readTaggedRPMS(tag, package=None, arch=None, event=None,inherit=False,latest continue else: tags_seen[tagid] = 1 - for rpminfo in _multiRow(q, locals(), [pair[1] for pair in fields]): + query.values['tagid'] = tagid + for rpminfo in query.execute(): #note: we're checking against the build list because # it has been filtered by the package list. The tag # tools should endeavor to keep tag_listing sane w.r.t. 
@@ -2452,7 +2452,7 @@ def tag_changed_since_event(event,taglist): clauses = ['update_event > %(event)i', 'tag_id IN %(taglist)s'] query = QueryProcessor(tables=['tag_updates'], columns=['id'], clauses=clauses, values=data, - opts={'asList': True}) + opts={'limit': 1}) if query.execute(): return True #also check these versioned tables @@ -2470,7 +2470,7 @@ def tag_changed_since_event(event,taglist): 'tag_id IN %(taglist)s'] for table in tables: query = QueryProcessor(tables=[table], columns=['tag_id'], clauses=clauses, - values=data, opts={'asList': True}) + values=data, opts={'limit': 1}) if query.execute(): return True return False @@ -5545,7 +5545,7 @@ def query_history(tables=None, **kwargs): fields, aliases = zip(*fields.items()) query = QueryProcessor(columns=fields, aliases=aliases, tables=[table], joins=joins, clauses=clauses, values=data) - ret[table] = query.execute() + ret[table] = query.iterate() return ret @@ -5598,7 +5598,7 @@ def tag_history(build=None, tag=None, package=None, active=None, queryOpts=None) query = QueryProcessor(columns=fields, aliases=aliases, tables=tables, joins=joins, clauses=clauses, values=locals(), opts=queryOpts) - return query.execute() + return query.iterate() def untagged_builds(name=None, queryOpts=None): """Returns the list of untagged builds""" @@ -5624,7 +5624,7 @@ def untagged_builds(name=None, queryOpts=None): query = QueryProcessor(columns=fields, aliases=aliases, tables=tables, joins=joins, clauses=clauses, values=locals(), opts=queryOpts) - return query.execute() + return query.iterate() def build_map(): """Map which builds were used in the buildroots of other builds @@ -6306,6 +6306,7 @@ class QueryProcessor(object): self.tables = tables self.joins = joins self.clauses = clauses + self.cursors = 0 if values: self.values = values else: @@ -6428,6 +6429,31 @@ SELECT %(col_str)s else: return _multiRow(query, self.values, (self.aliases or self.columns)) + + def iterate(self): + if self.opts.get('countOnly'): + return self.execute() + else: + return self._iterate(str(self), self.values.copy(), self.opts.get('asList')) + + def _iterate(self, query, values, as_list=False): + cname = "qp_cursor_%s_%i" % (id(self), self.cursors) + self.cursors += 1 + query = "DECLARE %s NO SCROLL CURSOR FOR %s" % (cname, self) + c = context.cnx.cursor() + c.execute(query, self.values) + c.close() + query = "FETCH 1000 FROM %s" % cname + while True: + if as_list: + buf = _fetchMulti(query, {}) + else: + buf = _multiRow(query, {}, (self.aliases or self.columns)) + if not buf: + return + for row in buf: + yield row + def executeOne(self): results = self.execute() if isinstance(results, list): @@ -8090,7 +8116,7 @@ class RootExports(object): tables=tables, joins=joins, clauses=clauses, values=locals(), opts=queryOpts) - return query.execute() + return query.iterate() def getLatestBuilds(self,tag,event=None,package=None,type=None): """List latest builds for tag (inheritance enabled)""" @@ -8763,27 +8789,40 @@ class RootExports(object): query = QueryProcessor(columns=fields, aliases=aliases, tables=tables, joins=joins, clauses=conditions, values=opts, opts=queryOpts) - tasks = query.execute() + tasks = query.iterate() if queryOpts and (queryOpts.get('countOnly') or queryOpts.get('asList')): # Either of the above options makes us unable to easily the decode # the xmlrpc data return tasks - if opts.get('decode'): - for task in tasks: - # decode xmlrpc data - for f in ('request','result'): - if task[f]: - try: - if task[f].find('<?xml', 0, 10) == -1: - #handle older base64 
encoded data - task[f] = base64.decodestring(task[f]) - data, method = xmlrpclib.loads(task[f]) - except xmlrpclib.Fault, fault: - data = fault - task[f] = data + if opts.get('decode') and not queryOpts.get('countOnly'): + if queryOpts.get('asList'): + keys = [] + for n, f in aliases: + if f in ('request','result'): + keys.append(n) + else: + keys = ('request','result') + tasks = self._decode_tasks(tasks, keys) + return tasks + def _decode_tasks(self, tasks, keys): + for task in tasks: + # decode xmlrpc data + for f in keys: + val = task[f] + if val: + try: + if val.find('<?xml', 0, 10) == -1: + #handle older base64 encoded data + val = base64.decodestring(val) + data, method = xmlrpclib.loads(val) + except xmlrpclib.Fault, fault: + data = fault + task[f] = data + yield task + def taskReport(self, owner=None): """Return data on active or recent tasks""" fields = ( @@ -9263,7 +9302,7 @@ class RootExports(object): aliases=aliases, tables=(table,), joins=joins, clauses=(clause,), values=locals(), opts=queryOpts) - return query.execute() + return query.iterate() class BuildRoot(object): diff --git a/hub/kojixmlrpc.py b/hub/kojixmlrpc.py index 8b67365..26204c0 100644 --- a/hub/kojixmlrpc.py +++ b/hub/kojixmlrpc.py @@ -28,6 +28,7 @@ import traceback import types import pprint import resource +import xmlrpclib from xmlrpclib import getparser,dumps,Fault from koji.server import WSGIWrapper @@ -40,6 +41,22 @@ import koji.util from koji.context import context +# Workaround to allow xmlrpclib deal with iterators +class Marshaller(xmlrpclib.Marshaller): + + dispatch = xmlrpclib.Marshaller.dispatch.copy() + + def dump_generator(self, value, write): + dump = self.__dump + write("<value><array><data>\n") + for v in value: + dump(v, write) + write("</data></array></value>\n") + dispatch[types.GeneratorType] = dump_generator + +xmlrpclib.Marshaller = Marshaller + + class HandlerRegistry(object): """Track handlers for RPC calls""" -- 1.8.3.1
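The heart of this first patch is QueryProcessor._iterate, which uses a PostgreSQL server-side cursor (DECLARE ... NO SCROLL CURSOR plus repeated FETCHes) so rows stay in the database until requested. A standalone sketch of the same pattern, assuming a psycopg2 connection in place of the hub's context.cnx (the patch leaves the cursor to be reclaimed at transaction end; this sketch closes it explicitly):

    import psycopg2

    def iterate_query(cnx, sql, params=None, name='qp_cursor_demo', chunksize=1000):
        # server-side cursor: rows stay in postgres until each FETCH
        cur = cnx.cursor()
        cur.execute("DECLARE %s NO SCROLL CURSOR FOR %s" % (name, sql), params)
        try:
            while True:
                cur.execute("FETCH %d FROM %s" % (chunksize, name))
                rows = cur.fetchall()
                if not rows:
                    return
                for row in rows:
                    yield row
        finally:
            cur.execute("CLOSE %s" % name)
            cur.close()

    cnx = psycopg2.connect('dbname=koji')   # hypothetical DSN
    for row in iterate_query(cnx, 'SELECT id FROM rpminfo'):
        pass

The kojixmlrpc half works because xmlrpclib.dumps instantiates the module-global Marshaller at call time, so rebinding xmlrpclib.Marshaller is enough to make every dump site generator-aware.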
>From ac1152361ff3d2d88b888019afa9d5f1dd2bde6e Mon Sep 17 00:00:00 2001 From: Mike McLean <[email protected]> Date: Sat, 27 Aug 2011 21:33:46 -0400 Subject: [PATCH 02/11] use iteration in readTaggedRPMS --- hub/kojihub.py | 49 +++++++++++++++++++++++++------------------------ 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/hub/kojihub.py b/hub/kojihub.py index a2678c7..39c316a 100644 --- a/hub/kojihub.py +++ b/hub/kojihub.py @@ -1274,31 +1274,32 @@ def readTaggedRPMS(tag, package=None, arch=None, event=None,inherit=False,latest # we can get duplicates if a package is multiply tagged. rpms = [] tags_seen = {} - for tagid in taglist: - if tags_seen.has_key(tagid): - #certain inheritance trees can (legitimately) have the same tag - #appear more than once (perhaps once with a package filter and once - #without). The hard part of that was already done by readTaggedBuilds. - #We only need consider each tag once. Note how we use build_idx below. - #(Without this, we could report the same rpm twice) - continue - else: - tags_seen[tagid] = 1 - query.values['tagid'] = tagid - for rpminfo in query.execute(): - #note: we're checking against the build list because - # it has been filtered by the package list. The tag - # tools should endeavor to keep tag_listing sane w.r.t. - # the package list, but if there is disagreement the package - # list should take priority - build = build_idx.get(rpminfo['build_id'],None) - if build is None: - continue - elif build['tag_id'] != tagid: - #wrong tag + def _iter_rpms(): + for tagid in taglist: + if tags_seen.has_key(tagid): + #certain inheritance trees can (legitimately) have the same tag + #appear more than once (perhaps once with a package filter and once + #without). The hard part of that was already done by readTaggedBuilds. + #We only need consider each tag once. Note how we use build_idx below. + #(Without this, we could report the same rpm twice) continue - rpms.append(rpminfo) - return [rpms,builds] + else: + tags_seen[tagid] = 1 + query.values['tagid'] = tagid + for rpminfo in query.iterate(): + #note: we're checking against the build list because + # it has been filtered by the package list. The tag + # tools should endeavor to keep tag_listing sane w.r.t. + # the package list, but if there is disagreement the package + # list should take priority + build = build_idx.get(rpminfo['build_id'],None) + if build is None: + continue + elif build['tag_id'] != tagid: + #wrong tag + continue + yield rpminfo + return [_iter_rpms(), builds] def readTaggedArchives(tag, package=None, event=None, inherit=False, latest=True, type=None): """Returns a list of archives for specified tag -- 1.8.3.1
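One caller-facing consequence (a usage note, not part of the patch): readTaggedRPMS now returns a generator in its first slot, so any caller that needs len() or a second pass has to materialize it explicitly:

    rpms, builds = readTaggedRPMS(tag_id, event=event_id, inherit=True, latest=True)
    for rpminfo in rpms:        # fine: a single streaming pass
        pass
    # the generator is now exhausted; code needing random access would do
    #   rpm_list = list(rpms)   # before iterating, at the old memory cost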
>From 5f6c664e0bb42e495ecd1709c255597034d179fe Mon Sep 17 00:00:00 2001 From: Mike McLean <[email protected]> Date: Sat, 27 Aug 2011 23:50:39 -0400 Subject: [PATCH 03/11] rework repo_init for iterator efficiency --- hub/kojihub.py | 111 ++++++++++++++++++++++----------------------------------- 1 file changed, 43 insertions(+), 68 deletions(-) diff --git a/hub/kojihub.py b/hub/kojihub.py index 39c316a..389272d 100644 --- a/hub/kojihub.py +++ b/hub/kojihub.py @@ -2172,7 +2172,10 @@ def repo_init(tag, with_src=False, with_debuginfo=False, event=None): repo_arches = {} if tinfo['arches']: for arch in tinfo['arches'].split(): - repo_arches[koji.canonArch(arch)] = 1 + arch = koji.canonArch(arch) + if arch in ['src','noarch']: + continue + repo_arches[arch] = 1 repo_id = _singleValue("SELECT nextval('repo_id_seq')") if event is None: event_id = _singleValue("SELECT get_event()") @@ -2181,9 +2184,9 @@ def repo_init(tag, with_src=False, with_debuginfo=False, event=None): q = "SELECT time FROM events WHERE id=%(event)s" event_time = _singleValue(q, locals(), strict=True) event_id = event - q = """INSERT INTO repo(id, create_event, tag_id, state) - VALUES(%(repo_id)s, %(event_id)s, %(tag_id)s, %(state)s)""" - _dml(q,locals()) + insert = InsertProcessor('repo') + insert.set(id=repo_id, create_event=event_id, tag_id=tag_id, state=state) + insert.execute() # Need to pass event_id because even though this is a single transaction, # it is possible to see the results of other committed transactions rpms, builds = readTaggedRPMS(tag_id, event=event_id, inherit=True, latest=True) @@ -2192,30 +2195,7 @@ def repo_init(tag, with_src=False, with_debuginfo=False, event=None): if pkg['blocked']] repodir = koji.pathinfo.repo(repo_id, tinfo['name']) os.makedirs(repodir) #should not already exist - #index builds - builds = dict([[build['build_id'],build] for build in builds]) - #index the packages by arch - packages = {} - for repoarch in repo_arches: - packages.setdefault(repoarch, []) - relpathinfo = koji.PathInfo(topdir='toplink') - for rpminfo in rpms: - if not with_debuginfo and koji.is_debuginfo(rpminfo['name']): - continue - arch = rpminfo['arch'] - repoarch = koji.canonArch(arch) - if arch == 'src': - if not with_src: - continue - elif arch == 'noarch': - pass - elif repoarch not in repo_arches: - # Do not create a repo for arches not in the arch list for this tag - continue - build = builds[rpminfo['build_id']] - rpminfo['relpath'] = "%s/%s" % (relpathinfo.build(build), relpathinfo.rpm(rpminfo)) - rpminfo['relpath'] = rpminfo['relpath'].lstrip('/') - packages.setdefault(repoarch,[]).append(rpminfo) + #generate comps and groups.spec groupsdir = "%s/groups" % (repodir) koji.ensuredir(groupsdir) @@ -2224,57 +2204,52 @@ def repo_init(tag, with_src=False, with_debuginfo=False, event=None): fo.write(comps) fo.close() + #get build dirs + relpathinfo = koji.PathInfo(topdir='toplink') + builddirs = {} + for build in builds: + relpath = relpathinfo.build(build) + builddirs[build['id']] = relpath.lstrip('/') #generate pkglist files - for arch in packages.iterkeys(): - if arch in ['src','noarch']: - continue - # src and noarch special-cased -- see below - archdir = os.path.join(repodir, arch) + pkglist = {} + for repoarch in repo_arches: + archdir = os.path.join(repodir, repoarch) koji.ensuredir(archdir) # Make a symlink to our topdir top_relpath = koji.util.relpath(koji.pathinfo.topdir, archdir) top_link = os.path.join(archdir, 'toplink') os.symlink(top_relpath, top_link) - pkglist = file(os.path.join(repodir, arch, 
'pkglist'), 'w') - logger.info("Creating package list for %s" % arch) - for rpminfo in packages[arch]: - pkglist.write(rpminfo['relpath'] + '\n') - #noarch packages - for rpminfo in packages.get('noarch',[]): - pkglist.write(rpminfo['relpath'] + '\n') - # srpms - if with_src: - srpmdir = "%s/%s" % (repodir,'src') - koji.ensuredir(srpmdir) - for rpminfo in packages.get('src',[]): - pkglist.write(rpminfo['relpath'] + '\n') - pkglist.close() - #write list of blocked packages - blocklist = file(os.path.join(repodir, arch, 'blocklist'), 'w') - logger.info("Creating blocked list for %s" % arch) + pkglist[repoarch] = file(os.path.join(archdir, 'pkglist'), 'w') + #NOTE - rpms is now an iterator + for rpminfo in rpms: + if not with_debuginfo and koji.is_debuginfo(rpminfo['name']): + continue + relpath = "%s/%s\n" % (builddirs[rpminfo['build_id']], relpathinfo.rpm(rpminfo)) + arch = rpminfo['arch'] + if arch == 'src': + if with_src: + for repoarch in repo_arches: + pkglist[repoarch].write(relpath) + elif arch == 'noarch': + for repoarch in repo_arches: + pkglist[repoarch].write(relpath) + else: + repoarch = koji.canonArch(arch) + if repoarch not in repo_arches: + # Do not create a repo for arches not in the arch list for this tag + continue + pkglist[repoarch].write(relpath) + for repoarch in repo_arches: + pkglist[repoarch].close() + + #write blocked package lists + for repoarch in repo_arches: + blocklist = file(os.path.join(repodir, repoarch, 'blocklist'), 'w') for pkg in blocks: blocklist.write(pkg['package_name']) blocklist.write('\n') blocklist.close() - # if using an external repo, make sure we've created a directory and pkglist for - # every arch in the taglist, or any packages of that arch in the external repo - # won't be processed - if get_external_repo_list(tinfo['id'], event=event_id): - for arch in repo_arches: - pkglist = os.path.join(repodir, arch, 'pkglist') - if not os.path.exists(pkglist): - logger.info("Creating missing package list for %s" % arch) - koji.ensuredir(os.path.dirname(pkglist)) - pkglist_fo = file(pkglist, 'w') - pkglist_fo.close() - blocklist = file(os.path.join(repodir, arch, 'blocklist'), 'w') - logger.info("Creating missing blocked list for %s" % arch) - for pkg in blocks: - blocklist.write(pkg['package_name']) - blocklist.write('\n') - blocklist.close() - if context.opts.get('EnableMaven') and tinfo['maven_support']: artifact_dirs = {} dir_links = set() -- 1.8.3.1
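The rewritten repo_init streams the rpm list exactly once and fans each entry out to the per-arch pkglist files, rather than first bucketing everything into a packages dict. A simplified self-contained sketch of that fan-out pattern (sample data and paths are made up; the src and canonArch handling is omitted):

    import os
    import tempfile

    arches = ['i386', 'x86_64']
    repodir = tempfile.mkdtemp()

    def rpms():
        # stand-in for the readTaggedRPMS generator
        yield {'relpath': 'foo/1.0/noarch/foo-1.0.noarch.rpm', 'arch': 'noarch'}
        yield {'relpath': 'bar/2.0/x86_64/bar-2.0.x86_64.rpm', 'arch': 'x86_64'}

    pkglist = {}
    for arch in arches:
        os.mkdir(os.path.join(repodir, arch))
        pkglist[arch] = open(os.path.join(repodir, arch, 'pkglist'), 'w')

    for rpminfo in rpms():          # single pass over the generator
        if rpminfo['arch'] == 'noarch':
            targets = arches        # noarch rpms land in every per-arch list
        else:
            targets = [rpminfo['arch']]
        for arch in targets:
            pkglist[arch].write(rpminfo['relpath'] + '\n')

    for fo in pkglist.values():
        fo.close()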
>From 293e777a09e053a23bfe8ff606a981b4fd026904 Mon Sep 17 00:00:00 2001 From: Mike McLean <[email protected]> Date: Tue, 20 Sep 2011 12:50:07 -0400 Subject: [PATCH 04/11] QueryProcessor iteration refinements skip cursor if limit < chunksize avoid possibly volatile parameters during iteration --- hub/kojihub.py | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/hub/kojihub.py b/hub/kojihub.py index 389272d..2fb8a31 100644 --- a/hub/kojihub.py +++ b/hub/kojihub.py @@ -6269,6 +6269,9 @@ class QueryProcessor(object): column values in query order, rather than the usual list of maps rowlock: if True, use "FOR UPDATE" to lock the queried rows """ + + iterchunksize = 1000 + def __init__(self, columns=None, aliases=None, tables=None, joins=None, clauses=None, values=None, opts=None): self.columns = columns @@ -6409,22 +6412,30 @@ SELECT %(col_str)s def iterate(self): if self.opts.get('countOnly'): return self.execute() + elif self.opts.get('limit') and self.opts['limit'] < self.iterchunksize: + return self.execute() else: - return self._iterate(str(self), self.values.copy(), self.opts.get('asList')) - - def _iterate(self, query, values, as_list=False): + fields = self.aliases or self.columns + fields = list(fields) + return self._iterate(str(self), self.values.copy(), fields, + self.iterchunksize, self.opts.get('asList')) + + def _iterate(self, query, values, fields, chunksize, as_list=False): + # We pass all this data into the generator so that the iterator works + # from the snapshot when it was generated. Otherwise reuse of the processor + # for similar queries could have unpredictable results. cname = "qp_cursor_%s_%i" % (id(self), self.cursors) self.cursors += 1 - query = "DECLARE %s NO SCROLL CURSOR FOR %s" % (cname, self) + query = "DECLARE %s NO SCROLL CURSOR FOR %s" % (cname, query) c = context.cnx.cursor() - c.execute(query, self.values) + c.execute(query, values) c.close() - query = "FETCH 1000 FROM %s" % cname + query = "FETCH %i FROM %s" % (chunksize, cname) while True: if as_list: buf = _fetchMulti(query, {}) else: - buf = _multiRow(query, {}, (self.aliases or self.columns)) + buf = _multiRow(query, {}, fields) if not buf: return for row in buf: -- 1.8.3.1
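The snapshot comment in this patch guards against a standard generator pitfall: a generator body runs lazily, so mutable state it reads (such as self.values) reflects mutations made before iteration starts, not the values at the time iterate() was called. A tiny demonstration of why _iterate is handed copies:

    def make_gen(values):
        yield values['tagid']

    values = {'tagid': 1}
    gen = make_gen(values)
    values['tagid'] = 2      # mutate before the first next()
    print(list(gen))         # [2] -- hence the values.copy() in iterate()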
>From 2c1ae8485178175d52c0a918e8460865e5cc0665 Mon Sep 17 00:00:00 2001 From: Mike McLean <[email protected]> Date: Tue, 20 Sep 2011 15:20:40 -0400 Subject: [PATCH 05/11] limit request size --- hub/kojixmlrpc.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/hub/kojixmlrpc.py b/hub/kojixmlrpc.py index 26204c0..6d23b7d 100644 --- a/hub/kojixmlrpc.py +++ b/hub/kojixmlrpc.py @@ -213,10 +213,15 @@ class ModXMLRPCRequestHandler(object): def _read_request(self, stream): parser, unmarshaller = getparser() + len = 0 + maxlen = opts.get('MaxRequestLength', None) while True: chunk = stream.read(8192) if not chunk: break + len += len(chunk) + if maxlen and len > maxlen: + raise koji.GenericError, 'Request too long' parser.feed(chunk) parser.close() return unmarshaller.close(), unmarshaller.getmethodname() @@ -443,6 +448,7 @@ def load_config(environ): ['RLIMIT_STACK', 'string', None], ['MemoryWarnThreshold', 'integer', 5000], + ['MaxRequestLength', 'integer', 4194304], ['LockOut', 'boolean', False], ['ServerOffline', 'boolean', False], -- 1.8.3.1
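A self-contained sketch of the size guard this patch adds, counting bytes as they stream in and failing fast past the cap (the counter here is named rlen; as posted, the patch shadows the builtin len, which patch 07 below corrects):

    import StringIO

    def read_limited(stream, maxlen):
        # accumulate the request body, bailing out once it exceeds maxlen
        chunks = []
        rlen = 0
        while True:
            chunk = stream.read(8192)
            if not chunk:
                break
            rlen += len(chunk)
            if maxlen and rlen > maxlen:
                raise ValueError('Request too long')
            chunks.append(chunk)
        return ''.join(chunks)

    print(len(read_limited(StringIO.StringIO('x' * 100), 4194304)))   # 100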
>From f1b960a989626221dfc1d778005c34d09857aa19 Mon Sep 17 00:00:00 2001 From: Mike McLean <[email protected]> Date: Thu, 24 Jul 2014 16:04:53 -0400 Subject: [PATCH 06/11] use pid in cursor name --- hub/kojihub.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hub/kojihub.py b/hub/kojihub.py index 2fb8a31..cd7a811 100644 --- a/hub/kojihub.py +++ b/hub/kojihub.py @@ -6424,7 +6424,7 @@ SELECT %(col_str)s # We pass all this data into the generator so that the iterator works # from the snapshot when it was generated. Otherwise reuse of the processor # for similar queries could have unpredictable results. - cname = "qp_cursor_%s_%i" % (id(self), self.cursors) + cname = "qp_cursor_%s_%i_%i" % (id(self), os.getpid(), self.cursors) self.cursors += 1 query = "DECLARE %s NO SCROLL CURSOR FOR %s" % (cname, query) c = context.cnx.cursor() -- 1.8.3.1
>From 9a8efa8d97dc399c71366dba90bd65ab526c0933 Mon Sep 17 00:00:00 2001 From: Mike McLean <[email protected]> Date: Thu, 24 Jul 2014 18:24:19 -0400 Subject: [PATCH 07/11] fix variable name --- hub/kojixmlrpc.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hub/kojixmlrpc.py b/hub/kojixmlrpc.py index 6d23b7d..ae6320d 100644 --- a/hub/kojixmlrpc.py +++ b/hub/kojixmlrpc.py @@ -213,14 +213,14 @@ class ModXMLRPCRequestHandler(object): def _read_request(self, stream): parser, unmarshaller = getparser() - len = 0 + rlen = 0 maxlen = opts.get('MaxRequestLength', None) while True: chunk = stream.read(8192) if not chunk: break - len += len(chunk) - if maxlen and len > maxlen: + rlen += len(chunk) + if maxlen and rlen > maxlen: raise koji.GenericError, 'Request too long' parser.feed(chunk) parser.close() -- 1.8.3.1
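This one-line rename matters because the version in patch 05 rebinds the builtin len and then tries to call it, which fails on the very first chunk:

    chunk = 'x' * 10
    len = 0                  # shadows the builtin
    try:
        len += len(chunk)    # the name now refers to the int
    except TypeError, e:
        print(e)             # 'int' object is not callable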
>From 3472e5bda83166c55c15a7bd50268900b887e5c4 Mon Sep 17 00:00:00 2001 From: Mike McLean <[email protected]> Date: Fri, 25 Jul 2014 10:30:44 -0400 Subject: [PATCH 08/11] fix to address for gc notices (ticket 268) --- util/koji-gc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/koji-gc b/util/koji-gc index 424b0fb..d86e4fd 100755 --- a/util/koji-gc +++ b/util/koji-gc @@ -421,7 +421,7 @@ refer to the document linked above for instructions.""" else: msg['Subject'] = "%i builds marked for deletion" % len(builds) msg['From'] = options.from_addr - msg['To'] = "%s@%s" % owner_name, options.email_domain #XXX! + msg['To'] = "%s@%s" % (owner_name, options.email_domain) #XXX! msg['X-Koji-Builder'] = owner_name if options.test: if options.debug: -- 1.8.3.1
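The koji-gc fix is a classic precedence bug: % binds tighter than the comma, so the original line formats with a single argument and raises before any tuple is formed. For example:

    owner_name = 'mikem'
    email_domain = 'example.com'
    try:
        # parses as ("%s@%s" % owner_name), email_domain
        msg_to = "%s@%s" % owner_name, email_domain
    except TypeError, e:
        print(e)    # not enough arguments for format string
    msg_to = "%s@%s" % (owner_name, email_domain)   # patched form
    print(msg_to)   # [email protected]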
>From 9577de941ed42d7b8191e1e1dc929d27e0620a42 Mon Sep 17 00:00:00 2001 From: Mike McLean <[email protected]> Date: Fri, 25 Jul 2014 15:54:51 -0400 Subject: [PATCH 09/11] return a generator from maven_tag_archives --- hub/kojihub.py | 92 ++++++++++++++++++++++++++++++---------------------------- 1 file changed, 48 insertions(+), 44 deletions(-) diff --git a/hub/kojihub.py b/hub/kojihub.py index cd7a811..5c666ea 100644 --- a/hub/kojihub.py +++ b/hub/kojihub.py @@ -2109,53 +2109,57 @@ def maven_tag_archives(tag_id, event_id=None, inherit=True): clauses=clauses, opts={'order': order}, columns=[f[0] for f in fields], aliases=[f[1] for f in fields]) - results = [] included = {} included_archives = set() - for tag_id in taglist: - taginfo = get_tag(tag_id, strict=True, event=event_id) - query.values['tag_id'] = tag_id - archives = query.execute() - for archive in archives: - pkg = packages.get(archive['pkg_id']) - if not pkg or pkg['blocked']: - continue - # 4 possibilities: - # 1: we have never seen this group_id:artifact_id before - # - add it to the results, and it to the included dict - # 2: we have seen the group_id:artifact_id before, but a different version - # - if the taginfo['maven_include_all'] is true, add it to the results and - # append it to the included_versions dict, otherwise skip it - # 3: we have seen the group_id:artifact_id before, with the same version, from - # a different build - # - this is a different revision of the same GAV, ignore it because a more - # recently-tagged build has already been included - # 4: we have seen the group_id:artifact_id before, with the same version, from - # the same build - # - it is another artifact from a build we're already including, so include it - # as well - ga = '%(group_id)s:%(artifact_id)s' % archive - included_versions = included.get(ga) - if not included_versions: - included[ga] = {archive['version']: archive['build_id']} - included_archives.add(archive['id']) - results.append(archive) - continue - included_build = included_versions.get(archive['version']) - if not included_build: - if taginfo['maven_include_all']: - included_versions[archive['version']] = archive['build_id'] + # these indexes eat into the memory savings of the generator, but it's only + # group_id/artifact_id/version/build_id/archive_id, which is much smaller than + # the full query + # ballpark estimate: 20-25% of total, less with heavy duplication of indexed values + def _iter_archives(): + for tag_id in taglist: + taginfo = get_tag(tag_id, strict=True, event=event_id) + query.values['tag_id'] = tag_id + archives = query.iterate() + for archive in archives: + pkg = packages.get(archive['pkg_id']) + if not pkg or pkg['blocked']: + continue + # 4 possibilities: + # 1: we have never seen this group_id:artifact_id before + # - yield it, and add to the included dict + # 2: we have seen the group_id:artifact_id before, but a different version + # - if the taginfo['maven_include_all'] is true, yield it and + # append it to the included_versions dict, otherwise skip it + # 3: we have seen the group_id:artifact_id before, with the same version, from + # a different build + # - this is a different revision of the same GAV, ignore it because a more + # recently-tagged build has already been included + # 4: we have seen the group_id:artifact_id before, with the same version, from + # the same build + # - it is another artifact from a build we're already including, so include it + # as well + ga = '%(group_id)s:%(artifact_id)s' % archive + included_versions = 
included.get(ga) + if not included_versions: + included[ga] = {archive['version']: archive['build_id']} included_archives.add(archive['id']) - results.append(archive) - continue - if included_build != archive['build_id']: - continue - # make sure we haven't already seen this archive somewhere else in the - # tag hierarchy - if archive['id'] not in included_archives: - included_archives.add(archive['id']) - results.append(archive) - return results + yield archive + continue + included_build = included_versions.get(archive['version']) + if not included_build: + if taginfo['maven_include_all']: + included_versions[archive['version']] = archive['build_id'] + included_archives.add(archive['id']) + yield archive + continue + if included_build != archive['build_id']: + continue + # make sure we haven't already seen this archive somewhere else in the + # tag hierarchy + if archive['id'] not in included_archives: + included_archives.add(archive['id']) + yield archive + return _iter_archives() def repo_init(tag, with_src=False, with_debuginfo=False, event=None): """Create a new repo entry in the INIT state, return full repo data -- 1.8.3.1
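Structurally this is a dedup-while-streaming generator: only small keys (g:a:v plus build and archive ids) are retained, while full rows pass through lazily. A much-simplified sketch of that shape (the real function distinguishes the four cases described in its comments):

    def first_seen(rows, keyfunc):
        # yield rows whose key is new; remember only the keys
        seen = set()
        for row in rows:
            key = keyfunc(row)
            if key in seen:
                continue
            seen.add(key)
            yield row

    archives = iter([{'group_id': 'g', 'artifact_id': 'a', 'version': '1'},
                     {'group_id': 'g', 'artifact_id': 'a', 'version': '1'}])
    for archive in first_seen(archives,
                              lambda a: (a['group_id'], a['artifact_id'], a['version'])):
        print(archive)   # printed once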
>From e35d0803f33e72a81a38ec8290b59f529f630373 Mon Sep 17 00:00:00 2001 From: Mike McLean <[email protected]> Date: Mon, 28 Jul 2014 17:33:49 -0400 Subject: [PATCH 10/11] handle maven_tag_archives generator in updateMavenBuildRootList --- hub/kojihub.py | 42 ++++++++++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/hub/kojihub.py b/hub/kojihub.py index 5c666ea..b825e47 100644 --- a/hub/kojihub.py +++ b/hub/kojihub.py @@ -10372,7 +10372,17 @@ class HostExports(object): repo = repo_info(br.data['repo_id'], strict=True) tag = get_tag(repo['tag_id'], strict=True) - tag_archives = maven_tag_archives(tag['id'], event_id=repo['create_event']) + maven_build_index = {} + # Index the maven_tag_archives result by group_id:artifact_id:version + # The function ensures that each g:a:v maps to a single build id. + # The generator returned by maven_tag_archives can create a lot of data, + # but this index will only consume a fraction of that. + for archive in maven_tag_archives(tag['id'], event_id=repo['create_event']): + # unfortunately pgdb does not appear to intern strings, but still + # better not to create any new ones + maven_build_index.setdefault( + archive['group_id'], {}).setdefault( + archive['artifact_id'], {})[archive['version']] = archive['build_id'] if not ignore: ignore = [] @@ -10404,14 +10414,13 @@ class HostExports(object): 'files': [fileinfo]} else: build = get_build(dep, strict=True) - build_archives = list_archives(buildID=build['id'], type='maven') - tag_archives.extend(build_archives) - ignore.extend(task_deps.values()) + for archive in list_archives(buildID=build['id'], type='maven'): + maven_build_index.setdefault( + archive['group_id'], {}).setdefault( + archive['artifact_id'], {}).setdefault( + archive['version'], archive['build_id']) - archives_by_label = {} - for archive in tag_archives: - maven_label = koji.mavenLabel(archive) - archives_by_label.setdefault(maven_label, {})[archive['filename']] = archive + ignore.extend(task_deps.values()) SNAPSHOT_RE = re.compile(r'-\d{8}\.\d{6}-\d+') ignore_by_label = {} @@ -10435,11 +10444,24 @@ class HostExports(object): maven_info = entry['maven_info'] maven_label = koji.mavenLabel(maven_info) ignore_archives = ignore_by_label.get(maven_label, {}) - label_archives = archives_by_label.get(maven_label, {}) + build_id = maven_build_index.get( + maven_info['group_id'], {}).get( + maven_info['artifact_id'], {}).get( + maven_info['version']) + if not build_id: + if not ignore_unknown: + raise koji.BuildrootError, 'Unmatched maven g:a:v in build environment: ' \ + '%(group_id)s:%(artifact_id)s:%(version)s' % maven_info + build_archives = {} + else: + tinfo = dslice(maven_info, ['group_id', 'artifact_id', 'version']) + build_archives = list_archives(type='maven', typeInfo=tinfo) + # index by filename + build_archives = dict([(a['filename'], a) for a in build_archives]) for fileinfo in entry['files']: ignore_archive = ignore_archives.get(fileinfo['filename']) - tag_archive = label_archives.get(fileinfo['filename']) + tag_archive = build_archives.get(fileinfo['filename']) if tag_archive and fileinfo['size'] == tag_archive['size']: archives.append(tag_archive) elif ignore_archive and fileinfo['size'] == ignore_archive['size']: -- 1.8.3.1
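The three-level index replaces the old archives_by_label dict with something far smaller: group_id -> artifact_id -> version -> build_id. A self-contained sketch of building and probing it (sample data is made up):

    maven_build_index = {}
    archives = [{'group_id': 'org.foo', 'artifact_id': 'bar',
                 'version': '1.0', 'build_id': 42}]
    for archive in archives:
        maven_build_index.setdefault(
            archive['group_id'], {}).setdefault(
            archive['artifact_id'], {})[archive['version']] = archive['build_id']

    build_id = maven_build_index.get('org.foo', {}).get('bar', {}).get('1.0')
    print(build_id)   # 42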
>From dcf97e335b1bf6108724c25fa90bcb338c673d6a Mon Sep 17 00:00:00 2001 From: Mike McLean <[email protected]> Date: Tue, 29 Jul 2014 17:19:36 -0400 Subject: [PATCH 11/11] chunksize option for iterate() --- hub/kojihub.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/hub/kojihub.py b/hub/kojihub.py index b825e47..467a48d 100644 --- a/hub/kojihub.py +++ b/hub/kojihub.py @@ -1286,7 +1286,7 @@ def readTaggedRPMS(tag, package=None, arch=None, event=None,inherit=False,latest else: tags_seen[tagid] = 1 query.values['tagid'] = tagid - for rpminfo in query.iterate(): + for rpminfo in query.iterate(chunksize=5000): #note: we're checking against the build list because # it has been filtered by the package list. The tag # tools should endeavor to keep tag_listing sane w.r.t. @@ -2119,7 +2119,7 @@ def maven_tag_archives(tag_id, event_id=None, inherit=True): for tag_id in taglist: taginfo = get_tag(tag_id, strict=True, event=event_id) query.values['tag_id'] = tag_id - archives = query.iterate() + archives = query.iterate(chunksize=5000) for archive in archives: pkg = packages.get(archive['pkg_id']) if not pkg or pkg['blocked']: @@ -6274,7 +6274,7 @@ class QueryProcessor(object): rowlock: if True, use "FOR UPDATE" to lock the queried rows """ - iterchunksize = 1000 + iterchunksize = 2000 def __init__(self, columns=None, aliases=None, tables=None, joins=None, clauses=None, values=None, opts=None): @@ -6413,16 +6413,18 @@ SELECT %(col_str)s return _multiRow(query, self.values, (self.aliases or self.columns)) - def iterate(self): + def iterate(self, chunksize=None): + if chunksize is None: + chunksize = self.iterchunksize if self.opts.get('countOnly'): return self.execute() - elif self.opts.get('limit') and self.opts['limit'] < self.iterchunksize: + elif self.opts.get('limit') and self.opts['limit'] < chunksize: return self.execute() else: fields = self.aliases or self.columns fields = list(fields) return self._iterate(str(self), self.values.copy(), fields, - self.iterchunksize, self.opts.get('asList')) + chunksize, self.opts.get('asList')) def _iterate(self, query, values, fields, chunksize, as_list=False): # We pass all this data into the generator so that the iterator works -- 1.8.3.1
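With the final patch, callers can tune the FETCH size per query. A hypothetical caller (the tradeoff is fewer round trips against a larger per-chunk buffer; queries whose limit is below the chunk size fall back to a plain execute()):

    query = QueryProcessor(tables=['rpminfo'], columns=['id'])
    for row in query.iterate(chunksize=5000):
        pass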
