
I didn't know someone else was working on this. I am sorry if I am
Here is my patch.


From 659479c85c91a32e6c645549766b98e4bdcc9717 Mon Sep 17 00:00:00 2001
From: Orestis Ioannou <ores...@oioannou.com>
Date: Thu, 5 Mar 2015 09:03:09 +0100
Subject: [PATCH] Models: Refactor models to contain only ORM abstraction

Closes: #762934
Moved the static queries of each model and non ORM objects to query.py
Created a test suite to verify the Queries.
 debsources/app/views.py          |  18 +-
 debsources/models.py             | 379 +-------------------------------------
 debsources/query.py              | 387 +++++++++++++++++++++++++++++++++++++++
 debsources/tests/test_queries.py |  69 +++++++
 4 files changed, 467 insertions(+), 386 deletions(-)
 create mode 100644 debsources/query.py
 create mode 100644 debsources/tests/test_queries.py

diff --git a/debsources/app/views.py b/debsources/app/views.py
index 129d4ad..e32c536 100644
--- a/debsources/app/views.py
+++ b/debsources/app/views.py
@@ -36,8 +36,10 @@ from debsources.excepts import (
     InvalidPackageOrVersionError, FileOrFolderNotFound,
     Http500Error, Http404Error, Http404ErrorSuggestions, Http403Error)
 from debsources.models import (
-    Ctag, Package, PackageName, Checksum, Location, Directory,
-    SourceFile, File, Suite)
+    Ctag, Package, PackageName, Checksum, File, Suite)
+from debsources.query import (Location, Directory,
+    SourceFile, Queries)
 from debsources.app.sourcecode import SourceCodeIterator
 from debsources.app.forms import SearchForm
 from debsources.app.infobox import Infobox
@@ -62,7 +64,7 @@ def skeleton_variables():
     update_ts_file = os.path.join(app.config['CACHE_DIR'], 'last-update')
     last_update = local_info.read_update_ts(update_ts_file)
