These changes cause the hub to use generators for some potentially large
queries internally. The point is to reduce memory usage.

For the case where a large query result is returned via RPC, the
marshaller has been extended to handle generators. The marshaller will
still build the entire XML response in memory, but we at least save the
memory that would otherwise hold the original data.

I also have some work on an iterating marshaller that will not keep the
entire XML response in memory, but that's a bit more complicated, so I'm
going to leave that for another day.

There are other cases (e.g. repo_init) where we have very large queries
internally. In these cases, this patch set will result in significant
memory savings.

>From 5436e9a87e48ed7f0ab95ff040b1961dd6f9fd04 Mon Sep 17 00:00:00 2001
From: Mike McLean <[email protected]>
Date: Sat, 27 Aug 2011 16:32:36 -0400
Subject: [PATCH 01/11] Use iterators for potentially large queries to reduce
 memory use

---
 hub/kojihub.py    | 121 ++++++++++++++++++++++++++++++++++++------------------
 hub/kojixmlrpc.py |  17 ++++++++
 2 files changed, 97 insertions(+), 41 deletions(-)

diff --git a/hub/kojihub.py b/hub/kojihub.py
index e75c2e9..a2678c7 100644
--- a/hub/kojihub.py
+++ b/hub/kojihub.py
@@ -1115,7 +1115,7 @@ def list_tags(build=None, package=None, queryOpts=None):
     query = QueryProcessor(columns=fields, aliases=aliases, tables=tables,
                            joins=joins, clauses=clauses, values=locals(),
                            opts=queryOpts)
-    return query.execute()
+    return query.iterate()
 
 def readTaggedBuilds(tag,event=None,inherit=False,latest=False,package=None,owner=None,type=None):
     """Returns a list of builds for specified tag
@@ -1244,32 +1244,31 @@ def readTaggedRPMS(tag, package=None, arch=None, event=None,inherit=False,latest
               ('rpminfo.buildtime', 'buildtime'),
               ('rpminfo.buildroot_id', 'buildroot_id'),
               ('rpminfo.build_id', 'build_id')]
-    if rpmsigs:
-        fields.append(('rpmsigs.sigkey', 'sigkey'))
-    q="""SELECT %s FROM rpminfo
-    JOIN tag_listing ON rpminfo.build_id = tag_listing.build_id
-    """ % ', '.join([pair[0] for pair in fields])
+    fields, aliases = zip(*fields)
+    tables = ['rpminfo']
+    joins = ['tag_listing ON rpminfo.build_id = tag_listing.build_id']
+    clauses = [eventCondition(event), 'tag_id=%(tagid)s']
+    data = {}  #tagid added later
     if package:
-        q += """JOIN build ON rpminfo.build_id = build.id
-        JOIN package ON package.id = build.pkg_id
-        """
+        joins.append('build ON rpminfo.build_id = build.id')
+        joins.append('package ON package.id = build.pkg_id')
+        clauses.append('package.name = %(package)s')
+        data['package'] = package
     if rpmsigs:
-        q += """LEFT OUTER JOIN rpmsigs on rpminfo.id = rpmsigs.rpm_id
-        """
-    q += """WHERE %s AND tag_id=%%(tagid)s
-    """ % eventCondition(event)
-    if package:
-        q += """AND package.name = %(package)s
-        """
+        fields.append(('rpmsigs.sigkey', 'sigkey'))
+        joins.append('LEFT OUTER JOIN rpmsigs on rpminfo.id = rpmsigs.rpm_id')
     if arch:
+        data['arch'] = arch
         if isinstance(arch, basestring):
-            q += """AND rpminfo.arch = %(arch)s
-            """
+            clauses.append('rpminfo.arch = %(arch)s')
         elif isinstance(arch, (list, tuple)):
-            q += """AND rpminfo.arch IN %(arch)s\n"""
+            clauses.append('rpminfo.arch IN %(arch)s')
         else:
             raise koji.GenericError, 'invalid arch option: %s' % arch
 
+    query = QueryProcessor(tables=tables, joins=joins, clauses=clauses,
+                           columns=fields, aliases=aliases, values=data)
+
     # unique constraints ensure that each of these queries will not report
     # duplicate rpminfo entries, BUT since we make the query multiple times,
     # we can get duplicates if a package is multiply tagged.
@@ -1285,7 +1284,8 @@ def readTaggedRPMS(tag, package=None, arch=None, event=None,inherit=False,latest
             continue
         else:
             tags_seen[tagid] = 1
-        for rpminfo in _multiRow(q, locals(), [pair[1] for pair in fields]):
+        query.values['tagid'] = tagid
+        for rpminfo in query.execute():
             #note: we're checking against the build list because
             # it has been filtered by the package list. The tag
             # tools should endeavor to keep tag_listing sane w.r.t.
@@ -2452,7 +2452,7 @@ def tag_changed_since_event(event,taglist):
     clauses = ['update_event > %(event)i', 'tag_id IN %(taglist)s']
     query = QueryProcessor(tables=['tag_updates'], columns=['id'],
                             clauses=clauses, values=data,
-                            opts={'asList': True})
+                            opts={'limit': 1})
     if query.execute():
         return True
     #also check these versioned tables
@@ -2470,7 +2470,7 @@ def tag_changed_since_event(event,taglist):
                'tag_id IN %(taglist)s']
     for table in tables:
         query = QueryProcessor(tables=[table], columns=['tag_id'], clauses=clauses,
-                                values=data, opts={'asList': True})
+                                values=data, opts={'limit': 1})
         if query.execute():
             return True
     return False
@@ -5545,7 +5545,7 @@ def query_history(tables=None, **kwargs):
         fields, aliases = zip(*fields.items())
         query = QueryProcessor(columns=fields, aliases=aliases, tables=[table],
                                joins=joins, clauses=clauses, values=data)
-        ret[table] = query.execute()
+        ret[table] = query.iterate()
     return ret
 
 
@@ -5598,7 +5598,7 @@ def tag_history(build=None, tag=None, package=None, active=None, queryOpts=None)
     query = QueryProcessor(columns=fields, aliases=aliases, tables=tables,
                            joins=joins, clauses=clauses, values=locals(),
                            opts=queryOpts)
-    return query.execute()
+    return query.iterate()
 
 def untagged_builds(name=None, queryOpts=None):
     """Returns the list of untagged builds"""
@@ -5624,7 +5624,7 @@ def untagged_builds(name=None, queryOpts=None):
     query = QueryProcessor(columns=fields, aliases=aliases, tables=tables,
                            joins=joins, clauses=clauses, values=locals(),
                            opts=queryOpts)
-    return query.execute()
+    return query.iterate()
 
 def build_map():
     """Map which builds were used in the buildroots of other builds
