commit: 5308afe0c9b7bbceec8c1cf7d31145afb24ef371
Author: Brian Harring <ferringb <AT> gmail <DOT> com>
AuthorDate: Sat Jan 10 13:30:36 2026 +0000
Commit: Brian Harring <ferringb <AT> gmail <DOT> com>
CommitDate: Sat Jan 10 13:31:35 2026 +0000
URL:
https://gitweb.gentoo.org/proj/pkgcore/snakeoil.git/commit/?id=5308afe0
tools.imports: break out a resusable module ast walker
Signed-off-by: Brian Harring <ferringb <AT> gmail.com>
src/snakeoil/tools/imports.py | 97 +++++++++++++++++++++++++------------------
1 file changed, 57 insertions(+), 40 deletions(-)
diff --git a/src/snakeoil/tools/imports.py b/src/snakeoil/tools/imports.py
index 4388837..1d70d7f 100644
--- a/src/snakeoil/tools/imports.py
+++ b/src/snakeoil/tools/imports.py
@@ -223,6 +223,59 @@ class AttributeCollector(ast.NodeVisitor):
mod.accessed_by[parts[0]].add(self.current)
+class ModuleCollector:
+ __slots__ = ("ast_sources", "root")
+ ast_sources: dict[ModuleImport, tuple[Path, ast.Module]]
+ root: ModuleImport
+
+ def __init__(self):
+ self.root = ModuleImport(None, None, "")
+ self.ast_sources = {}
+
+ def add_namespace(self, namespace: str) -> list[ModuleImport]:
+ collected = []
+ # pre-initialize the module tree of what we care about.
+ for module in get_submodules_of(namespace, include_root=True):
+ obj = self.root.create(module.__name__.split("."))
+ obj.alls = getattr(module, "__all__", None)
+ p = Path(cast(str, module.__file__))
+ with p.open("r") as f:
+ self.ast_sources[obj] = (p, ast.parse(f.read(), str(p)))
+ collected.append(obj)
+ return collected
+
+ def finalize(self):
+ # collect and finalize imports, then run analysis based on attribute
access.
+
+ # Note: the import collection may need to run multiple times.
Consider:
+ # klass.py:
+ # __all__ = ('blah', 'foon')
+ # from .other import blah, foon
+ #
+ # If some other module tries to travers klass.py before those from
imports have been placed, the
+ # other module will think it stopped at an attribute for 'blah'.
Which isn't correct.
+ # They internally detect this conflict and mark a boolean to indicate
if a reprocessing is needed.
+ must_be_processed = list(self.ast_sources)
+ for run in range(0, 10):
+ for mod in must_be_processed:
+ p, tree = self.ast_sources[mod]
+ ImportCollector(self.root, mod, mod.qualname, p).visit(tree)
+
+ if new_reprocess := [
+ mod for mod in self.ast_sources if mod.requires_reprocessing
+ ]:
+ if len(new_reprocess) == len(must_be_processed):
+ raise Exception("cycle encountered")
+ must_be_processed = new_reprocess
+ else:
+ break
+
+ for mod, (p, tree) in self.ast_sources.items():
+ AttributeCollector(self.root, mod).visit(tree)
+
+
+# parser functionality goes below
+
parser = arghparse.ArgumentParser(
prog=__name__.rsplit(".", 1)[-1],
)
@@ -262,46 +315,10 @@ unused.add_argument(
@unused.bind_main_func
def main(options, out, err) -> int:
- root = ModuleImport(None, None, "")
-
- target_modules: set[ModuleImport] = set()
- ast_sources = {}
- # pre-initialize the module tree of what we care about.
- for target in tuple(options.consumers) + (options.target,):
- for module in get_submodules_of(target, include_root=True):
- obj = root.create(module.__name__.split("."))
- obj.alls = getattr(module, "__all__", None)
- p = Path(cast(str, module.__file__))
- with p.open("r") as f:
- ast_sources[obj] = (p, ast.parse(f.read(), str(p)))
- if target == options.target:
- target_modules.add(obj)
-
- # collect and finalize imports, then run analysis based on attribute
access.
-
- # Note: the import collection may need to run multiple times. Consider:
- # klass.py:
- # __all__ = ('blah', 'foon')
- # from .other import blah, foon
- #
- # If some other module tries to travers klass.py before those from imports
have been placed, the
- # other module will think it stopped at an attribute for 'blah'. Which
isn't correct.
- # They internally detect this conflict and mark a boolean to indicate if a
reprocessing is needed.
- must_be_processed = list(ast_sources)
- for run in range(0, 10):
- for mod in must_be_processed:
- p, tree = ast_sources[mod]
- ImportCollector(root, mod, mod.qualname, p).visit(tree)
-
- if new_reprocess := [mod for mod in ast_sources if
mod.requires_reprocessing]:
- if len(new_reprocess) == len(must_be_processed):
- raise Exception("cycle encountered")
- must_be_processed = new_reprocess
- else:
- break
-
- for mod, (p, tree) in ast_sources.items():
- AttributeCollector(root, mod).visit(tree)
+ collecter = ModuleCollector()
+ target_modules = collecter.add_namespace(options.target)
+ for consumer in options.consumers:
+ collecter.add_namespace(consumer)
results = []
for mod in sorted(target_modules, key=lambda x: x.qualname):