-    packages_prefixes = PackageName.get_packages_prefixes(
+    packages_prefixes = Queries.get_packages_prefixes(
     credits_file = os.path.join(app.config["LOCAL_DIR"], "credits.html")
@@ -144,7 +146,7 @@ def deal_404_error(error, mode='html'):
         if isinstance(error, Http404ErrorSuggestions):
             # let's suggest all the possible locations with a different
             # package version
-            possible_versions = PackageName.list_versions(
+            possible_versions = Queries.list_versions(
                 session, error.package)
             suggestions = ['/'.join(filter(None,
                                     [error.package, v.version, error.path]))
@@ -441,7 +443,7 @@ class PrefixView(GeneralView):
         suite = suite.lower()
         if suite == "all":
             suite = ""
-        if prefix in PackageName.get_packages_prefixes(
+        if prefix in Queries.get_packages_prefixes(
                 if not suite:
@@ -501,7 +503,7 @@ class SourceView(GeneralView):
             suite = ""
         # we list the version with suites it belongs to
-            versions_w_suites = PackageName.list_versions_w_suites(
+            versions_w_suites = Queries.list_versions_w_suites(
                 session, packagename, suite)
         except InvalidPackageOrVersionError:
             raise Http404Error("%s not found" % packagename)
@@ -605,7 +607,7 @@ class SourceView(GeneralView):
         when 'latest' is provided instead of a version number
-            versions = PackageName.list_versions(session, package)
+            versions = Queries.list_versions(session, package)
         except InvalidPackageOrVersionError:
             raise Http404Error("%s not found" % package)
         # the latest version is the latest item in the
@@ -876,7 +878,7 @@ class CtagView(GeneralView):
             pagination = None
             slice_ = None
-        (count, results) = Ctag.find_ctag(session, ctag, slice_=slice_,
+        (count, results) = Queries.find_ctag(session, ctag, slice_=slice_,
         if not self.all_:
             pagination = Pagination(page, offset, count)
diff --git a/debsources/models.py b/debsources/models.py
index e4d05fc..5f57246 100644
--- a/debsources/models.py
+++ b/debsources/models.py
@@ -16,30 +16,17 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
-import os
-import magic
-import stat
-from collections import namedtuple
 from sqlalchemy import Column, ForeignKey
 from sqlalchemy import UniqueConstraint, PrimaryKeyConstraint
 from sqlalchemy import Index
 from sqlalchemy import Boolean, Date, DateTime, Integer, LargeBinary, String
 from sqlalchemy import Enum
-from sqlalchemy import and_
-from sqlalchemy import func as sql_func
 from sqlalchemy.orm import relationship
 from sqlalchemy.ext.declarative import declarative_base
-from debian.debian_support import version_compare
-from debsources.excepts import InvalidPackageOrVersionError, \
-    FileOrFolderNotFound
 from debsources.consts import VCS_TYPES, SLOCCOUNT_LANGUAGES, \
-from debsources import filetype
-from debsources.debmirror import SourcePackage
-from debsources.consts import SUITES
 Base = declarative_base()
@@ -64,77 +51,6 @@ class PackageName(Base):
     def __repr__(self):
         return self.name
-    @staticmethod
-    def get_packages_prefixes(cache_dir):
-        """
-        returns the packages prefixes (a, b, ..., liba, libb, ..., y, z)
-        cache_dir: the cache directory, usually comes from the app config
-        """
-        try:
-            with open(os.path.join(cache_dir, 'pkg-prefixes')) as f:
-                prefixes = [l.rstrip() for l in f]
-        except IOError:
-            prefixes = PREFIXES_DEFAULT
-        return prefixes
-    @staticmethod
-    def list_versions(session, packagename, suite=""):
-        """
-        return all versions of a packagename. if suite is specified, only
-        versions contained in that suite are returned.
-        """
-        try:
-            name_id = session.query(PackageName) \
-                             .filter(PackageName.name == packagename) \
-                             .first().id
-        except Exception:
-            raise InvalidPackageOrVersionError(packagename)
-        try:
-            if not suite:
-                versions = session.query(Package) \
-                                  .filter(Package.name_id == name_id).all()
-            else:
-                versions = (session.query(Package)
-                                   .filter(Package.name_id == name_id)
-                                   .filter(sql_func.lower(Suite.suite)
-                                           == suite)
-                                   .filter(Suite.package_id == Package.id)
-                                   .all())
-        except Exception:
-            raise InvalidPackageOrVersionError(packagename)
-        # we sort the versions according to debian versions rules
-        versions = sorted(versions, cmp=version_compare)
-        return versions
-    @staticmethod
-    def list_versions_w_suites(session, packagename, suite=""):
-        """
-        return versions with suites. if suite is provided, then only return
-        versions contained in that suite.
-        """
-        # FIXME a left outer join on (Package, Suite) is more preferred.
-        # However, per https://stackoverflow.com/a/997467, custom aggregation
-        # function to concatenate the suite names for the group_by should be
-        # defined on database connection level.
-        versions = PackageName.list_versions(session, packagename, suite)
-        versions_w_suites = []
-        try:
-            for v in versions:
-                suites = session.query(Suite) \
-                                .filter(Suite.package_id == v.id) \
-                                .all()
-                # sort the suites according to debsources.consts.SUITES
-                # use keyfunc to make it py3 compatible
-                suites.sort(key=lambda s: SUITES['all'].index(s.suite))
-                suites = [s.suite for s in suites]
-                v = v.to_dict()
-                v['suites'] = suites
-                versions_w_suites.append(v)
-        except Exception:
-            raise InvalidPackageOrVersionError(packagename)
-        return versions_w_suites
     def to_dict(self):
         simply serializes a package (because SQLAlchemy query results
@@ -351,41 +267,6 @@ class Ctag(Base):
     #                .filter(Ctag.tag in ctags)
     #                .filter(Ctag
-    @staticmethod
-    def find_ctag(session, ctag, package=None, slice_=None):
-        """
-        Returns places in the code where a ctag is found.
-             tuple (count, [sliced] results)
-        session: an SQLAlchemy session
-        ctag: the ctag to search
-        package: limit results to package
-        """
-        results = (session.query(PackageName.name.label("package"),
-                                 Package.version.label("version"),
-                                 Ctag.file_id.label("file_id"),
-                                 File.path.label("path"),
-                                 Ctag.line.label("line"))
-                   .filter(Ctag.tag == ctag)
-                   .filter(Ctag.package_id == Package.id)
-                   .filter(Ctag.file_id == File.id)
-                   .filter(Package.name_id == PackageName.id)
-                   )
-        if package is not None:
-            results = results.filter(PackageName.name == package)
-        results = results.order_by(Ctag.package_id, File.path)
-        count = results.count()
-        if slice_ is not None:
-            results = results.slice(slice_[0], slice_[1])
-        results = [dict(package=res.package,
-                        version=res.version,
-                        path=res.path,
-                        line=res.line)
-                   for res in results.all()]
-        return (count, results)
 class Metric(Base):
     __tablename__ = 'metrics'
@@ -477,261 +358,3 @@ class HistorySlocCount(Base):
     def __init__(self, suite, timestamp):
         self.suite = suite
         self.timestamp = timestamp
-# it's used in Location.get_stat
-# to bypass flake8 complaints, we do not inject the global namespace
-# with globals()["LongFMT"] = namedtuple...
-LongFMT = namedtuple("LongFMT", ["type", "perms", "size", "symlink_dest"])
-class Location(object):
-    """ a location in a package, can be a directory or a file """
-    def _get_debian_path(self, session, package, version, sources_dir):
-        """
-        Returns the Debian path of a package version.
-        For example: main/h
-                     contrib/libz
-        It's the path of a *version*, since a package can have multiple
-        versions in multiple areas (ie main/contrib/nonfree).
-        sources_dir: the sources directory, usually comes from the app config
-        """
-        prefix = SourcePackage.pkg_prefix(package)
-        try:
-            p_id = session.query(PackageName) \
-                          .filter(PackageName.name == package).first().id
-            varea = session.query(Package) \
-                           .filter(and_(Package.name_id == p_id,
-                                        Package.version == version)) \
-                           .first().area
-        except:
-            # the package or version doesn't exist in the database
-            # BUT: packages are stored for a longer time in the filesystem
-            # to allow codesearch.d.n and others less up-to-date platforms
-            # to point here.
-            # Problem: we don't know the area of such a package
-            # so we try in main, contrib and non-free.
-            for area in AREAS:
-                if os.path.exists(os.path.join(sources_dir, area,
-                                               prefix, package, version)):
-                    return os.path.join(area, prefix)
-            raise InvalidPackageOrVersionError("%s %s" % (package, version))
-        return os.path.join(varea, prefix)
-    def __init__(self, session, sources_dir, sources_static,
-                 package, version="", path=""):
-        """ initialises useful attributes """
-        debian_path = self._get_debian_path(session,
-                                            package, version, sources_dir)
-        self.package = package
-        self.version = version
-        self.path = path
-        self.path_to = os.path.join(package, version, path)
-        self.sources_path = os.path.join(
-            sources_dir,
-            debian_path,
-            self.path_to)
-        self.version_path = os.path.join(
-            sources_dir,
-            debian_path,
-            package,
-            version)
-        if not(os.path.exists(self.sources_path)):
-            raise FileOrFolderNotFound("%s" % (self.path_to))
-        self.sources_path_static = os.path.join(
-            sources_static,
-            debian_path,
-            self.path_to)
-    def is_dir(self):
-        """ True if self is a directory, False if it's not """
-        return os.path.isdir(self.sources_path)
-    def is_file(self):
-        """ True if sels is a file, False if it's not """
-        return os.path.isfile(self.sources_path)
-    def is_symlink(self):
-        """ True if a folder/file is a symbolic link file, False if it's not
-        """
-        return os.path.islink(self.sources_path)
-    def get_package(self):
-        return self.package
-    def get_version(self):
-        return self.version
-    def get_path(self):
-        return self.path
-    def get_deepest_element(self):
-        if self.version == "":
-            return self.package
-        elif self.path == "":
-            return self.version
-        else:
-            return self.path.split("/")[-1]
-    def get_path_to(self):
-        return self.path_to.rstrip("/")
-    @staticmethod
-    def get_stat(sources_path):
-        """
-        Returns the filetype and permissions of the folder/file
-        on the disk, unix-styled.
-        """
-        # When porting to Python3, use stat.filemode directly
-        sources_stat = os.lstat(sources_path)
-        sources_mode, sources_size = sources_stat.st_mode, sources_stat.st_size
-        perm_flags = [
-            (stat.S_IRUSR, "r", "-"),
-            (stat.S_IWUSR, "w", "-"),
-            (stat.S_IXUSR, "x", "-"),
-            (stat.S_IRGRP, "r", "-"),
-            (stat.S_IWGRP, "w", "-"),
-            (stat.S_IXGRP, "x", "-"),
-            (stat.S_IROTH, "r", "-"),
-            (stat.S_IWOTH, "w", "-"),
-            (stat.S_IXOTH, "x", "-"),
-            ]
-        # XXX these flags should be enough.
-        type_flags = [
-            (stat.S_ISLNK, "l"),
-            (stat.S_ISREG, "-"),
-            (stat.S_ISDIR, "d"),
-            ]
-        # add the file type: d/l/-
-        file_type = " "
-        for ft, sign in type_flags:
-            if ft(sources_mode):
-                file_type = sign
-                break
-        file_perms = ""
-        for (flag, do_true, do_false) in perm_flags:
-            file_perms += do_true if (sources_mode & flag) else do_false
-        file_size = sources_size
-        symlink_dest = None
-        if file_type == "l":
-            symlink_dest = os.readlink(sources_path)
-        return vars(LongFMT(file_type, file_perms, file_size, symlink_dest))
-    @staticmethod
-    def get_path_links(endpoint, path_to):
-        """
-        returns the path hierarchy with urls, to use with 'You are here:'
-        [(name, url(name)), (...), ...]
-        """
-        path_dict = path_to.split('/')
-        pathl = []
-        # we import flask here, in order to permit the use of this module
-        # without requiring the user to have flask (e.g. bin/debsources-update
-        # can run in another machine without flask, because it doesn't use
-        # this method)
-        from flask import url_for
-        for (i, p) in enumerate(path_dict):
-            pathl.append((p, url_for(endpoint,
-                                     path_to='/'.join(path_dict[:i+1]))))
-        return pathl
-class Directory(object):
-    """ a folder in a package """
-    def __init__(self, location, toplevel=False):
-        # if the directory is a toplevel one, we remove the .pc folder
-        self.sources_path = location.sources_path
-        self.toplevel = toplevel
-        self.location = location
-    def get_listing(self):
-        """
-        returns the list of folders/files in a directory,
-        along with their type (directory/file)
-        in a tuple (name, type)
-        """
-        def get_type(f):
-            if os.path.isdir(os.path.join(self.sources_path, f)):
-                return "directory"
-            else:
-                return "file"
-        get_stat, join_path = self.location.get_stat, os.path.join
-        listing = sorted(dict(name=f, type=get_type(f),
-                              stat=get_stat(join_path(self.sources_path, f)))
-                         for f in os.listdir(self.sources_path))
-        if self.toplevel:
-            listing = filter(lambda x: x['name'] != ".pc", listing)
-        return listing
-class SourceFile(object):
-    """ a source file in a package """
-    def __init__(self, location):
-        self.location = location
-        self.sources_path = location.sources_path
-        self.sources_path_static = location.sources_path_static
-        self.mime = self._find_mime()
-    def _find_mime(self):
-        """ returns the mime encoding and type of a file """
-        mime = magic.open(magic.MIME_TYPE)
-        mime.load()
-        type_ = mime.file(self.sources_path)
-        mime.close()
-        mime = magic.open(magic.MIME_ENCODING)
-        mime.load()
-        encoding = mime.file(self.sources_path)
-        mime.close()
-        return dict(encoding=encoding, type=type_)
-    def get_mime(self):
-        return self.mime
-    def get_sha256sum(self, session):
-        """
-        Queries the DB and returns the shasum of the file.
-        """
-        shasum = session.query(Checksum.sha256) \
-                        .filter(Checksum.package_id == Package.id) \
-                        .filter(Package.name_id == PackageName.id) \
-                        .filter(File.id == Checksum.file_id) \
-                        .filter(PackageName.name == self.location.package) \
-                        .filter(Package.version == self.location.version) \
-                        .filter(File.path == str(self.location.path)) \
-                        .first()
-        # WARNING: in the DB path is binary, and here
-        # location.path is unicode, because the path comes from
-        # the URL. TODO: check with non-unicode paths
-        if shasum:
-            shasum = shasum[0]
-        return shasum
-    def istextfile(self):
-        """True if self is a text file, False if it's not.
-        """
-        return filetype.is_text_file(self.mime['type'])
-        # for substring in text_file_mimes:
-        #     if substring in self.mime['type']:
-        #         return True
-        # return False
-    def get_raw_url(self):
-        """ return the raw url on disk (e.g. data/main/a/azerty/foo.bar) """
-        return self.sources_path_static
diff --git a/debsources/query.py b/debsources/query.py
new file mode 100644
index 0000000..b8ee557
--- /dev/null
+++ b/debsources/query.py
@@ -0,0 +1,387 @@
+import os
+import stat
+from collections import namedtuple
+from debian.debian_support import version_compare
+from debsources import filetype
+from debsources.consts import AREAS, PREFIXES_DEFAULT
+from debsources.consts import SUITES
+from debsources.debmirror import SourcePackage
+from debsources.excepts import FileOrFolderNotFound, \
+    InvalidPackageOrVersionError
+from debsources.models import (
+    Checksum, Ctag, File, Package, PackageName, Suite)
+import magic
+from sqlalchemy import and_
+from sqlalchemy import func as sql_func
+LongFMT = namedtuple("LongFMT", ["type", "perms", "size", "symlink_dest"])
+class Location(object):
+    """ a location in a package, can be a directory or a file """
+    def _get_debian_path(self, session, package, version, sources_dir):
+        """
+        Returns the Debian path of a package version.
+        For example: main/h
+                     contrib/libz
+        It's the path of a *version*, since a package can have multiple
+        versions in multiple areas (ie main/contrib/nonfree).
+        sources_dir: the sources directory, usually comes from the app config
+        """
+        prefix = SourcePackage.pkg_prefix(package)
+        try:
+            p_id = session.query(PackageName) \
+                          .filter(PackageName.name == package).first().id
+            varea = session.query(Package) \
+                           .filter(and_(Package.name_id == p_id,
+                                        Package.version == version)) \
+                           .first().area
+        except:
+            # the package or version doesn't exist in the database
+            # BUT: packages are stored for a longer time in the filesystem
+            # to allow codesearch.d.n and others less up-to-date platforms
+            # to point here.
+            # Problem: we don't know the area of such a package
+            # so we try in main, contrib and non-free.
+            for area in AREAS:
+                if os.path.exists(os.path.join(sources_dir, area,
+                                               prefix, package, version)):
+                    return os.path.join(area, prefix)
+            raise InvalidPackageOrVersionError("%s %s" % (package, version))
+        return os.path.join(varea, prefix)
+    def __init__(self, session, sources_dir, sources_static,
+                 package, version="", path=""):
+        """ initialises useful attributes """
+        debian_path = self._get_debian_path(session,
+                                            package, version, sources_dir)
+        self.package = package
+        self.version = version
+        self.path = path
+        self.path_to = os.path.join(package, version, path)
+        self.sources_path = os.path.join(
+            sources_dir,
+            debian_path,
+            self.path_to)
+        self.version_path = os.path.join(
+            sources_dir,
+            debian_path,
+            package,
+            version)
+        if not(os.path.exists(self.sources_path)):
+            raise FileOrFolderNotFound("%s" % (self.path_to))
+        self.sources_path_static = os.path.join(
+            sources_static,
+            debian_path,
+            self.path_to)
+    def is_dir(self):
+        """ True if self is a directory, False if it's not """
+        return os.path.isdir(self.sources_path)
+    def is_file(self):
+        """ True if sels is a file, False if it's not """
+        return os.path.isfile(self.sources_path)
+    def is_symlink(self):
+        """ True if a folder/file is a symbolic link file, False if it's not
+        """
+        return os.path.islink(self.sources_path)
+    def get_package(self):
+        return self.package
+    def get_version(self):
+        return self.version
+    def get_path(self):
+        return self.path
+    def get_deepest_element(self):
+        if self.version == "":
+            return self.package
+        elif self.path == "":
+            return self.version
+        else:
+            return self.path.split("/")[-1]
+    def get_path_to(self):
+        return self.path_to.rstrip("/")
+    @staticmethod
+    def get_stat(sources_path):
+        """
+        Returns the filetype and permissions of the folder/file
+        on the disk, unix-styled.
+        """
+        # When porting to Python3, use stat.filemode directly
+        sources_stat = os.lstat(sources_path)
+        sources_mode, sources_size = sources_stat.st_mode, sources_stat.st_size
+        perm_flags = [
+            (stat.S_IRUSR, "r", "-"),
+            (stat.S_IWUSR, "w", "-"),
+            (stat.S_IXUSR, "x", "-"),
+            (stat.S_IRGRP, "r", "-"),
+            (stat.S_IWGRP, "w", "-"),
+            (stat.S_IXGRP, "x", "-"),
+            (stat.S_IROTH, "r", "-"),
+            (stat.S_IWOTH, "w", "-"),
+            (stat.S_IXOTH, "x", "-"),
+            ]
+        # XXX these flags should be enough.
+        type_flags = [
+            (stat.S_ISLNK, "l"),
+            (stat.S_ISREG, "-"),
+            (stat.S_ISDIR, "d"),
+            ]
+        # add the file type: d/l/-
+        file_type = " "
+        for ft, sign in type_flags:
+            if ft(sources_mode):
+                file_type = sign
+                break
+        file_perms = ""
+        for (flag, do_true, do_false) in perm_flags:
+            file_perms += do_true if (sources_mode & flag) else do_false
+        file_size = sources_size
+        symlink_dest = None
+        if file_type == "l":
+            symlink_dest = os.readlink(sources_path)
+        return vars(LongFMT(file_type, file_perms, file_size, symlink_dest))
+    @staticmethod
+    def get_path_links(endpoint, path_to):
+        """
+        returns the path hierarchy with urls, to use with 'You are here:'
+        [(name, url(name)), (...), ...]
+        """
+        path_dict = path_to.split('/')
+        pathl = []
+        # we import flask here, in order to permit the use of this module
+        # without requiring the user to have flask (e.g. bin/debsources-update
+        # can run in another machine without flask, because it doesn't use
+        # this method)
+        from flask import url_for
+        for (i, p) in enumerate(path_dict):
+            pathl.append((p, url_for(endpoint,
+                                     path_to='/'.join(path_dict[:i+1]))))
+        return pathl
+class Directory(object):
+    """ a folder in a package """
+    def __init__(self, location, toplevel=False):
+        # if the directory is a toplevel one, we remove the .pc folder
+        self.sources_path = location.sources_path
+        self.toplevel = toplevel
+        self.location = location
+    def get_listing(self):
+        """
+        returns the list of folders/files in a directory,
+        along with their type (directory/file)
+        in a tuple (name, type)
+        """
+        def get_type(f):
+            if os.path.isdir(os.path.join(self.sources_path, f)):
+                return "directory"
+            else:
+                return "file"
+        get_stat, join_path = self.location.get_stat, os.path.join
+        listing = sorted(dict(name=f, type=get_type(f),
+                              stat=get_stat(join_path(self.sources_path, f)))
+                         for f in os.listdir(self.sources_path))
+        if self.toplevel:
+            listing = filter(lambda x: x['name'] != ".pc", listing)
+        return listing
+class SourceFile(object):
+    """ a source file in a package """
+    def __init__(self, location):
+        self.location = location
+        self.sources_path = location.sources_path
+        self.sources_path_static = location.sources_path_static
+        self.mime = self._find_mime()
+    def _find_mime(self):
+        """ returns the mime encoding and type of a file """
+        mime = magic.open(magic.MIME_TYPE)
+        mime.load()
+        type_ = mime.file(self.sources_path)
+        mime.close()
+        mime = magic.open(magic.MIME_ENCODING)
+        mime.load()
+        encoding = mime.file(self.sources_path)
+        mime.close()
+        return dict(encoding=encoding, type=type_)
+    def get_mime(self):
+        return self.mime
+    def get_sha256sum(self, session):
+        """
+        Queries the DB and returns the shasum of the file.
+        """
+        shasum = session.query(Checksum.sha256) \
+                        .filter(Checksum.package_id == Package.id) \
+                        .filter(Package.name_id == PackageName.id) \
+                        .filter(File.id == Checksum.file_id) \
+                        .filter(PackageName.name == self.location.package) \
+                        .filter(Package.version == self.location.version) \
+                        .filter(File.path == str(self.location.path)) \
+                        .first()
+        # WARNING: in the DB path is binary, and here
+        # location.path is unicode, because the path comes from
+        # the URL. TODO: check with non-unicode paths
+        if shasum:
+            shasum = shasum[0]
+        return shasum
+    def istextfile(self):
+        """True if self is a text file, False if it's not.
+        """
+        return filetype.is_text_file(self.mime['type'])
+        # for substring in text_file_mimes:
+        #     if substring in self.mime['type']:
+        #         return True
+        # return False
+    def get_raw_url(self):
+        """ return the raw url on disk (e.g. data/main/a/azerty/foo.bar) """
+        return self.sources_path_static
+class Queries(object):
+    @staticmethod
+    def get_packages_prefixes(cache_dir):
+        """
+        returns the packages prefixes (a, b, ..., liba, libb, ..., y, z)
+        cache_dir: the cache directory, usually comes from the app config
+        """
+        try:
+            with open(os.path.join(cache_dir, 'pkg-prefixes')) as f:
+                prefixes = [l.rstrip() for l in f]
+        except IOError:
+            prefixes = PREFIXES_DEFAULT
+        return prefixes
+    @staticmethod
+    def list_versions(session, packagename, suite=""):
+        """
+        return all versions of a packagename. if suite is specified, only
+        versions contained in that suite are returned.
+        """
+        try:
+            name_id = session.query(PackageName) \
+                             .filter(PackageName.name == packagename) \
+                             .first().id
+        except Exception:
+            raise InvalidPackageOrVersionError(packagename)
+        try:
+            if not suite:
+                versions = session.query(Package) \
+                                  .filter(Package.name_id == name_id).all()
+            else:
+                versions = (session.query(Package)
+                                   .filter(Package.name_id == name_id)
+                                   .filter(sql_func.lower(Suite.suite)
+                                           == suite)
+                                   .filter(Suite.package_id == Package.id)
+                                   .all())
+        except Exception:
+            raise InvalidPackageOrVersionError(packagename)
+        # we sort the versions according to debian versions rules
+        versions = sorted(versions, cmp=version_compare)
+        return versions
+    @staticmethod
+    def list_versions_w_suites(session, packagename, suite=""):
+        """
+        return versions with suites. if suite is provided, then only return
+        versions contained in that suite.
+        """
+        # FIXME a left outer join on (Package, Suite) is more preferred.
+        # However, per https://stackoverflow.com/a/997467, custom aggregation
+        # function to concatenate the suite names for the group_by should be
+        # defined on database connection level.
+        versions = Queries.list_versions(session, packagename, suite)
+        versions_w_suites = []
+        try:
+            for v in versions:
+                suites = session.query(Suite) \
+                                .filter(Suite.package_id == v.id) \
+                                .all()
+                # sort the suites according to debsources.consts.SUITES
+                # use keyfunc to make it py3 compatible
+                suites.sort(key=lambda s: SUITES['all'].index(s.suite))
+                suites = [s.suite for s in suites]
+                v = v.to_dict()
+                v['suites'] = suites
+                versions_w_suites.append(v)
+        except Exception:
+            raise InvalidPackageOrVersionError(packagename)
+        return versions_w_suites
+    @staticmethod
+    def find_ctag(session, ctag, package=None, slice_=None):
+        """
+        Returns places in the code where a ctag is found.
+             tuple (count, [sliced] results)
+        session: an SQLAlchemy session
+        ctag: the ctag to search
+        package: limit results to package
+        """
+        results = (session.query(PackageName.name.label("package"),
+                                 Package.version.label("version"),
+                                 Ctag.file_id.label("file_id"),
+                                 File.path.label("path"),
+                                 Ctag.line.label("line"))
+                   .filter(Ctag.tag == ctag)
+                   .filter(Ctag.package_id == Package.id)
+                   .filter(Ctag.file_id == File.id)
+                   .filter(Package.name_id == PackageName.id)
+                   )
+        if package is not None:
+            results = results.filter(PackageName.name == package)
+        results = results.order_by(Ctag.package_id, File.path)
+        count = results.count()
+        if slice_ is not None:
+            results = results.slice(slice_[0], slice_[1])
+        results = [dict(package=res.package,
+                        version=res.version,
+                        path=res.path,
+                        line=res.line)
+                   for res in results.all()]
+        return (count, results)
diff --git a/debsources/tests/test_queries.py b/debsources/tests/test_queries.py
new file mode 100644
index 0000000..0fb7094
--- /dev/null
+++ b/debsources/tests/test_queries.py
@@ -0,0 +1,69 @@
+import unittest
+from nose.plugins.attrib import attr
+from debsources.query import Queries
+from debsources.tests.db_testing import DbTestFixture
+from debsources.tests.testdata import TEST_DB_NAME
+class QueriesTest(unittest.TestCase, DbTestFixture):
+    @classmethod
+    def setUpClass(cls):
+        cls.db_setup_cls()
+        # creates an app object, which is used to run queries
+        from debsources.app import app_wrapper
+        # erases a few configuration parameters needed for testing:
+        uri = "postgresql:///" + TEST_DB_NAME
+        app_wrapper.app.config["SQLALCHEMY_DATABASE_URI"] = uri
+        app_wrapper.app.config['LIST_OFFSET'] = 5
+        app_wrapper.app.testing = True
+        app_wrapper.go()
+        cls.app = app_wrapper.app.test_client()
+        cls.app_wrapper = app_wrapper
+    @classmethod
+    def tearDownClass(cls):
+        cls.app_wrapper.engine.dispose()
+        cls.db_teardown_cls()
+    def test_packages_prefixes(self):
+        self.assertEqual(Queries.get_packages_prefixes(
+            self.app_wrapper.app.config["CACHE_DIR"]),
+            ['b', 'd', 'f', 'g', 'l', 'libc', 'm', 'n', 'o', 's', 'u'])
+    def test_list_versions(self):
+        # Test without suit
+        packages = Queries.list_versions(self.session, "gnubg")
+        self.assertEqual([p.version for p in packages],
+                         ["0.90+20091206-4", "0.90+20120429-1", "1.02.000-2"])
+        # Test with suit
+        packages = Queries.list_versions(self.session, "gnubg", "wheezy")
+        self.assertEqual([p.version for p in packages], ["0.90+20120429-1"])
+        # Test returning suites without suit as parameter
+        self.assertTrue({'suites': [u'wheezy'], 'version': u'0.90+20120429-1',
+                         'area': u'main'} in
+                        Queries.list_versions_w_suites(self.session, "gnubg"))
+        # Test returning suites with a suit as parameter
+        self.assertEqual(Queries.list_versions_w_suites(self.session, "gnubg", "jessie"),
+                        [{'suites': [u'jessie', u'sid'],
+                        'version': u'1.02.000-2', 'area': u'main'}])
+    def test_find_ctag(self):
+        self.assertEqual(Queries.find_ctag(self.session, "swap")[0], 8)
+        ctags = Queries.find_ctag(self.session, "swap", "gnubg")
+        self.assertEqual(ctags[0], 5)
+        self.assertTrue({'path': 'eval.c', 'line': 1747,
+                        'version': u'0.90+20091206-4', 'package': u'gnubg'}
+                        in ctags[1])

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to