@@ -6306,6 +6306,7 @@ class QueryProcessor(object):
         self.tables = tables
         self.joins = joins
         self.clauses = clauses
+        self.cursors = 0
         if values:
             self.values = values
         else:
@@ -6428,6 +6429,31 @@ SELECT %(col_str)s
         else:
             return _multiRow(query, self.values, (self.aliases or self.columns))
 
+
+    def iterate(self):
+        if self.opts.get('countOnly'):
+            return self.execute()
+        else:
+            return self._iterate(str(self), self.values.copy(), self.opts.get('asList'))
+
+    def _iterate(self, query, values, as_list=False):
+        cname = "qp_cursor_%s_%i" % (id(self), self.cursors)
+        self.cursors += 1
+        query = "DECLARE %s NO SCROLL CURSOR FOR %s" % (cname, self)
+        c = context.cnx.cursor()
+        c.execute(query, self.values)
+        c.close()
+        query = "FETCH 1000 FROM %s" % cname
+        while True:
+            if as_list:
+                buf = _fetchMulti(query, {})
+            else:
+                buf = _multiRow(query, {}, (self.aliases or self.columns))
+            if not buf:
+                return
+            for row in buf:
+                yield row
+
     def executeOne(self):
         results = self.execute()
         if isinstance(results, list):
@@ -8090,7 +8116,7 @@ class RootExports(object):
                                tables=tables, joins=joins, clauses=clauses,
                                values=locals(), opts=queryOpts)
 
-        return query.execute()
+        return query.iterate()
 
     def getLatestBuilds(self,tag,event=None,package=None,type=None):
         """List latest builds for tag (inheritance enabled)"""
@@ -8763,27 +8789,40 @@ class RootExports(object):
 
         query = QueryProcessor(columns=fields, aliases=aliases, tables=tables, joins=joins,
                                clauses=conditions, values=opts, opts=queryOpts)
-        tasks = query.execute()
+        tasks = query.iterate()
         if queryOpts and (queryOpts.get('countOnly') or queryOpts.get('asList')):
             # Either of the above options makes us unable to easily the decode
             # the xmlrpc data
             return tasks
 
-        if opts.get('decode'):
-            for task in tasks:
-                # decode xmlrpc data
-                for f in ('request','result'):
-                    if task[f]:
-                        try:
-                            if task[f].find('<?xml', 0, 10) == -1:
-                                #handle older base64 encoded data
-                                task[f] = base64.decodestring(task[f])
-                            data, method = xmlrpclib.loads(task[f])
-                        except xmlrpclib.Fault, fault:
-                            data = fault
-                        task[f] = data
+        if opts.get('decode') and not queryOpts.get('countOnly'):
+            if queryOpts.get('asList'):
+                keys = []
+                for n, f in aliases:
+                    if f in ('request','result'):
+                        keys.append(n)
+            else:
+                keys = ('request','result')
+            tasks = self._decode_tasks(tasks, keys)
+
         return tasks
 
+    def _decode_tasks(self, tasks, keys):
+        for task in tasks:
+            # decode xmlrpc data
+            for f in keys:
+                val = task[f]
+                if val:
+                    try:
+                        if val.find('<?xml', 0, 10) == -1:
+                            #handle older base64 encoded data
+                            val = base64.decodestring(val)
+                        data, method = xmlrpclib.loads(val)
+                    except xmlrpclib.Fault, fault:
+                        data = fault
+                    task[f] = data
+            yield task
+
     def taskReport(self, owner=None):
         """Return data on active or recent tasks"""
         fields = (
@@ -9263,7 +9302,7 @@ class RootExports(object):
                                aliases=aliases, tables=(table,),
                                joins=joins, clauses=(clause,),
                                values=locals(), opts=queryOpts)
