commit:     d800d224ab38c0f524d3fe858ebe201cbfa903c1
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Thu Nov  6 08:33:03 2014 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun Dec  7 23:10:48 2014 +0000
URL:        http://sources.gentoo.org/gitweb/?p=proj/portage.git;a=commit;h=d800d224

Log changes between vdb_metadata.pickle updates

This adds support for generating a vdb_metadata_delta.json file
which tracks package merges / unmerges that occur between updates to
vdb_metadata.pickle. IndexedVardb can use the delta together with
vdb_metadata.pickle to reconstruct a complete view of /var/db/pkg,
so that it can avoid expensive listdir calls in /var/db/pkg/*.
Note that vdb_metadata.pickle is only updated periodically, in
order to avoid excessive re-writes of a large file.
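
The delta file itself is a small JSON document. As an illustration
only (the package name, timestamp and counter below are made up; the
field names come from the code in this patch), its contents look
roughly like:

    {
        "version": "1",
        "timestamp": 1417993848.0,
        "deltas": [
            {"event": "add", "package": "app-misc/foo", "version": "1.0",
             "slot": "0", "counter": "12345"}
        ]
    }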

To test the performance gains from this patch, you first need to
generate /var/cache/edb/vdb_metadata_delta.json, which happens
automatically if you run 'emerge -p anything' with root privileges.
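
As a quick sanity check (a minimal sketch, assuming the default EROOT
and that the delta file has already been created), you can load it
with plain json and print the recorded events:

    import json

    # Path used for the default EROOT; adjust if your EROOT differs.
    path = "/var/cache/edb/vdb_metadata_delta.json"

    with open(path) as f:
        delta = json.load(f)

    print("format %s, timestamp %s" % (delta["version"], delta["timestamp"]))
    for node in delta.get("deltas", []):
        print("%(event)s %(package)s-%(version)s slot %(slot)s" % node)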

---
 pym/portage/dbapi/IndexedVardb.py      |  22 ++++-
 pym/portage/dbapi/_VdbMetadataDelta.py | 153 +++++++++++++++++++++++++++++++++
 pym/portage/dbapi/vartree.py           |  42 ++++++---
 3 files changed, 206 insertions(+), 11 deletions(-)

diff --git a/pym/portage/dbapi/IndexedVardb.py b/pym/portage/dbapi/IndexedVardb.py
index 424defc..38bfeed 100644
--- a/pym/portage/dbapi/IndexedVardb.py
+++ b/pym/portage/dbapi/IndexedVardb.py
@@ -3,6 +3,7 @@
 
 import portage
 from portage.dep import Atom
+from portage.exception import InvalidData
 from portage.versions import _pkg_str
 
 class IndexedVardb(object):
@@ -42,7 +43,26 @@ class IndexedVardb(object):
                if self._cp_map is not None:
                        return iter(sorted(self._cp_map))
 
-               return self._iter_cp_all()
+               delta_data = self._vardb._cache_delta.loadRace()
+               if delta_data is None:
+                       return self._iter_cp_all()
+
+               self._vardb._cache_delta.applyDelta(delta_data)
+
+               self._cp_map = cp_map = {}
+               for cpv in self._vardb._aux_cache["packages"]:
+                       try:
+                               cpv = _pkg_str(cpv)
+                       except InvalidData:
+                               continue
+
+                       cp_list = cp_map.get(cpv.cp)
+                       if cp_list is None:
+                               cp_list = []
+                               cp_map[cpv.cp] = cp_list
+                       cp_list.append(cpv)
+
+               return iter(sorted(self._cp_map))
 
        def _iter_cp_all(self):
                self._cp_map = cp_map = {}

diff --git a/pym/portage/dbapi/_VdbMetadataDelta.py b/pym/portage/dbapi/_VdbMetadataDelta.py
new file mode 100644
index 0000000..3e3ff18
--- /dev/null
+++ b/pym/portage/dbapi/_VdbMetadataDelta.py
@@ -0,0 +1,153 @@
+# Copyright 2014 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import errno
+import io
+import json
+import os
+
+from portage import _encodings
+from portage.util import atomic_ofstream
+
+class VdbMetadataDelta(object):
+
+       _format_version  = "1"
+
+       def __init__(self, vardb):
+               self._vardb = vardb
+
+       def initialize(self, timestamp):
+               f = atomic_ofstream(self._vardb._cache_delta_filename, 'w',
+                       encoding=_encodings['repo.content'], errors='strict')
+               json.dump({
+                       "version": self._format_version,
+                       "timestamp": timestamp
+                       }, f, ensure_ascii=False)
+               f.close()
+
+       def load(self):
+
+               if not os.path.exists(self._vardb._aux_cache_filename):
+                       # If the primary cache doesn't exist yet, then
+                       # we can't record a delta against it.
+                       return None
+
+               try:
+                       with io.open(self._vardb._cache_delta_filename, 'r',
+                               encoding=_encodings['repo.content'],
+                               errors='strict') as f:
+                               cache_obj = json.load(f)
+               except EnvironmentError as e:
+                       if e.errno not in (errno.ENOENT, errno.ESTALE):
+                               raise
+               except (SystemExit, KeyboardInterrupt):
+                       raise
+               except Exception:
+                       # Corrupt, or not json format.
+                       pass
+               else:
+                       try:
+                               version = cache_obj["version"]
+                       except KeyError:
+                               pass
+                       else:
+                               # Verify that the format version is compatible,
+                               # since a newer version of portage may have
+                               # written an incompatible file.
+                               if version == self._format_version:
+                                       try:
+                                               deltas = cache_obj["deltas"]
+                                       except KeyError:
+                                               cache_obj["deltas"] = deltas = []
+
+                                       if isinstance(deltas, list):
+                                               return cache_obj
+
+               return None
+
+       def loadRace(self):
+               """
+               This calls self.load() and validates the timestamp
+               against the currently loaded self._vardb._aux_cache. If a
+               concurrent update causes the timestamps to be inconsistent,
+               then it reloads the caches and tries one more time before
+               it aborts. In practice, the race is very unlikely, so
+               this will usually succeed on the first try.
+               """
+
+               tries = 2
+               while tries:
+                       tries -= 1
+                       cache_delta = self.load()
+                       if cache_delta is not None and \
+                               cache_delta.get("timestamp") != \
+                               self._vardb._aux_cache.get("timestamp", False):
+                               self._vardb._aux_cache_obj = None
+                       else:
+                               return cache_delta
+
+               return None
+
+       def recordEvent(self, event, cpv, slot, counter):
+
+               self._vardb.lock()
+               try:
+                       deltas_obj = self.load()
+
+                       if deltas_obj is None:
+                               # We can't record meaningful deltas without
+                               # a pre-existing state.
+                               return
+
+                       delta_node = {
+                               "event": event,
+                               "package": cpv.cp,
+                               "version": cpv.version,
+                               "slot": slot,
+                               "counter": "%s" % counter
+                       }
+
+                       deltas_obj["deltas"].append(delta_node)
+
+                       # Eliminate earlier nodes cancelled out by later nodes
+                       # that have identical package and slot attributes.
+                       filtered_list = []
+                       slot_keys = set()
+                       version_keys = set()
+                       for delta_node in reversed(deltas_obj["deltas"]):
+                               slot_key = (delta_node["package"],
+                                       delta_node["slot"])
+                               version_key = (delta_node["package"],
+                                       delta_node["version"])
+                               if not (slot_key in slot_keys or \
+                                       version_key in version_keys):
+                                       filtered_list.append(delta_node)
+                                       slot_keys.add(slot_key)
+                                       version_keys.add(version_key)
+
+                       filtered_list.reverse()
+                       deltas_obj["deltas"] = filtered_list
+
+                       f = atomic_ofstream(self._vardb._cache_delta_filename,
+                               mode='w', encoding=_encodings['repo.content'])
+                       json.dump(deltas_obj, f, ensure_ascii=False)
+                       f.close()
+
+               finally:
+                       self._vardb.unlock()
+
+       def applyDelta(self, data):
+               packages = self._vardb._aux_cache["packages"]
+               for delta in data["deltas"]:
+                       cpv = delta["package"] + "-" + delta["version"]
+                       event = delta["event"]
+                       if event == "add":
+                               # Use aux_get to populate the cache
+                               # for this cpv.
+                               if cpv not in packages:
+                                       try:
+                                               self._vardb.aux_get(cpv, ["DESCRIPTION"])
+                                       except KeyError:
+                                               pass
+                       elif event == "remove":
+                               packages.pop(cpv, None)

diff --git a/pym/portage/dbapi/vartree.py b/pym/portage/dbapi/vartree.py
index 9c8b276..2d4d32d 100644
--- a/pym/portage/dbapi/vartree.py
+++ b/pym/portage/dbapi/vartree.py
@@ -64,6 +64,7 @@ from portage import _os_merge
 from portage import _selinux_merge
 from portage import _unicode_decode
 from portage import _unicode_encode
+from ._VdbMetadataDelta import VdbMetadataDelta
 
 from _emerge.EbuildBuildDir import EbuildBuildDir
 from _emerge.EbuildPhase import EbuildPhase
@@ -179,6 +180,9 @@ class vardbapi(dbapi):
                self._aux_cache_obj = None
                self._aux_cache_filename = os.path.join(self._eroot,
                        CACHE_PATH, "vdb_metadata.pickle")
+               self._cache_delta_filename = os.path.join(self._eroot,
+                       CACHE_PATH, "vdb_metadata_delta.json")
+               self._cache_delta = VdbMetadataDelta(self)
                self._counter_path = os.path.join(self._eroot,
                        CACHE_PATH, "counter")
 
@@ -569,22 +573,31 @@ class vardbapi(dbapi):
                long as at least part of the cache is still valid)."""
                if self._flush_cache_enabled and \
                        self._aux_cache is not None and \
-                       len(self._aux_cache["modified"]) >= self._aux_cache_threshold and \
-                       secpass >= 2:
+                       secpass >= 2 and \
+                       (len(self._aux_cache["modified"]) >= self._aux_cache_threshold or
+                       not os.path.exists(self._cache_delta_filename)):
+
+                       ensure_dirs(os.path.dirname(self._aux_cache_filename))
+
                        self._owners.populate() # index any unindexed contents
                        valid_nodes = set(self.cpv_all())
                        for cpv in list(self._aux_cache["packages"]):
                                if cpv not in valid_nodes:
                                        del self._aux_cache["packages"][cpv]
                        del self._aux_cache["modified"]
-                       try:
-                               f = atomic_ofstream(self._aux_cache_filename, 'wb')
-                               pickle.dump(self._aux_cache, f, protocol=2)
-                               f.close()
-                               apply_secpass_permissions(
-                                       self._aux_cache_filename, gid=portage_gid, mode=0o644)
-                       except (IOError, OSError) as e:
-                               pass
+                       timestamp = time.time()
+                       self._aux_cache["timestamp"] = timestamp
+
+                       f = atomic_ofstream(self._aux_cache_filename, 'wb')
+                       pickle.dump(self._aux_cache, f, protocol=2)
+                       f.close()
+                       apply_secpass_permissions(
+                               self._aux_cache_filename, mode=0o644)
+
+                       self._cache_delta.initialize(timestamp)
+                       apply_secpass_permissions(
+                               self._cache_delta_filename, mode=0o644)
+
                        self._aux_cache["modified"] = set()
 
        @property
@@ -1622,6 +1635,13 @@ class dblink(object):
                                self.dbdir, noiselevel=-1)
                        return
 
+               if self.dbdir is self.dbpkgdir:
+                       counter, = self.vartree.dbapi.aux_get(
+                               self.mycpv, ["COUNTER"])
+                       self.vartree.dbapi._cache_delta.recordEvent(
+                               "remove", self.mycpv,
+                               self.settings["SLOT"].split("/")[0], counter)
+
                shutil.rmtree(self.dbdir)
                # If empty, remove parent category directory.
                try:
@@ -4232,6 +4252,8 @@ class dblink(object):
                        self.delete()
                        _movefile(self.dbtmpdir, self.dbpkgdir, mysettings=self.settings)
                        self._merged_path(self.dbpkgdir, os.lstat(self.dbpkgdir))
+                       self.vartree.dbapi._cache_delta.recordEvent(
+                               "add", self.mycpv, slot, counter)
                finally:
                        self.unlockdb()
 
