This is an automated email from the ASF dual-hosted git repository. striker pushed a commit to branch striker/speculative-actions in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 736228dd1dc47aad98d4781132ffde2d7d3f0bfa Author: Sander Striker <[email protected]> AuthorDate: Tue Mar 17 18:15:48 2026 +0100 speculative actions: Cross-element ACTION overlays with fallback resolution Generator: accepts ac_service and artifactcache. Processes subactions in order, fetching ActionResults to track outputs. Generates ACTION overlays for intra-element (compile→link) and cross-element (dependency subaction outputs) dependencies. Indexes dependency sources alongside artifacts so that both SOURCE and ARTIFACT overlays are generated for the same file digest, enabling fallback resolution (SOURCE > ARTIFACT > ACTION). Instantiator: accepts ac_service. Resolves overlays with fallback — once a target digest is resolved by a higher-priority overlay, lower-priority overlays for the same target are skipped. Cross-element ACTION overlays fall back to action cache lookup when source_element is set. Generation queue: passes ac_service and artifactcache to generator. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]> --- .../queues/speculativeactiongenerationqueue.py | 6 +- src/buildstream/_speculative_actions/generator.py | 227 +++++++++++++++++---- .../_speculative_actions/instantiator.py | 79 ++++++- tests/speculative_actions/test_generator_unit.py | 14 +- 4 files changed, 270 insertions(+), 56 deletions(-) diff --git a/src/buildstream/_scheduler/queues/speculativeactiongenerationqueue.py b/src/buildstream/_scheduler/queues/speculativeactiongenerationqueue.py index c7397c5cc..225cc6d93 100644 --- a/src/buildstream/_scheduler/queues/speculativeactiongenerationqueue.py +++ b/src/buildstream/_scheduler/queues/speculativeactiongenerationqueue.py @@ -90,8 +90,12 @@ class SpeculativeActionGenerationQueue(Queue): dependencies = list(element._dependencies(_Scope.BUILD, recurse=False)) + # Get action cache service for ACTION overlay generation + casd = context.get_casd() + ac_service = casd.get_ac_service() if casd else None + # Generate overlays - generator = SpeculativeActionsGenerator(cas) + generator = SpeculativeActionsGenerator(cas, ac_service=ac_service, artifactcache=artifactcache) spec_actions = generator.generate_speculative_actions(element, subaction_digests, dependencies) if not spec_actions or not spec_actions.actions: diff --git a/src/buildstream/_speculative_actions/generator.py b/src/buildstream/_speculative_actions/generator.py index aaf326fd2..caf6c9ad3 100644 --- a/src/buildstream/_speculative_actions/generator.py +++ b/src/buildstream/_speculative_actions/generator.py @@ -22,9 +22,10 @@ Generates SpeculativeActions and artifact overlays after element builds. This module is responsible for: 1. Extracting subaction digests from ActionResult 2. Traversing action input trees to find all file digests -3. Resolving digests to their source elements (SOURCE > ARTIFACT priority) +3. Resolving digests to their source elements (SOURCE > ARTIFACT > ACTION priority) 4. Creating overlays for each digest 5. Generating artifact_overlays for the element's output files +6. Tracking inter-subaction output dependencies via ACTION overlays """ from typing import Dict, Tuple @@ -39,16 +40,24 @@ class SpeculativeActionsGenerator: builds. """ - def __init__(self, cas): + def __init__(self, cas, ac_service=None, artifactcache=None): """ Initialize the generator. Args: cas: The CAS cache for fetching actions and directories + ac_service: Optional ActionCache service stub for fetching + ActionResults of prior subactions (needed for ACTION overlays) + artifactcache: Optional artifact cache for loading dependency + SpeculativeActions (needed for cross-element ACTION overlays) """ self._cas = cas - # Cache for digest.hash -> (element, path, type) lookups - self._digest_cache: Dict[str, Tuple[str, str, str]] = {} + self._ac_service = ac_service + self._artifactcache = artifactcache + # Cache for digest.hash -> list of (element, path, type) lookups + # Multiple entries per digest enable fallback resolution: + # SOURCE overlays are tried first, then ARTIFACT, then ACTION. + self._digest_cache: Dict[str, list] = {} def generate_speculative_actions(self, element, subaction_digests, dependencies): """ @@ -69,28 +78,139 @@ class SpeculativeActionsGenerator: - artifact_overlays: Overlays mapping artifact file digests to sources """ from .._protos.buildstream.v2 import speculative_actions_pb2 + from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 spec_actions = speculative_actions_pb2.SpeculativeActions() # Build digest lookup tables from element sources and dependencies self._build_digest_cache(element, dependencies) + # Track outputs from prior subactions for ACTION overlay generation + # Maps file_digest_hash -> (source_element, producing_action_digest, output_path) + prior_outputs = {} + + # Seed prior_outputs with dependency subaction outputs for + # cross-element ACTION overlays. Dependencies have already been + # built and had their generation queue run, so their SAs and + # ActionResults are available. + if self._ac_service and self._artifactcache: + self._seed_dependency_outputs(dependencies, prior_outputs) + # Generate overlays for each subaction for subaction_digest in subaction_digests: spec_action = self._generate_action_overlays(element, subaction_digest) + + # Generate ACTION overlays for digests that match prior subaction outputs + # but weren't already resolved as SOURCE or ARTIFACT + if self._ac_service and prior_outputs: + action = self._cas.fetch_action(subaction_digest) + if action: + input_digests = self._extract_digests_from_action(action) + # Collect hashes already covered by SOURCE/ARTIFACT overlays + already_overlaid = set() + if spec_action: + for overlay in spec_action.overlays: + already_overlaid.add(overlay.target_digest.hash) + + for digest_hash, digest_size in input_digests: + if digest_hash in prior_outputs and digest_hash not in already_overlaid: + source_element, producing_action_digest, output_path = prior_outputs[digest_hash] + # Create ACTION overlay + if spec_action is None: + spec_action = speculative_actions_pb2.SpeculativeActions.SpeculativeAction() + spec_action.base_action_digest.CopyFrom(subaction_digest) + overlay = speculative_actions_pb2.SpeculativeActions.Overlay() + overlay.type = speculative_actions_pb2.SpeculativeActions.Overlay.ACTION + overlay.source_element = source_element + overlay.source_action_digest.CopyFrom(producing_action_digest) + overlay.source_path = output_path + overlay.target_digest.hash = digest_hash + overlay.target_digest.size_bytes = digest_size + spec_action.overlays.append(overlay) + if spec_action: spec_actions.actions.append(spec_action) + # Fetch this subaction's ActionResult and record its outputs + # for subsequent subactions + if self._ac_service: + self._record_subaction_outputs(subaction_digest, prior_outputs) + # Generate artifact overlays for the element's output files artifact_overlays = self._generate_artifact_overlays(element) spec_actions.artifact_overlays.extend(artifact_overlays) return spec_actions + def _record_subaction_outputs(self, action_digest, prior_outputs, source_element=""): + """ + Fetch a subaction's ActionResult from the action cache and record + its output file digests for subsequent subaction ACTION overlay generation. + + Args: + action_digest: The action digest to look up (stored on ACTION overlays) + prior_outputs: Dict to update with file_digest_hash -> (source_element, action_digest, path) + source_element: Element name for cross-element overlays ("" = same element) + """ + try: + from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 + + request = remote_execution_pb2.GetActionResultRequest( + action_digest=action_digest, + ) + action_result = self._ac_service.GetActionResult(request) + if action_result: + for output_file in action_result.output_files: + prior_outputs[output_file.digest.hash] = ( + source_element, action_digest, output_file.path + ) + except Exception: + pass + + def _seed_dependency_outputs(self, dependencies, prior_outputs): + """ + Seed prior_outputs with subaction outputs from dependency elements. + + For each dependency that has stored SpeculativeActions, fetch the + ActionResult of each subaction and record its output files. This + enables cross-element ACTION overlays: if the current element's + subaction input tree contains a file that was produced by a + dependency's subaction, the overlay will reference it. + + Args: + dependencies: List of dependency elements + prior_outputs: Dict to seed with file_digest_hash -> + (source_element, action_digest, path) + """ + for dep in dependencies: + try: + if not dep._cached(): + continue + + artifact = dep._get_artifact() + if not artifact or not artifact.cached(): + continue + + dep_sa = self._artifactcache.get_speculative_actions(artifact) + if not dep_sa: + continue + + for spec_action in dep_sa.actions: + self._record_subaction_outputs( + spec_action.base_action_digest, + prior_outputs, + source_element=dep.name, + ) + except Exception: + pass + def _build_digest_cache(self, element, dependencies): """ Build a cache mapping file digests to their source elements. + Multiple entries per digest are stored to enable fallback + resolution at instantiation time (SOURCE > ARTIFACT > ACTION). + Args: element: The element being processed dependencies: List of dependency elements @@ -100,7 +220,14 @@ class SpeculativeActionsGenerator: # Index element's own sources (highest priority) self._index_element_sources(element, element) - # Index dependency artifacts (lower priority) + # Index dependency sources — enables SOURCE overlays for dep + # files (e.g. headers) that exist in both source and artifact. + # At instantiation, SOURCE is tried first; if the dep's sources + # aren't fetched (dep not rebuilding), ARTIFACT is used instead. + for dep in dependencies: + self._index_element_sources(dep, dep) + + # Index dependency artifacts for dep in dependencies: self._index_element_artifact(dep) @@ -186,13 +313,13 @@ class SpeculativeActionsGenerator: # Build full relative path file_path = file_node.name if not current_path else f"{current_path}/{file_node.name}" - # Priority: SOURCE > ARTIFACT - # Only store if not already present, or if upgrading from ARTIFACT to SOURCE + entry = (element_name, file_path, overlay_type) if digest_hash not in self._digest_cache: - self._digest_cache[digest_hash] = (element_name, file_path, overlay_type) - elif overlay_type == "SOURCE" and self._digest_cache[digest_hash][2] == "ARTIFACT": - # Upgrade ARTIFACT to SOURCE - self._digest_cache[digest_hash] = (element_name, file_path, overlay_type) + self._digest_cache[digest_hash] = [entry] + else: + # Avoid duplicate (same element, same path, same type) + if entry not in self._digest_cache[digest_hash]: + self._digest_cache[digest_hash].append(entry) # Recursively traverse subdirectories for dir_node in directory.directories: @@ -227,11 +354,11 @@ class SpeculativeActionsGenerator: # Extract all file digests from the action's input tree input_digests = self._extract_digests_from_action(action) - # Resolve each digest to an overlay + # Resolve each digest to overlays (may produce multiple per digest + # for fallback resolution: SOURCE > ARTIFACT) for digest in input_digests: - overlay = self._resolve_digest_to_overlay(digest, element) - if overlay: - spec_action.overlays.append(overlay) + overlays = self._resolve_digest_to_overlays(digest, element) + spec_action.overlays.extend(overlays) return spec_action if spec_action.overlays else None @@ -279,47 +406,56 @@ class SpeculativeActionsGenerator: except: pass - def _resolve_digest_to_overlay(self, digest_tuple, element, artifact_file_path=None): + def _resolve_digest_to_overlays(self, digest_tuple, element): """ - Resolve a file digest to an Overlay proto. + Resolve a file digest to Overlay protos. + + Returns multiple overlays when the same digest appears in both + source and artifact trees, enabling fallback resolution at + instantiation time (SOURCE tried first, then ARTIFACT). Args: digest_tuple: Tuple of (hash, size_bytes) element: The element being processed - artifact_file_path: Path in artifact (used for artifact_overlays), can differ from source_path Returns: - Overlay proto or None if digest cannot be resolved + List of Overlay protos (SOURCE first, then ARTIFACT), or empty list """ from .._protos.buildstream.v2 import speculative_actions_pb2 - from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 digest_hash = digest_tuple[0] digest_size = digest_tuple[1] - # Look up in our digest cache - if digest_hash not in self._digest_cache: - return None + entries = self._digest_cache.get(digest_hash) + if not entries: + return [] - element_name, file_path, overlay_type = self._digest_cache[digest_hash] - - # Create overlay - overlay = speculative_actions_pb2.SpeculativeActions.Overlay() - overlay.target_digest.hash = digest_hash - overlay.target_digest.size_bytes = digest_size - overlay.source_path = file_path # Path in the source/artifact where it originated - - if overlay_type == "SOURCE": - overlay.type = speculative_actions_pb2.SpeculativeActions.Overlay.SOURCE - # Empty string means self-reference for this element - overlay.source_element = "" if element_name == element.name else element_name - elif overlay_type == "ARTIFACT": - overlay.type = speculative_actions_pb2.SpeculativeActions.Overlay.ARTIFACT - overlay.source_element = element_name - else: - return None + overlays = [] + for element_name, file_path, overlay_type in entries: + overlay = speculative_actions_pb2.SpeculativeActions.Overlay() + overlay.target_digest.hash = digest_hash + overlay.target_digest.size_bytes = digest_size + overlay.source_path = file_path + + if overlay_type == "SOURCE": + overlay.type = speculative_actions_pb2.SpeculativeActions.Overlay.SOURCE + overlay.source_element = "" if element_name == element.name else element_name + elif overlay_type == "ARTIFACT": + overlay.type = speculative_actions_pb2.SpeculativeActions.Overlay.ARTIFACT + overlay.source_element = element_name + else: + continue + + overlays.append(overlay) + + # Sort: SOURCE first, then ARTIFACT — instantiator tries in order + type_priority = { + speculative_actions_pb2.SpeculativeActions.Overlay.SOURCE: 0, + speculative_actions_pb2.SpeculativeActions.Overlay.ARTIFACT: 1, + } + overlays.sort(key=lambda o: type_priority.get(o.type, 99)) - return overlay + return overlays def _generate_artifact_overlays(self, element): """ @@ -378,11 +514,12 @@ class SpeculativeActionsGenerator: # Process each file with full path for file_node in directory.files: file_path = file_node.name if not current_path else f"{current_path}/{file_node.name}" - overlay = self._resolve_digest_to_overlay( - (file_node.digest.hash, file_node.digest.size_bytes), element, file_path + resolved = self._resolve_digest_to_overlays( + (file_node.digest.hash, file_node.digest.size_bytes), element ) - if overlay: - overlays.append(overlay) + # For artifact_overlays, take the highest-priority overlay + if resolved: + overlays.append(resolved[0]) # Recursively process subdirectories for dir_node in directory.directories: diff --git a/src/buildstream/_speculative_actions/instantiator.py b/src/buildstream/_speculative_actions/instantiator.py index 45c907199..abd5b9bbd 100644 --- a/src/buildstream/_speculative_actions/instantiator.py +++ b/src/buildstream/_speculative_actions/instantiator.py @@ -37,18 +37,21 @@ class SpeculativeActionInstantiator: dependency versions by replacing file digests according to overlays. """ - def __init__(self, cas, artifactcache): + def __init__(self, cas, artifactcache, ac_service=None): """ Initialize the instantiator. Args: cas: The CAS cache artifactcache: The artifact cache + ac_service: Optional ActionCache service stub for resolving + cross-element ACTION overlays """ self._cas = cas self._artifactcache = artifactcache + self._ac_service = ac_service - def instantiate_action(self, spec_action, element, element_lookup): + def instantiate_action(self, spec_action, element, element_lookup, action_outputs=None): """ Instantiate a SpeculativeAction by applying overlays. @@ -56,6 +59,8 @@ class SpeculativeActionInstantiator: spec_action: SpeculativeAction proto element: Element being primed element_lookup: Dict mapping element names to Element objects + action_outputs: Optional dict of (subaction_index_str, output_path) -> new_digest + for resolving ACTION overlays from prior subaction executions Returns: Digest of instantiated action, or None if overlays cannot be applied @@ -81,14 +86,22 @@ class SpeculativeActionInstantiator: skipped_count = 0 applied_count = 0 - # Resolve all overlays first + # Resolve overlays with fallback. Multiple overlays may target + # the same digest (e.g. SOURCE + ARTIFACT for the same dep file). + # They are stored in priority order (SOURCE first); once a target + # is resolved, subsequent overlays for it are skipped. for overlay in spec_action.overlays: + # Skip if this target was already resolved by a higher-priority overlay + if overlay.target_digest.hash in digest_replacements: + continue + # Optimization: Skip overlays for dependencies with unchanged cache keys + # (only applies to SOURCE/ARTIFACT overlays with a source_element) if overlay.source_element and self._should_skip_overlay(overlay, element, cached_dep_keys): skipped_count += 1 continue - replacement = self._resolve_overlay(overlay, element, element_lookup) + replacement = self._resolve_overlay(overlay, element, element_lookup, action_outputs=action_outputs) if replacement: # replacement is (old_digest, new_digest) digest_replacements[replacement[0].hash] = replacement[1] @@ -156,6 +169,13 @@ class SpeculativeActionInstantiator: Returns: bool: True if overlay can be skipped """ + from .._protos.buildstream.v2 import speculative_actions_pb2 + + # Never skip ACTION overlays via this optimization — they use + # subaction indices, not element names + if overlay.type == speculative_actions_pb2.SpeculativeActions.Overlay.ACTION: + return False + # Only skip for dependency overlays (source_element is not empty and not self) if not overlay.source_element or overlay.source_element == element.name: return False @@ -178,7 +198,7 @@ class SpeculativeActionInstantiator: return False - def _resolve_overlay(self, overlay, element, element_lookup): + def _resolve_overlay(self, overlay, element, element_lookup, action_outputs=None): """ Resolve an overlay to get current file digest. @@ -186,6 +206,8 @@ class SpeculativeActionInstantiator: overlay: Overlay proto element: Current element element_lookup: Dict mapping element names to Element objects + action_outputs: Optional dict of (subaction_index_str, output_path) -> new_digest + for resolving ACTION overlays Returns: Tuple of (old_digest, new_digest) or None @@ -196,6 +218,8 @@ class SpeculativeActionInstantiator: return self._resolve_source_overlay(overlay, element, element_lookup) elif overlay.type == speculative_actions_pb2.SpeculativeActions.Overlay.ARTIFACT: return self._resolve_artifact_overlay(overlay, element, element_lookup) + elif overlay.type == speculative_actions_pb2.SpeculativeActions.Overlay.ACTION: + return self._resolve_action_overlay(overlay, action_outputs) return None @@ -296,6 +320,51 @@ class SpeculativeActionInstantiator: return None + def _resolve_action_overlay(self, overlay, action_outputs): + """ + Resolve an ACTION overlay using outputs from prior subaction executions. + + For intra-element overlays (source_element == ""), uses the + action_outputs dict populated during sequential priming. + + For cross-element overlays (source_element set), falls back to + the action cache — the dependency's subaction may have been + executed during the dependency's own priming or build. + + Args: + overlay: Overlay proto with type ACTION + action_outputs: Dict of (base_action_digest_hash, output_path) -> new_digest + + Returns: + Tuple of (old_digest, new_digest) or None + """ + key = (overlay.source_action_digest.hash, overlay.source_path) + + # Check action_outputs first (intra-element, populated during priming) + if action_outputs: + new_digest = action_outputs.get(key) + if new_digest: + return (overlay.target_digest, new_digest) + + # For cross-element: look up the producing subaction's ActionResult + # from the action cache directly + if overlay.source_element and self._ac_service: + try: + from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 + + request = remote_execution_pb2.GetActionResultRequest( + action_digest=overlay.source_action_digest, + ) + action_result = self._ac_service.GetActionResult(request) + if action_result: + for output_file in action_result.output_files: + if output_file.path == overlay.source_path: + return (overlay.target_digest, output_file.digest) + except Exception: + pass + + return None + def _find_file_by_path(self, directory_digest, file_path): """ Find a file in a directory tree by full relative path. diff --git a/tests/speculative_actions/test_generator_unit.py b/tests/speculative_actions/test_generator_unit.py index db5aa3b24..67d4b34b2 100644 --- a/tests/speculative_actions/test_generator_unit.py +++ b/tests/speculative_actions/test_generator_unit.py @@ -285,12 +285,14 @@ class TestGeneratorOverlayProduction: assert speculative_actions_pb2.SpeculativeActions.Overlay.ARTIFACT in overlay_types def test_source_priority_over_artifact(self): - """When same digest exists in both source and artifact, SOURCE wins.""" + """When same digest exists in both source and artifact, both overlays + are generated with SOURCE first for fallback resolution.""" from buildstream._speculative_actions.generator import SpeculativeActionsGenerator cas = FakeCAS() shared_content = b'shared-file-content' + shared_hash = _make_digest(shared_content).hash # Create element sources with the shared file source_root = _build_source_tree(cas, { @@ -318,10 +320,12 @@ class TestGeneratorOverlayProduction: assert len(spec_actions.actions) == 1 action = spec_actions.actions[0] - # The overlay should be SOURCE (higher priority) - for overlay in action.overlays: - if overlay.target_digest.hash == _make_digest(shared_content).hash: - assert overlay.type == speculative_actions_pb2.SpeculativeActions.Overlay.SOURCE + # Both SOURCE and ARTIFACT overlays should be generated for the + # same target digest, with SOURCE first for priority resolution + matching = [o for o in action.overlays if o.target_digest.hash == shared_hash] + assert len(matching) >= 2 + assert matching[0].type == speculative_actions_pb2.SpeculativeActions.Overlay.SOURCE + assert matching[1].type == speculative_actions_pb2.SpeculativeActions.Overlay.ARTIFACT def test_no_overlays_for_unknown_digests(self): """Digests not found in sources or artifacts should produce no overlays."""