-        return query.execute()
+        return query.iterate()
 
 
 class BuildRoot(object):
diff --git a/hub/kojixmlrpc.py b/hub/kojixmlrpc.py
index 8b67365..26204c0 100644
--- a/hub/kojixmlrpc.py
+++ b/hub/kojixmlrpc.py
@@ -28,6 +28,7 @@ import traceback
 import types
 import pprint
 import resource
+import xmlrpclib
 from xmlrpclib import getparser,dumps,Fault
 from koji.server import WSGIWrapper
 
@@ -40,6 +41,22 @@ import koji.util
 from koji.context import context
 
 
+# Workaround to allow xmlrpclib to deal with iterators
+class Marshaller(xmlrpclib.Marshaller):
+
+    dispatch = xmlrpclib.Marshaller.dispatch.copy()
+
+    def dump_generator(self, value, write):
+        dump = self.__dump
+        write("<value><array><data>\n")
+        for v in value:
+            dump(v, write)
+        write("</data></array></value>\n")
+    dispatch[types.GeneratorType] = dump_generator
+
+xmlrpclib.Marshaller = Marshaller
+
+
 class HandlerRegistry(object):
     """Track handlers for RPC calls"""
 
-- 
1.8.3.1

>From ac1152361ff3d2d88b888019afa9d5f1dd2bde6e Mon Sep 17 00:00:00 2001
From: Mike McLean <[email protected]>
Date: Sat, 27 Aug 2011 21:33:46 -0400
Subject: [PATCH 02/11] use iteration in readTaggedRPMS

---
 hub/kojihub.py | 49 +++++++++++++++++++++++++------------------------
 1 file changed, 25 insertions(+), 24 deletions(-)

