commit: d800d224ab38c0f524d3fe858ebe201cbfa903c1
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Thu Nov 6 08:33:03 2014 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun Dec 7 23:10:48 2014 +0000
URL: http://sources.gentoo.org/gitweb/?p=proj/portage.git;a=commit;h=d800d224
Log changes between vdb_metadata.pickle updates
This adds support for generating a vdb_metadata_delta.json file
which tracks package merges / unmerges that occur between updates to
vdb_metadata.pickle. IndexedVardb can use the delta together with
vdb_metadata.pickle to reconstruct a complete view of /var/db/pkg,
so that it can avoid expensive listdir calls in /var/db/pkg/*.
Note that vdb_metadata.pickle is only updated periodically, in
order to avoid excessive re-writes of a large file.
In order to test the performance gains from this patch, you need to
generate /var/cache/edb/vdb_metadata_delta.json first, which will
happen automatically if you run 'emerge -p anything' with root
privileges.
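For reference, a minimal sketch of the data stored in
/var/cache/edb/vdb_metadata_delta.json, shown as the equivalent Python
object (the package values are hypothetical):

# Illustrative only. initialize() writes just "version" and "timestamp";
# load() inserts an empty "deltas" list if the key is missing, and
# recordEvent() appends one node per merge/unmerge. loadRace() compares
# "timestamp" against the one stored in vdb_metadata.pickle.
delta_file = {
    "version": "1",             # VdbMetadataDelta._format_version
    "timestamp": 1417993848.0,  # time.time() at the last pickle update
    "deltas": [
        {
            "event": "add",             # "add" on merge, "remove" on unmerge
            "package": "app-misc/foo",  # cpv.cp (hypothetical)
            "version": "1.0",           # cpv.version
            "slot": "0",
            "counter": "1234"           # stored as a string
        }
    ]
}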
---
pym/portage/dbapi/IndexedVardb.py | 22 ++++-
pym/portage/dbapi/_VdbMetadataDelta.py | 153 +++++++++++++++++++++++++++++++++
pym/portage/dbapi/vartree.py | 42 ++++++---
3 files changed, 206 insertions(+), 11 deletions(-)
diff --git a/pym/portage/dbapi/IndexedVardb.py b/pym/portage/dbapi/IndexedVardb.py
index 424defc..38bfeed 100644
--- a/pym/portage/dbapi/IndexedVardb.py
+++ b/pym/portage/dbapi/IndexedVardb.py
@@ -3,6 +3,7 @@
import portage
from portage.dep import Atom
+from portage.exception import InvalidData
from portage.versions import _pkg_str
class IndexedVardb(object):
@@ -42,7 +43,26 @@ class IndexedVardb(object):
if self._cp_map is not None:
return iter(sorted(self._cp_map))
- return self._iter_cp_all()
+ delta_data = self._vardb._cache_delta.loadRace()
+ if delta_data is None:
+ return self._iter_cp_all()
+
+ self._vardb._cache_delta.applyDelta(delta_data)
+
+ self._cp_map = cp_map = {}
+ for cpv in self._vardb._aux_cache["packages"]:
+ try:
+ cpv = _pkg_str(cpv)
+ except InvalidData:
+ continue
+
+ cp_list = cp_map.get(cpv.cp)
+ if cp_list is None:
+ cp_list = []
+ cp_map[cpv.cp] = cp_list
+ cp_list.append(cpv)
+
+ return iter(sorted(self._cp_map))
def _iter_cp_all(self):
self._cp_map = cp_map = {}
diff --git a/pym/portage/dbapi/_VdbMetadataDelta.py b/pym/portage/dbapi/_VdbMetadataDelta.py
new file mode 100644
index 0000000..3e3ff18
--- /dev/null
+++ b/pym/portage/dbapi/_VdbMetadataDelta.py
@@ -0,0 +1,153 @@
+# Copyright 2014 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import errno
+import io
+import json
+import os
+
+from portage import _encodings
+from portage.util import atomic_ofstream
+
+class VdbMetadataDelta(object):
+
+ _format_version = "1"
+
+ def __init__(self, vardb):
+ self._vardb = vardb
+
+ def initialize(self, timestamp):
+ f = atomic_ofstream(self._vardb._cache_delta_filename, 'w',
+ encoding=_encodings['repo.content'], errors='strict')
+ json.dump({
+ "version": self._format_version,
+ "timestamp": timestamp
+ }, f, ensure_ascii=False)
+ f.close()
+
+ def load(self):
+
+ if not os.path.exists(self._vardb._aux_cache_filename):
+ # If the primary cache doesn't exist yet, then
+ # we can't record a delta against it.
+ return None
+
+ try:
+ with io.open(self._vardb._cache_delta_filename, 'r',
+ encoding=_encodings['repo.content'],
+ errors='strict') as f:
+ cache_obj = json.load(f)
+ except EnvironmentError as e:
+ if e.errno not in (errno.ENOENT, errno.ESTALE):
+ raise
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except Exception:
+ # Corrupt, or not json format.
+ pass
+ else:
+ try:
+ version = cache_obj["version"]
+ except KeyError:
+ pass
+ else:
+ # Verify that the format version is compatible,
+ # since a newer version of portage may have
+ # written an incompatible file.
+ if version == self._format_version:
+ try:
+ deltas = cache_obj["deltas"]
+ except KeyError:
+ cache_obj["deltas"] = deltas = []
+
+ if isinstance(deltas, list):
+ return cache_obj
+
+ return None
+
+ def loadRace(self):
+ """
+ This calls self.load() and validates the timestamp
+ against the currently loaded self._vardb._aux_cache. If a
+ concurrent update causes the timestamps to be inconsistent,
+ then it reloads the caches and tries one more time before
+ it aborts. In practice, the race is very unlikely, so
+ this will usually succeed on the first try.
+ """
+
+ tries = 2
+ while tries:
+ tries -= 1
+ cache_delta = self.load()
+ if cache_delta is not None and \
+ cache_delta.get("timestamp") != \
+ self._vardb._aux_cache.get("timestamp", False):
+ self._vardb._aux_cache_obj = None
+ else:
+ return cache_delta
+
+ return None
+
+ def recordEvent(self, event, cpv, slot, counter):
+
+ self._vardb.lock()
+ try:
+ deltas_obj = self.load()
+
+ if deltas_obj is None:
+ # We can't record meaningful deltas without
+ # a pre-existing state.
+ return
+
+ delta_node = {
+ "event": event,
+ "package": cpv.cp,
+ "version": cpv.version,
+ "slot": slot,
+ "counter": "%s" % counter
+ }
+
+ deltas_obj["deltas"].append(delta_node)
+
+ # Eliminate earlier nodes cancelled out by later nodes
+ # that have identical package and slot attributes.
+ filtered_list = []
+ slot_keys = set()
+ version_keys = set()
+ for delta_node in reversed(deltas_obj["deltas"]):
+ slot_key = (delta_node["package"],
+ delta_node["slot"])
+ version_key = (delta_node["package"],
+ delta_node["version"])
+ if not (slot_key in slot_keys or \
+ version_key in version_keys):
+ filtered_list.append(delta_node)
+ slot_keys.add(slot_key)
+ version_keys.add(version_key)
+
+ filtered_list.reverse()
+ deltas_obj["deltas"] = filtered_list
+
+ f = atomic_ofstream(self._vardb._cache_delta_filename,
+ mode='w', encoding=_encodings['repo.content'])
+ json.dump(deltas_obj, f, ensure_ascii=False)
+ f.close()
+
+ finally:
+ self._vardb.unlock()
+
+ def applyDelta(self, data):
+ packages = self._vardb._aux_cache["packages"]
+ for delta in data["deltas"]:
+ cpv = delta["package"] + "-" + delta["version"]
+ event = delta["event"]
+ if event == "add":
+ # Use aux_get to populate the cache
+ # for this cpv.
+ if cpv not in packages:
+ try:
+ self._vardb.aux_get(cpv, ["DESCRIPTION"])
+ except KeyError:
+ pass
+ elif event == "remove":
+ packages.pop(cpv, None)
diff --git a/pym/portage/dbapi/vartree.py b/pym/portage/dbapi/vartree.py
index 9c8b276..2d4d32d 100644
--- a/pym/portage/dbapi/vartree.py
+++ b/pym/portage/dbapi/vartree.py
@@ -64,6 +64,7 @@ from portage import _os_merge
from portage import _selinux_merge
from portage import _unicode_decode
from portage import _unicode_encode
+from ._VdbMetadataDelta import VdbMetadataDelta
from _emerge.EbuildBuildDir import EbuildBuildDir
from _emerge.EbuildPhase import EbuildPhase
@@ -179,6 +180,9 @@ class vardbapi(dbapi):
self._aux_cache_obj = None
self._aux_cache_filename = os.path.join(self._eroot,
CACHE_PATH, "vdb_metadata.pickle")
+ self._cache_delta_filename = os.path.join(self._eroot,
+ CACHE_PATH, "vdb_metadata_delta.json")
+ self._cache_delta = VdbMetadataDelta(self)
self._counter_path = os.path.join(self._eroot,
CACHE_PATH, "counter")
@@ -569,22 +573,31 @@ class vardbapi(dbapi):
long as at least part of the cache is still valid)."""
if self._flush_cache_enabled and \
self._aux_cache is not None and \
- len(self._aux_cache["modified"]) >= self._aux_cache_threshold and \
- secpass >= 2:
+ secpass >= 2 and \
+ (len(self._aux_cache["modified"]) >= self._aux_cache_threshold or
+ not os.path.exists(self._cache_delta_filename)):
+
+ ensure_dirs(os.path.dirname(self._aux_cache_filename))
+
self._owners.populate() # index any unindexed contents
valid_nodes = set(self.cpv_all())
for cpv in list(self._aux_cache["packages"]):
if cpv not in valid_nodes:
del self._aux_cache["packages"][cpv]
del self._aux_cache["modified"]
- try:
- f = atomic_ofstream(self._aux_cache_filename, 'wb')
- pickle.dump(self._aux_cache, f, protocol=2)
- f.close()
- apply_secpass_permissions(
- self._aux_cache_filename, gid=portage_gid, mode=0o644)
- except (IOError, OSError) as e:
- pass
+ timestamp = time.time()
+ self._aux_cache["timestamp"] = timestamp
+
+ f = atomic_ofstream(self._aux_cache_filename, 'wb')
+ pickle.dump(self._aux_cache, f, protocol=2)
+ f.close()
+ apply_secpass_permissions(
+ self._aux_cache_filename, mode=0o644)
+
+ self._cache_delta.initialize(timestamp)
+ apply_secpass_permissions(
+ self._cache_delta_filename, mode=0o644)
+
self._aux_cache["modified"] = set()
@property
@@ -1622,6 +1635,13 @@ class dblink(object):
self.dbdir, noiselevel=-1)
return
+ if self.dbdir is self.dbpkgdir:
+ counter, = self.vartree.dbapi.aux_get(
+ self.mycpv, ["COUNTER"])
+ self.vartree.dbapi._cache_delta.recordEvent(
+ "remove", self.mycpv,
+ self.settings["SLOT"].split("/")[0], counter)
+
shutil.rmtree(self.dbdir)
# If empty, remove parent category directory.
try:
@@ -4232,6 +4252,8 @@ class dblink(object):
self.delete()
_movefile(self.dbtmpdir, self.dbpkgdir,
mysettings=self.settings)
self._merged_path(self.dbpkgdir, os.lstat(self.dbpkgdir))
+ self.vartree.dbapi._cache_delta.recordEvent(
+ "add", self.mycpv, slot, counter)
finally:
self.unlockdb()
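
As a footnote on recordEvent(): the cancellation pass keeps only the newest
event per (package, slot) and per (package, version) key, so the delta list
stays bounded across repeated merges. A standalone sketch of that logic,
using a hypothetical filter_deltas() helper and hypothetical events:

# Minimal sketch of the filtering done inside recordEvent(): walk the
# delta list newest-first and drop any earlier node whose (package, slot)
# or (package, version) key is superseded by a later one.
def filter_deltas(deltas):
    filtered_list = []
    slot_keys = set()
    version_keys = set()
    for node in reversed(deltas):
        slot_key = (node["package"], node["slot"])
        version_key = (node["package"], node["version"])
        if not (slot_key in slot_keys or version_key in version_keys):
            filtered_list.append(node)
            slot_keys.add(slot_key)
            version_keys.add(version_key)
    filtered_list.reverse()
    return filtered_list

# Hypothetical example: two merges into the same slot leave only the
# newer foo-1.1 node, since both events share the (package, slot) key.
events = [
    {"event": "add", "package": "app-misc/foo", "version": "1.0", "slot": "0"},
    {"event": "add", "package": "app-misc/foo", "version": "1.1", "slot": "0"},
]
print(filter_deltas(events))  # -> only the foo-1.1 node

Keying on both (package, slot) and (package, version) means a later event
for the same slot or the same version supersedes the earlier ones, which is
what keeps the json file small between vdb_metadata.pickle rewrites.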