diff --git a/hub/kojihub.py b/hub/kojihub.py
index a2678c7..39c316a 100644
--- a/hub/kojihub.py
+++ b/hub/kojihub.py
@@ -1274,31 +1274,32 @@ def readTaggedRPMS(tag, package=None, arch=None, event=None,inherit=False,latest
     # we can get duplicates if a package is multiply tagged.
     rpms = []
     tags_seen = {}
-    for tagid in taglist:
-        if tags_seen.has_key(tagid):
-            #certain inheritance trees can (legitimately) have the same tag
-            #appear more than once (perhaps once with a package filter and once
-            #without). The hard part of that was already done by readTaggedBuilds.
-            #We only need consider each tag once. Note how we use build_idx below.
-            #(Without this, we could report the same rpm twice)
-            continue
-        else:
-            tags_seen[tagid] = 1
-        query.values['tagid'] = tagid
-        for rpminfo in query.execute():
-            #note: we're checking against the build list because
-            # it has been filtered by the package list. The tag
-            # tools should endeavor to keep tag_listing sane w.r.t.
-            # the package list, but if there is disagreement the package
-            # list should take priority
-            build = build_idx.get(rpminfo['build_id'],None)
-            if build is None:
-                continue
-            elif build['tag_id'] != tagid:
-                #wrong tag
+    def _iter_rpms():
+        for tagid in taglist:
+            if tags_seen.has_key(tagid):
+                #certain inheritance trees can (legitimately) have the same tag
+                #appear more than once (perhaps once with a package filter and once
+                #without). The hard part of that was already done by readTaggedBuilds.
+                #We only need consider each tag once. Note how we use build_idx below.
+                #(Without this, we could report the same rpm twice)
                 continue
-            rpms.append(rpminfo)
-    return [rpms,builds]
+            else:
+                tags_seen[tagid] = 1
+            query.values['tagid'] = tagid
+            for rpminfo in query.iterate():
+                #note: we're checking against the build list because
+                # it has been filtered by the package list. The tag
+                # tools should endeavor to keep tag_listing sane w.r.t.
+                # the package list, but if there is disagreement the package
+                # list should take priority
+                build = build_idx.get(rpminfo['build_id'],None)
+                if build is None:
+                    continue
+                elif build['tag_id'] != tagid:
+                    #wrong tag
+                    continue
+                yield rpminfo
+    return [_iter_rpms(), builds]
 
 def readTaggedArchives(tag, package=None, event=None, inherit=False, latest=True, type=None):
     """Returns a list of archives for specified tag
-- 
1.8.3.1

>From 5f6c664e0bb42e495ecd1709c255597034d179fe Mon Sep 17 00:00:00 2001
From: Mike McLean <[email protected]>
Date: Sat, 27 Aug 2011 23:50:39 -0400
Subject: [PATCH 03/11] rework repo_init for iterator efficiency

---
 hub/kojihub.py | 111 ++++++++++++++++++++++-----------------------------------
 1 file changed, 43 insertions(+), 68 deletions(-)

diff --git a/hub/kojihub.py b/hub/kojihub.py
index 39c316a..389272d 100644
--- a/hub/kojihub.py
+++ b/hub/kojihub.py
@@ -2172,7 +2172,10 @@ def repo_init(tag, with_src=False, with_debuginfo=False, event=None):
     repo_arches = {}
     if tinfo['arches']:
         for arch in tinfo['arches'].split():
-            repo_arches[koji.canonArch(arch)] = 1
+            arch = koji.canonArch(arch)
+            if arch in ['src','noarch']:
+                continue
+            repo_arches[arch] = 1
     repo_id = _singleValue("SELECT nextval('repo_id_seq')")
     if event is None:
         event_id = _singleValue("SELECT get_event()")
@@ -2181,9 +2184,9 @@ def repo_init(tag, with_src=False, with_debuginfo=False, event=None):
         q = "SELECT time FROM events WHERE id=%(event)s"
         event_time = _singleValue(q, locals(), strict=True)
         event_id = event
-    q = """INSERT INTO repo(id, create_event, tag_id, state)
-    VALUES(%(repo_id)s, %(event_id)s, %(tag_id)s, %(state)s)"""
-    _dml(q,locals())
+    insert = InsertProcessor('repo')
+    insert.set(id=repo_id, create_event=event_id, tag_id=tag_id, state=state)
+    insert.execute()
     # Need to pass event_id because even though this is a single transaction,
     # it is possible to see the results of other committed transactions
     rpms, builds = readTaggedRPMS(tag_id, event=event_id, inherit=True, latest=True)
@@ -2192,30 +2195,7 @@ def repo_init(tag, with_src=False, with_debuginfo=False, event=None):
                   if pkg['blocked']]
     repodir = koji.pathinfo.repo(repo_id, tinfo['name'])
     os.makedirs(repodir)  #should not already exist
-    #index builds
-    builds = dict([[build['build_id'],build] for build in builds])
-    #index the packages by arch
-    packages = {}
-    for repoarch in repo_arches:
-        packages.setdefault(repoarch, [])
-    relpathinfo = koji.PathInfo(topdir='toplink')
-    for rpminfo in rpms:
-        if not with_debuginfo and koji.is_debuginfo(rpminfo['name']):
-            continue
-        arch = rpminfo['arch']
-        repoarch = koji.canonArch(arch)
-        if arch == 'src':
-            if not with_src:
-                continue
-        elif arch == 'noarch':
-            pass
-        elif repoarch not in repo_arches:
-            # Do not create a repo for arches not in the arch list for this tag
-            continue
-        build = builds[rpminfo['build_id']]
-        rpminfo['relpath'] = "%s/%s" % (relpathinfo.build(build), relpathinfo.rpm(rpminfo))
-        rpminfo['relpath'] = rpminfo['relpath'].lstrip('/')
-        packages.setdefault(repoarch,[]).append(rpminfo)
+
     #generate comps and groups.spec
     groupsdir = "%s/groups" % (repodir)
     koji.ensuredir(groupsdir)
@@ -2224,57 +2204,52 @@ def repo_init(tag, with_src=False, with_debuginfo=False, event=None):
     fo.write(comps)
     fo.close()
 
+    #get build dirs
+    relpathinfo = koji.PathInfo(topdir='toplink')
+    builddirs = {}
+    for build in builds:
+        relpath = relpathinfo.build(build)
+        builddirs[build['id']] = relpath.lstrip('/')
     #generate pkglist files
-    for arch in packages.iterkeys():
-        if arch in ['src','noarch']:
-            continue
-            # src and noarch special-cased -- see below
-        archdir = os.path.join(repodir, arch)
+    pkglist = {}
+    for repoarch in repo_arches:
+        archdir = os.path.join(repodir, repoarch)
         koji.ensuredir(archdir)
         # Make a symlink to our topdir
         top_relpath = koji.util.relpath(koji.pathinfo.topdir, archdir)
         top_link = os.path.join(archdir, 'toplink')
         os.symlink(top_relpath, top_link)
-        pkglist = file(os.path.join(repodir, arch, 'pkglist'), 'w')
-        logger.info("Creating package list for %s" % arch)
-        for rpminfo in packages[arch]:
-            pkglist.write(rpminfo['relpath'] + '\n')
-        #noarch packages
-        for rpminfo in packages.get('noarch',[]):
-            pkglist.write(rpminfo['relpath'] + '\n')
-        # srpms
-        if with_src:
-            srpmdir = "%s/%s" % (repodir,'src')
-            koji.ensuredir(srpmdir)
-            for rpminfo in packages.get('src',[]):
-                pkglist.write(rpminfo['relpath'] + '\n')
-        pkglist.close()
-        #write list of blocked packages
-        blocklist = file(os.path.join(repodir, arch, 'blocklist'), 'w')
-        logger.info("Creating blocked list for %s" % arch)
+        pkglist[repoarch] = file(os.path.join(archdir, 'pkglist'), 'w')
+    #NOTE - rpms is now an iterator
+    for rpminfo in rpms:
+        if not with_debuginfo and koji.is_debuginfo(rpminfo['name']):
+            continue
+        relpath = "%s/%s\n" % (builddirs[rpminfo['build_id']], relpathinfo.rpm(rpminfo))
+        arch = rpminfo['arch']
+        if arch == 'src':
+            if with_src:
+                for repoarch in repo_arches:
+                    pkglist[repoarch].write(relpath)
+        elif arch == 'noarch':
+            for repoarch in repo_arches:
+                pkglist[repoarch].write(relpath)
+        else:
+            repoarch = koji.canonArch(arch)
+            if repoarch not in repo_arches:
+                # Do not create a repo for arches not in the arch list for this tag
+                continue
+            pkglist[repoarch].write(relpath)
+    for repoarch in repo_arches:
+        pkglist[repoarch].close()
+
+    #write blocked package lists
+    for repoarch in repo_arches:
+        blocklist = file(os.path.join(repodir, repoarch, 'blocklist'), 'w')
         for pkg in blocks:
             blocklist.write(pkg['package_name'])
             blocklist.write('\n')
         blocklist.close()
 
-    # if using an external repo, make sure we've created a directory and pkglist for
-    # every arch in the taglist, or any packages of that arch in the external repo
-    # won't be processed
-    if get_external_repo_list(tinfo['id'], event=event_id):
-        for arch in repo_arches:
-            pkglist = os.path.join(repodir, arch, 'pkglist')
-            if not os.path.exists(pkglist):
-                logger.info("Creating missing package list for %s" % arch)
-                koji.ensuredir(os.path.dirname(pkglist))
-                pkglist_fo = file(pkglist, 'w')
-                pkglist_fo.close()
-                blocklist = file(os.path.join(repodir, arch, 'blocklist'), 'w')
-                logger.info("Creating missing blocked list for %s" % arch)
-                for pkg in blocks:
-                    blocklist.write(pkg['package_name'])
-                    blocklist.write('\n')
-                blocklist.close()
-
     if context.opts.get('EnableMaven') and tinfo['maven_support']:
         artifact_dirs = {}
         dir_links = set()
-- 
1.8.3.1

>From 293e777a09e053a23bfe8ff606a981b4fd026904 Mon Sep 17 00:00:00 2001
From: Mike McLean <[email protected]>
Date: Tue, 20 Sep 2011 12:50:07 -0400
Subject: [PATCH 04/11] QueryProcessor iteration refinements skip cursor if
 limit < chunksize avoid possibly volatile parameters during iteration

---
 hub/kojihub.py | 25 ++++++++++++++++++-------
 1 file changed, 18 insertions(+), 7 deletions(-)

diff --git a/hub/kojihub.py b/hub/kojihub.py
index 389272d..2fb8a31 100644
--- a/hub/kojihub.py
+++ b/hub/kojihub.py
@@ -6269,6 +6269,9 @@ class QueryProcessor(object):
                 column values in query order, rather than the usual list of maps
         rowlock: if True, use "FOR UPDATE" to lock the queried rows
     """
+
+    iterchunksize = 1000
+
     def __init__(self, columns=None, aliases=None, tables=None,
                  joins=None, clauses=None, values=None, opts=None):
         self.columns = columns
@@ -6409,22 +6412,30 @@ SELECT %(col_str)s
     def iterate(self):
         if self.opts.get('countOnly'):
             return self.execute()
+        elif self.opts.get('limit') and self.opts['limit'] < self.iterchunksize:
+            return self.execute()
         else:
-            return self._iterate(str(self), self.values.copy(), self.opts.get('asList'))
-
-    def _iterate(self, query, values, as_list=False):
+            fields = self.aliases or self.columns
+            fields = list(fields)
+            return self._iterate(str(self), self.values.copy(), fields,
+                                 self.iterchunksize, self.opts.get('asList'))
+
+    def _iterate(self, query, values, fields, chunksize, as_list=False):
+        # We pass all this data into the generator so that the iterator works
+        # from the snapshot when it was generated. Otherwise reuse of the processor
+        # for similar queries could have unpredictable results.
         cname = "qp_cursor_%s_%i" % (id(self), self.cursors)
         self.cursors += 1
-        query = "DECLARE %s NO SCROLL CURSOR FOR %s" % (cname, self)
+        query = "DECLARE %s NO SCROLL CURSOR FOR %s" % (cname, query)
         c = context.cnx.cursor()
-        c.execute(query, self.values)
+        c.execute(query, values)
         c.close()
-        query = "FETCH 1000 FROM %s" % cname
+        query = "FETCH %i FROM %s" % (chunksize, cname)
         while True:
             if as_list:
                 buf = _fetchMulti(query, {})
             else:
-                buf = _multiRow(query, {}, (self.aliases or self.columns))
+                buf = _multiRow(query, {}, fields)
             if not buf:
                 return
             for row in buf:
-- 
1.8.3.1

>From 2c1ae8485178175d52c0a918e8460865e5cc0665 Mon Sep 17 00:00:00 2001
From: Mike McLean <[email protected]>
Date: Tue, 20 Sep 2011 15:20:40 -0400
Subject: [PATCH 05/11] limit request size

---
 hub/kojixmlrpc.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/hub/kojixmlrpc.py b/hub/kojixmlrpc.py
index 26204c0..6d23b7d 100644
--- a/hub/kojixmlrpc.py
+++ b/hub/kojixmlrpc.py
@@ -213,10 +213,15 @@ class ModXMLRPCRequestHandler(object):
 
     def _read_request(self, stream):
         parser, unmarshaller = getparser()
+        len = 0
+        maxlen = opts.get('MaxRequestLength', None)
         while True:
             chunk = stream.read(8192)
             if not chunk:
                 break
+            len += len(chunk)
+            if maxlen and len > maxlen:
+                raise koji.GenericError, 'Request too long'
             parser.feed(chunk)
         parser.close()
         return unmarshaller.close(), unmarshaller.getmethodname()
@@ -443,6 +448,7 @@ def load_config(environ):
         ['RLIMIT_STACK', 'string', None],
 
         ['MemoryWarnThreshold', 'integer', 5000],
+        ['MaxRequestLength', 'integer', 4194304],
 
         ['LockOut', 'boolean', False],
         ['ServerOffline', 'boolean', False],
-- 
1.8.3.1

>From f1b960a989626221dfc1d778005c34d09857aa19 Mon Sep 17 00:00:00 2001
From: Mike McLean <[email protected]>
Date: Thu, 24 Jul 2014 16:04:53 -0400
Subject: [PATCH 06/11] use pid in cursor name

---
 hub/kojihub.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hub/kojihub.py b/hub/kojihub.py
index 2fb8a31..cd7a811 100644
--- a/hub/kojihub.py
+++ b/hub/kojihub.py
@@ -6424,7 +6424,7 @@ SELECT %(col_str)s
         # We pass all this data into the generator so that the iterator works
         # from the snapshot when it was generated. Otherwise reuse of the processor
         # for similar queries could have unpredictable results.
-        cname = "qp_cursor_%s_%i" % (id(self), self.cursors)
+        cname = "qp_cursor_%s_%i_%i" % (id(self), os.getpid(), self.cursors)
         self.cursors += 1
         query = "DECLARE %s NO SCROLL CURSOR FOR %s" % (cname, query)
         c = context.cnx.cursor()
-- 
1.8.3.1

>From 9a8efa8d97dc399c71366dba90bd65ab526c0933 Mon Sep 17 00:00:00 2001
From: Mike McLean <[email protected]>
Date: Thu, 24 Jul 2014 18:24:19 -0400
Subject: [PATCH 07/11] fix variable name

---
 hub/kojixmlrpc.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/hub/kojixmlrpc.py b/hub/kojixmlrpc.py
index 6d23b7d..ae6320d 100644
--- a/hub/kojixmlrpc.py
+++ b/hub/kojixmlrpc.py
@@ -213,14 +213,14 @@ class ModXMLRPCRequestHandler(object):
 
     def _read_request(self, stream):
         parser, unmarshaller = getparser()
-        len = 0
+        rlen = 0
         maxlen = opts.get('MaxRequestLength', None)
         while True:
             chunk = stream.read(8192)
             if not chunk:
                 break
-            len += len(chunk)
-            if maxlen and len > maxlen:
+            rlen += len(chunk)
+            if maxlen and rlen > maxlen:
                 raise koji.GenericError, 'Request too long'
             parser.feed(chunk)
         parser.close()
-- 
1.8.3.1

>From 3472e5bda83166c55c15a7bd50268900b887e5c4 Mon Sep 17 00:00:00 2001
From: Mike McLean <[email protected]>
Date: Fri, 25 Jul 2014 10:30:44 -0400
Subject: [PATCH 08/11] fix to address for gc notices (ticket 268)

---
 util/koji-gc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/util/koji-gc b/util/koji-gc
index 424b0fb..d86e4fd 100755
--- a/util/koji-gc
+++ b/util/koji-gc
@@ -421,7 +421,7 @@ refer to the document linked above for instructions."""
     else:
         msg['Subject'] = "%i builds marked for deletion" % len(builds)
     msg['From'] = options.from_addr
-    msg['To'] = "%s@%s" % owner_name, options.email_domain  #XXX!
+    msg['To'] = "%s@%s" % (owner_name, options.email_domain)  #XXX!
     msg['X-Koji-Builder'] = owner_name
     if options.test:
         if options.debug:
-- 
1.8.3.1

>From 9577de941ed42d7b8191e1e1dc929d27e0620a42 Mon Sep 17 00:00:00 2001
From: Mike McLean <[email protected]>
Date: Fri, 25 Jul 2014 15:54:51 -0400
Subject: [PATCH 09/11] return a generator from maven_tag_archives

---
 hub/kojihub.py | 92 ++++++++++++++++++++++++++++++----------------------------
 1 file changed, 48 insertions(+), 44 deletions(-)

diff --git a/hub/kojihub.py b/hub/kojihub.py
index cd7a811..5c666ea 100644
--- a/hub/kojihub.py
+++ b/hub/kojihub.py
@@ -2109,53 +2109,57 @@ def maven_tag_archives(tag_id, event_id=None, inherit=True):
                            clauses=clauses, opts={'order': order},
                            columns=[f[0] for f in fields],
                            aliases=[f[1] for f in fields])
-    results = []
     included = {}
     included_archives = set()
-    for tag_id in taglist:
-        taginfo = get_tag(tag_id, strict=True, event=event_id)
-        query.values['tag_id'] = tag_id
-        archives = query.execute()
-        for archive in archives:
-            pkg = packages.get(archive['pkg_id'])
-            if not pkg or pkg['blocked']:
-                continue
-            # 4 possibilities:
-            # 1: we have never seen this group_id:artifact_id before
-            #  - add it to the results, and it to the included dict
-            # 2: we have seen the group_id:artifact_id before, but a different version
-            #  - if the taginfo['maven_include_all'] is true, add it to the results and
-            #    append it to the included_versions dict, otherwise skip it
-            # 3: we have seen the group_id:artifact_id before, with the same version, from
-            #    a different build
-            #  - this is a different revision of the same GAV, ignore it because a more
-            #    recently-tagged build has already been included
-            # 4: we have seen the group_id:artifact_id before, with the same version, from
-            #    the same build
-            #  - it is another artifact from a build we're already including, so include it
-            #    as well
-            ga = '%(group_id)s:%(artifact_id)s' % archive
-            included_versions = included.get(ga)
-            if not included_versions:
-                included[ga] = {archive['version']: archive['build_id']}
-                included_archives.add(archive['id'])
-                results.append(archive)
-                continue
-            included_build = included_versions.get(archive['version'])
-            if not included_build:
-                if taginfo['maven_include_all']:
-                    included_versions[archive['version']] = archive['build_id']
+    # The `included` and `included_archives` indexes eat into the memory savings
+    # of the generator, but they only hold group_id/artifact_id/version/build_id/
+    # archive_id values, which is much smaller than the full query result.
+    # Ballpark estimate: 20-25% of total, less with heavy duplication of indexed values.
+    def _iter_archives():
+        for tag_id in taglist:
+            taginfo = get_tag(tag_id, strict=True, event=event_id)
+            query.values['tag_id'] = tag_id
+            archives = query.iterate()
+            for archive in archives:
+                pkg = packages.get(archive['pkg_id'])
+                if not pkg or pkg['blocked']:
+                    continue
+                # 4 possibilities:
+                # 1: we have never seen this group_id:artifact_id before
+                #  - yield it, and add to the included dict
+                # 2: we have seen the group_id:artifact_id before, but a different version
+                #  - if the taginfo['maven_include_all'] is true, yield it and
+                #    append it to the included_versions dict, otherwise skip it
+                # 3: we have seen the group_id:artifact_id before, with the same version, from
+                #    a different build
+                #  - this is a different revision of the same GAV, ignore it because a more
+                #    recently-tagged build has already been included
+                # 4: we have seen the group_id:artifact_id before, with the same version, from
+                #    the same build
+                #  - it is another artifact from a build we're already including, so include it
+                #    as well
+                ga = '%(group_id)s:%(artifact_id)s' % archive
+                included_versions = included.get(ga)
+                if not included_versions:
+                    included[ga] = {archive['version']: archive['build_id']}
                     included_archives.add(archive['id'])
-                    results.append(archive)
-                continue
-            if included_build != archive['build_id']:
-                continue
-            # make sure we haven't already seen this archive somewhere else in the
-            # tag hierarchy
-            if archive['id'] not in included_archives:
-                included_archives.add(archive['id'])
-                results.append(archive)
-    return results
+                    yield archive
+                    continue
+                included_build = included_versions.get(archive['version'])
+                if not included_build:
+                    if taginfo['maven_include_all']:
+                        included_versions[archive['version']] = archive['build_id']
+                        included_archives.add(archive['id'])
+                        yield archive
+                    continue
+                if included_build != archive['build_id']:
+                    continue
+                # make sure we haven't already seen this archive somewhere else in the
+                # tag hierarchy
+                if archive['id'] not in included_archives:
+                    included_archives.add(archive['id'])
+                    yield archive
+    return _iter_archives()
 
 def repo_init(tag, with_src=False, with_debuginfo=False, event=None):
     """Create a new repo entry in the INIT state, return full repo data
-- 
1.8.3.1

>From e35d0803f33e72a81a38ec8290b59f529f630373 Mon Sep 17 00:00:00 2001
From: Mike McLean <[email protected]>
Date: Mon, 28 Jul 2014 17:33:49 -0400
Subject: [PATCH 10/11] handle maven_tag_archives generator in
 updateMavenBuildRootList

---
 hub/kojihub.py | 42 ++++++++++++++++++++++++++++++++----------
 1 file changed, 32 insertions(+), 10 deletions(-)

diff --git a/hub/kojihub.py b/hub/kojihub.py
index 5c666ea..b825e47 100644
--- a/hub/kojihub.py
+++ b/hub/kojihub.py
@@ -10372,7 +10372,17 @@ class HostExports(object):
 
         repo = repo_info(br.data['repo_id'], strict=True)
         tag = get_tag(repo['tag_id'], strict=True)
-        tag_archives = maven_tag_archives(tag['id'], event_id=repo['create_event'])
+        maven_build_index = {}
+        # Index the maven_tag_archives result by group_id:artifact_id:version
+        # The function ensures that each g:a:v maps to a single build id.
+        # The generator returned by maven_tag_archives can create a lot of data,
+        # but this index will only consume a fraction of that.
+        for archive in maven_tag_archives(tag['id'], event_id=repo['create_event']):
+            # unfortunately pgdb does not appear to intern strings, so equal values
+            # are separate objects; still, better to reuse them than create new ones
+            maven_build_index.setdefault(
+                archive['group_id'], {}).setdefault(
+                    archive['artifact_id'], {})[archive['version']] = archive['build_id']
 
         if not ignore:
             ignore = []
@@ -10404,14 +10414,13 @@ class HostExports(object):
                                                   'files': [fileinfo]}
             else:
                 build = get_build(dep, strict=True)
-                build_archives = list_archives(buildID=build['id'], type='maven')
-                tag_archives.extend(build_archives)
-        ignore.extend(task_deps.values())
+                for archive in list_archives(buildID=build['id'], type='maven'):
+                    maven_build_index.setdefault(
+                        archive['group_id'], {}).setdefault(
+                            archive['artifact_id'], {}).setdefault(
+                                archive['version'], archive['build_id'])
 
-        archives_by_label = {}
-        for archive in tag_archives:
-            maven_label = koji.mavenLabel(archive)
-            archives_by_label.setdefault(maven_label, {})[archive['filename']] = archive
+        ignore.extend(task_deps.values())
 
         SNAPSHOT_RE = re.compile(r'-\d{8}\.\d{6}-\d+')
         ignore_by_label = {}
@@ -10435,11 +10444,24 @@ class HostExports(object):
             maven_info = entry['maven_info']
             maven_label = koji.mavenLabel(maven_info)
             ignore_archives = ignore_by_label.get(maven_label, {})
-            label_archives = archives_by_label.get(maven_label, {})
+            build_id = maven_build_index.get(
+                        maven_info['group_id'], {}).get(
+                            maven_info['artifact_id'], {}).get(
+                                maven_info['version'])
+            if not build_id:
+                if not ignore_unknown:
+                    raise koji.BuildrootError, 'Unmatched maven g:a:v in build environment: ' \
+                        '%(group_id)s:%(artifact_id)s:%(version)s' % maven_info
+                build_archives = {}
+            else:
+                tinfo = dslice(maven_info, ['group_id', 'artifact_id', 'version'])
+                build_archives = list_archives(type='maven', typeInfo=tinfo)
+                # index by filename
+                build_archives = dict([(a['filename'], a) for a in build_archives])
 
             for fileinfo in entry['files']:
                 ignore_archive = ignore_archives.get(fileinfo['filename'])
-                tag_archive = label_archives.get(fileinfo['filename'])
+                tag_archive = build_archives.get(fileinfo['filename'])
                 if tag_archive and fileinfo['size'] == tag_archive['size']:
                     archives.append(tag_archive)
                 elif ignore_archive and fileinfo['size'] == ignore_archive['size']:
-- 
1.8.3.1

>From dcf97e335b1bf6108724c25fa90bcb338c673d6a Mon Sep 17 00:00:00 2001
From: Mike McLean <[email protected]>
Date: Tue, 29 Jul 2014 17:19:36 -0400
Subject: [PATCH 11/11] chunksize option for iterate()

---
 hub/kojihub.py | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/hub/kojihub.py b/hub/kojihub.py
index b825e47..467a48d 100644
--- a/hub/kojihub.py
+++ b/hub/kojihub.py
@@ -1286,7 +1286,7 @@ def readTaggedRPMS(tag, package=None, arch=None, event=None,inherit=False,latest
             else:
                 tags_seen[tagid] = 1
             query.values['tagid'] = tagid
-            for rpminfo in query.iterate():
+            for rpminfo in query.iterate(chunksize=5000):
                 #note: we're checking against the build list because
                 # it has been filtered by the package list. The tag
                 # tools should endeavor to keep tag_listing sane w.r.t.
@@ -2119,7 +2119,7 @@ def maven_tag_archives(tag_id, event_id=None, inherit=True):
         for tag_id in taglist:
             taginfo = get_tag(tag_id, strict=True, event=event_id)
             query.values['tag_id'] = tag_id
-            archives = query.iterate()
+            archives = query.iterate(chunksize=5000)
             for archive in archives:
                 pkg = packages.get(archive['pkg_id'])
                 if not pkg or pkg['blocked']:
@@ -6274,7 +6274,7 @@ class QueryProcessor(object):
         rowlock: if True, use "FOR UPDATE" to lock the queried rows
     """
 
-    iterchunksize = 1000
+    iterchunksize = 2000
 
     def __init__(self, columns=None, aliases=None, tables=None,
                  joins=None, clauses=None, values=None, opts=None):
@@ -6413,16 +6413,18 @@ SELECT %(col_str)s
             return _multiRow(query, self.values, (self.aliases or self.columns))
 
 
-    def iterate(self):
+    def iterate(self, chunksize=None):
+        if chunksize is None:
+            chunksize = self.iterchunksize
         if self.opts.get('countOnly'):
             return self.execute()
-        elif self.opts.get('limit') and self.opts['limit'] < self.iterchunksize:
+        elif self.opts.get('limit') and self.opts['limit'] < chunksize:
             return self.execute()
         else:
             fields = self.aliases or self.columns
             fields = list(fields)
             return self._iterate(str(self), self.values.copy(), fields,
-                                 self.iterchunksize, self.opts.get('asList'))
+                                    chunksize, self.opts.get('asList'))
 
     def _iterate(self, query, values, fields, chunksize, as_list=False):
         # We pass all this data into the generator so that the iterator works
-- 
1.8.3.1

--
buildsys mailing list
[email protected]
https://admin.fedoraproject.org/mailman/listinfo/buildsys

Reply via email to