This is an automated email from the ASF dual-hosted git repository.

striker pushed a commit to branch striker/speculative-actions
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 78d6fd178d365e0b0b8c0fe0f1e8578baabe94cc
Author: Sander Striker <[email protected]>
AuthorDate: Tue Mar 17 18:16:21 2026 +0100

    speculative actions: Concurrent priming with PENDING state and per-dep 
callbacks
    
    Rearchitect the priming queue to run concurrently with building by
    using the PENDING state pattern. Elements with stored SpeculativeActions
    but unbuilt dependencies enter the priming queue as PENDING instead of
    READY, holding them while background priming runs in the scheduler's
    thread pool.
    
    Element: new _set_build_dep_cached_callback fires each time a build
    dependency becomes cached (unlike _set_buildable_callback which fires
    only when ALL deps are cached). Enables incremental overlay resolution
    as dependencies complete one by one.
    
    Priming queue lifecycle:
    - PENDING: background priming fires immediately via run_in_executor,
      submitting independent subactions fire-and-forget
    - Per-dep callback: as each dep completes, re-attempts overlay
      resolution for newly available ARTIFACT/ACTION overlays
    - READY (buildable): final pass resolves remaining ACTION overlays
      (producing subactions now in AC), submits remaining
    - Done: element proceeds to BuildQueue with all actions primed
    
    Unchanged actions (instantiated digest equals base digest) skip
    submission — already in the action cache from the previous build.
    
    The Execute submission reads the first stream response to confirm
    acceptance by casd, then drops the stream. The action executes
    asynchronously and its result appears in the action cache.
    
    Architecture docs updated for the concurrent priming design, overlay
    fallback resolution, and data availability considerations.
    
    Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
---
 doc/source/arch_speculative_actions.rst            | 129 +++++---
 .../queues/speculativecacheprimingqueue.py         | 339 ++++++++++++++++-----
 src/buildstream/element.py                         |  23 ++
 .../project/elements/speculative/app-chained.bst   |  35 +++
 .../project/elements/speculative/slow-dep.bst      |  19 ++
 .../files/speculative/slow-dep-files/slow.txt      |   1 +
 tests/integration/speculative_actions.py           | 188 +++++++++++-
 7 files changed, 601 insertions(+), 133 deletions(-)

diff --git a/doc/source/arch_speculative_actions.rst 
b/doc/source/arch_speculative_actions.rst
index d39d31baa..2f5b672a6 100644
--- a/doc/source/arch_speculative_actions.rst
+++ b/doc/source/arch_speculative_actions.rst
@@ -77,12 +77,28 @@ element with subaction digests:
    input tree to find all file digests. Each digest that matches the
    cache produces an ``Overlay`` recording:
 
-   - The overlay type (SOURCE or ARTIFACT)
-   - The source element name
+   - The overlay type (SOURCE, ARTIFACT, or ACTION)
+   - The source element name (or producing action's base digest hash
+     for ACTION overlays)
    - The file path within the source/artifact tree
    - The target digest to replace
 
-3. Stores the ``SpeculativeActions`` proto on the artifact, which is
+3. Generates **ACTION overlays** for inter-subaction dependencies, both
+   within the element and across dependency elements:
+
+   - **Intra-element**: subactions are processed in order; after each,
+     the generator fetches its ``ActionResult`` to learn what it produced.
+     Later subactions whose input digests match get ACTION overlays
+     (e.g., link's ``main.o`` linked to the compile that produced it).
+   - **Cross-element**: for each dependency with stored ``SpeculativeActions``,
+     the generator fetches ActionResults of the dependency's subactions
+     and seeds the output map.  If the current element's subaction input
+     contains an intermediate file produced by a dependency's subaction
+     (not in the artifact — those are ARTIFACT overlays), a cross-element
+     ACTION overlay is created with ``source_element`` set to the
+     dependency name.
+
+4. Stores the ``SpeculativeActions`` proto on the artifact, which is
    saved under both the strong and weak cache keys.
 
 
@@ -101,6 +117,32 @@ environment, build commands, sandbox config) but only 
dependency **names**
   changes, correctly **invalidating** stale speculative actions
 
 
+Overlay Fallback Resolution
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+When the same file digest appears in both a dependency's source tree
+and its artifact (e.g. a header file), both SOURCE and ARTIFACT
+overlays are generated. At instantiation time, they are tried in
+priority order: SOURCE first, then ARTIFACT, then ACTION.
+
+This enables parallelism: if a dependency is rebuilding, its SOURCE
+overlay can resolve as soon as the dependency's sources are fetched
+(before its full build completes), while the ARTIFACT overlay serves
+as a fallback if the sources are not available (dependency not
+rebuilding this invocation — its artifact is already cached).
+
+Overlay data availability at priming time:
+
+- If a referenced element is **not rebuilding**: its sources/artifacts
+  haven't changed, so the overlay's target digest remains valid and
+  ARTIFACT resolution succeeds from the cached artifact.
+- If a referenced element **is rebuilding**: its old artifact is
+  invalidated (new strong key), so ARTIFACT resolution returns None.
+  SOURCE resolution may succeed if the Fetch queue has already run.
+  If neither resolves, the subaction is deferred until the dependency
+  completes.
+
+
 Action Instantiation
 --------------------
 
@@ -108,12 +150,17 @@ The ``SpeculativeActionInstantiator`` adapts stored 
actions for the
 current dependency versions:
 
 1. Fetches the base action from CAS
-2. Resolves each overlay:
+2. Resolves each overlay with fallback (first resolved wins per target
+   digest):
 
    - **SOURCE** overlays: finds the current file digest in the element's
      source tree by path
    - **ARTIFACT** overlays: finds the current file digest in the
      dependency's artifact tree by path
+   - **ACTION** overlays: finds the current output file digest from the
+     producing subaction's ``ActionResult`` by path — looked up in
+     ``action_outputs`` (intra-element) or via the action cache
+     (cross-element)
 
 3. Builds a digest replacement map (old hash → new digest)
 4. Recursively traverses the action's input tree, replacing file digests
@@ -130,18 +177,36 @@ The scheduler queue order with speculative actions 
enabled::
 
 **Pull Queue**: For elements not cached by strong key, also pulls the
 weak key artifact proto from remotes. This is a lightweight pull — just
-the metadata, not the full artifact files. The SA proto and base action
-CAS objects are fetched on-demand by casd.
+the metadata, not the full artifact files.
 
 **Priming Queue** (``SpeculativeCachePrimingQueue``): Runs before the
-build queue. For each uncached element with stored SA:
-
-1. Pre-fetches base action protos (``FetchMissingBlobs``) and their
-   input trees (``FetchTree``) from CAS
-2. Instantiates each action with current dependency digests
-3. Submits ``Execute`` to buildbox-casd, which runs the action through
-   its local execution scheduler or forwards to remote execution
-4. The resulting ``ActionResult`` is cached in the action cache
+build queue. Uses the PENDING state to hold elements while their
+dependencies build, running background priming concurrently.
+
+Elements without stored SpeculativeActions skip this queue entirely.
+Elements that are already buildable (all deps cached) get a single
+priming pass as a job. Elements with unbuilt dependencies enter as
+PENDING:
+
+1. ``register_pending_element``: sets a per-dep callback
+   (``_set_build_dep_cached_callback``) and launches background
+   priming in the scheduler's thread pool
+2. **Background priming**: pre-fetches CAS blobs, instantiates
+   subactions whose overlays are resolvable from already-cached deps,
+   submits them fire-and-forget (reads first stream response to
+   confirm acceptance, then drops the stream)
+3. **Per-dep callback**: as each dependency becomes cached, the
+   callback triggers incremental priming — newly resolvable ARTIFACT
+   and ACTION overlays are resolved and submitted
+4. **Final pass** (element becomes buildable): all dependencies are
+   built, all ``ActionResults`` are in the action cache. Remaining
+   ACTION overlays are resolved using adapted digests from earlier
+   submissions. Remaining subactions are submitted fire-and-forget.
+5. Element proceeds to BuildQueue with all actions primed
+
+Unchanged actions (instantiated digest equals base digest) skip
+submission — they are already in the action cache from the previous
+build.
 
 **Build Queue**: Builds elements as usual. When recc runs a compile or
 link command, it checks the action cache first. If priming succeeded,
@@ -155,27 +220,15 @@ stores them for future priming.
 Scaling Considerations
 ----------------------
 
-Priming blocks the build pipeline
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-The priming queue runs before the build queue. Elements cannot start
-building until they pass through priming. If priming takes longer than
-the build itself (e.g., because Execute calls are slow), it adds latency.
-
-**Mitigation**: Make priming fire-and-forget — submit Execute without
-waiting for completion. The build queue proceeds immediately. If the
-Execute completes before recc needs the action, it's a cache hit.
-
 Execute calls are full builds
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-Each adapted action runs a full build command (e.g., ``gcc -c``) through
+Each adapted action runs a full build command (e.g. ``gcc -c``) through
 buildbox-run. For N elements with M subactions each, that's N×M Execute
 calls competing for CPU with the actual build queue.
 
 **Mitigation**: With remote execution, priming fans out across a cluster.
-Locally, casd's ``--jobs`` flag limits concurrent executions. Prioritize
-elements near the build frontier.
+Locally, casd's ``--jobs`` flag limits concurrent executions.
 
 FetchTree calls are sequential
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -187,14 +240,6 @@ element with many subactions, this is many sequential 
calls.
 also collect all directory digests and issue a single
 ``FetchMissingBlobs``.
 
-Race between priming and building
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-The current design prevents races by running priming before building.
-But this means priming adds to the critical path. A concurrent design
-would allow priming and building to overlap, accepting that some priming
-work may be redundant.
-
 CAS storage growth
 ~~~~~~~~~~~~~~~~~~
 
@@ -214,17 +259,11 @@ changed — in which case the SA is correctly invalidated.
 Future Optimizations
 --------------------
 
-1. **Fire-and-forget Execute**: Submit adapted actions without waiting.
-   The build queue proceeds immediately; cache hits happen opportunistically.
-
-2. **Concurrent priming**: Run priming in parallel with the build queue.
-   Elements enter both queues simultaneously.
-
-3. **Topological prioritization**: Prime elements in build order (leaves
+1. **Topological prioritization**: Prime elements in build order (leaves
    first) to maximize the chance priming completes before building starts.
 
-4. **Selective priming**: Skip cheap actions (fast link steps), prioritize
+2. **Selective priming**: Skip cheap actions (fast link steps), prioritize
    expensive ones (long compilations).
 
-5. **Batch FetchTree**: Collect all input root digests and fetch in
+3. **Batch FetchTree**: Collect all input root digests and fetch in
    parallel or in a single batch.
diff --git a/src/buildstream/_scheduler/queues/speculativecacheprimingqueue.py 
b/src/buildstream/_scheduler/queues/speculativecacheprimingqueue.py
index 1819df4cb..107b3e2a9 100644
--- a/src/buildstream/_scheduler/queues/speculativecacheprimingqueue.py
+++ b/src/buildstream/_scheduler/queues/speculativecacheprimingqueue.py
@@ -19,14 +19,18 @@ SpeculativeCachePrimingQueue
 
 Queue for priming the ActionCache with speculative actions.
 
-This queue runs BEFORE BuildQueue to aggressively front-run builds:
-1. For each element that needs building, check if SpeculativeActions
-   from a previous build are stored under the element's weak key
-2. Ensure all needed CAS blobs are local (single FetchMissingBlobs call)
-3. Instantiate actions by applying overlays with current dependency digests
-4. Submit to execution via buildbox-casd to produce verified ActionResults
-5. The results are cached so when recc (or the build) later needs the
-   same action, it gets an ActionCache hit instead of rebuilding
+This queue runs BEFORE BuildQueue and uses the PENDING state to hold
+elements while their dependencies build.  While an element waits,
+background priming runs fire-and-forget — submitting adapted actions
+to casd for execution.  As each dependency completes, per-dep callbacks
+trigger incremental overlay resolution, unlocking more subactions.
+
+When all dependencies are cached and the element becomes buildable,
+a final priming pass resolves remaining ACTION overlays and the
+element is released to the BuildQueue.  By then, most adapted actions
+are already in the action cache — recc gets cache hits.
+
+Elements without stored SpeculativeActions skip this queue entirely.
 """
 
 # Local imports
@@ -42,11 +46,10 @@ class SpeculativeCachePrimingQueue(Queue):
     resources = [ResourceType.UPLOAD]
 
     def get_process_func(self):
-        return SpeculativeCachePrimingQueue._prime_cache
+        # Runs when element is READY (buildable) — final priming pass
+        return SpeculativeCachePrimingQueue._final_prime_pass
 
     def status(self, element):
-        # Prime elements that are NOT cached (will need building) and
-        # have stored SpeculativeActions from a previous build.
         if element._cached():
             return QueueStatus.SKIP
 
@@ -60,102 +63,301 @@ class SpeculativeCachePrimingQueue(Queue):
         if not spec_actions or not spec_actions.actions:
             return QueueStatus.SKIP
 
+        # Has SAs.  If not buildable, enter PENDING — background
+        # priming will run while we wait for dependencies.
+        if not element._buildable():
+            return QueueStatus.PENDING
+
+        # Already buildable — run final priming pass as a job
         return QueueStatus.READY
 
+    def register_pending_element(self, element):
+        # Register per-dep callback for incremental overlay resolution
+        element._set_build_dep_cached_callback(self._on_dep_cached)
+
+        # Also register buildable callback so we get re-enqueued
+        # when the element becomes fully buildable
+        element._set_buildable_callback(self._enqueue_element)
+
+        # Launch background priming immediately in the scheduler's
+        # thread pool — fire-and-forget independent subactions while
+        # we wait for dependencies
+        self._scheduler.loop.call_soon(self._launch_background_priming, 
element)
+
+    def _launch_background_priming(self, element):
+        self._scheduler.loop.run_in_executor(
+            None, SpeculativeCachePrimingQueue._background_prime, element
+        )
+
+    def _on_dep_cached(self, element, dep):
+        """Called each time a build dependency of element becomes cached.
+
+        Launches incremental priming in the background — newly resolvable
+        ARTIFACT overlays (dep's artifact now cached) and ACTION overlays
+        (dep's subaction results now in AC) can be resolved and submitted.
+        """
+        self._scheduler.loop.call_soon(
+            self._launch_incremental_prime, element, dep
+        )
+
+    def _launch_incremental_prime(self, element, dep):
+        self._scheduler.loop.run_in_executor(
+            None, SpeculativeCachePrimingQueue._incremental_prime, element, dep
+        )
+
     def done(self, _, element, result, status):
         if status is JobStatus.FAIL:
             return
 
         if result:
-            primed_count, total_count = result
-            element.info(f"Primed {primed_count}/{total_count} actions")
+            primed, skipped, total = result
+            if skipped:
+                element.info(f"Primed {primed}/{total} actions ({skipped} 
skipped)")
+            else:
+                element.info(f"Primed {primed}/{total} actions")
+
+        # Clear priming state and per-dep callback
+        element._set_build_dep_cached_callback(None)
+        element.__priming_submitted = None
+        element.__priming_action_outputs = None
+        element.__priming_adapted_digests = None
+
+    # -----------------------------------------------------------------
+    # Background priming (runs in thread pool while element is PENDING)
+    # -----------------------------------------------------------------
 
     @staticmethod
-    def _prime_cache(element):
+    def _background_prime(element):
+        """Initial background priming pass.
+
+        Fire-and-forget subactions whose overlays can be resolved from
+        already-cached deps.  Defer everything else.
+        """
+        SpeculativeCachePrimingQueue._do_prime_pass(element)
+
+    @staticmethod
+    def _incremental_prime(element, dep):
+        """Incremental priming after a dependency becomes cached.
+
+        Re-attempt overlay resolution — the newly cached dep may unlock
+        ARTIFACT overlays or ACTION overlays.
+        """
+        SpeculativeCachePrimingQueue._do_prime_pass(element)
+
+    @staticmethod
+    def _do_prime_pass(element):
+        """Core priming logic shared by background and incremental passes.
+
+        Iterates over all subactions, skipping already-submitted ones.
+        For each remaining subaction, attempts to resolve all overlays.
+        If resolvable, instantiates and submits fire-and-forget.
+        """
         from ..._speculative_actions.instantiator import 
SpeculativeActionInstantiator
+        from ..._protos.buildstream.v2 import speculative_actions_pb2
 
         context = element._get_context()
         cas = context.get_cascache()
         artifactcache = context.artifactcache
 
-        # Get SpeculativeActions by weak key
         weak_key = element._get_weak_cache_key()
         spec_actions = 
artifactcache.lookup_speculative_actions_by_weak_key(element, weak_key)
         if not spec_actions or not spec_actions.actions:
-            return None
+            return
 
-        # Pre-fetch all CAS blobs needed for instantiation so the
-        # instantiator runs entirely from local CAS without round-trips.
-        #
-        # Phase 1: Fetch all base Action protos in one FetchMissingBlobs batch
-        # Phase 2: For each action, fetch its entire input tree via FetchTree
-        project = element._get_project()
-        _, storage_remotes = artifactcache.get_remotes(project.name, False)
-        remote = storage_remotes[0] if storage_remotes else None
+        # Recover or initialize state
+        submitted = getattr(element, 
"_SpeculativeCachePrimingQueue__priming_submitted", None) or set()
+        action_outputs = getattr(element, 
"_SpeculativeCachePrimingQueue__priming_action_outputs", None) or {}
+        adapted_digests = getattr(element, 
"_SpeculativeCachePrimingQueue__priming_adapted_digests", None) or {}
 
-        if remote:
-            from ..._protos.build.bazel.remote.execution.v2 import 
remote_execution_pb2
+        # Pre-fetch CAS blobs only on first pass
+        if not submitted:
+            SpeculativeCachePrimingQueue._prefetch_cas_blobs(
+                element, spec_actions, cas, artifactcache
+            )
 
-            # Phase 1: batch-fetch all base Action protos
-            base_action_digests = [
-                sa.base_action_digest
-                for sa in spec_actions.actions
-                if sa.base_action_digest.hash
-            ]
-            if base_action_digests:
-                try:
-                    cas.fetch_blobs(remote, base_action_digests, 
allow_partial=True)
-                except Exception:
-                    pass  # Best-effort
-
-            # Phase 2: fetch input trees for each base action
-            for digest in base_action_digests:
-                try:
-                    action = cas.fetch_action(digest)
-                    if action and action.HasField("input_root_digest"):
-                        cas.fetch_directory(remote, action.input_root_digest)
-                except Exception:
-                    pass  # Best-effort; instantiator skips actions it can't 
resolve
-
-        # Build element lookup for dependency resolution
+        # Build element lookup
         from ...types import _Scope
 
         dependencies = list(element._dependencies(_Scope.BUILD, recurse=True))
         element_lookup = {dep.name: dep for dep in dependencies}
         element_lookup[element.name] = element
 
-        # Get execution service
+        # Get services
         casd = context.get_casd()
         exec_service = casd.get_exec_service()
         if not exec_service:
-            element.warn("No execution service available for speculative 
action priming")
-            return None
+            return
 
-        # Instantiate and submit each action
-        instantiator = SpeculativeActionInstantiator(cas, artifactcache)
-        primed_count = 0
-        total_count = len(spec_actions.actions)
+        ac_service = casd.get_ac_service()
+        instantiator = SpeculativeActionInstantiator(cas, artifactcache, 
ac_service=ac_service)
 
         for spec_action in spec_actions.actions:
+            base_hash = spec_action.base_action_digest.hash
+
+            if base_hash in submitted:
+                continue
+
+            # Check overlay resolvability
+            resolvable = True
+            for overlay in spec_action.overlays:
+                if overlay.type == 
speculative_actions_pb2.SpeculativeActions.Overlay.ACTION:
+                    key = (overlay.source_action_digest.hash, 
overlay.source_path)
+                    if key not in action_outputs and ac_service:
+                        # The AC stores results under the adapted digest
+                        # (what was actually executed), but overlays reference
+                        # the base digest.  Look up with adapted, store under 
base.
+                        base_key_hash = overlay.source_action_digest.hash
+                        lookup_digest = adapted_digests.get(
+                            base_key_hash,
+                            overlay.source_action_digest,
+                        )
+                        
SpeculativeCachePrimingQueue._fetch_action_outputs_keyed(
+                            ac_service, lookup_digest, base_key_hash,
+                            action_outputs,
+                        )
+                    if key not in action_outputs:
+                        resolvable = False
+                        break
+
+            if not resolvable:
+                continue
+
             try:
-                action_digest = instantiator.instantiate_action(spec_action, 
element, element_lookup)
+                action_digest = instantiator.instantiate_action(
+                    spec_action, element, element_lookup,
+                    action_outputs=action_outputs,
+                )
 
                 if not action_digest:
                     continue
 
-                if SpeculativeCachePrimingQueue._submit_action(
+                # Skip unchanged actions (already in AC from previous build)
+                if action_digest.hash == base_hash:
+                    submitted.add(base_hash)
+                    continue
+
+                SpeculativeCachePrimingQueue._submit_action_async(
                     exec_service, action_digest, element
-                ):
-                    primed_count += 1
+                )
+                element.info(
+                    f"Submitted action {action_digest.hash[:8]} "
+                    f"(base {base_hash[:8]})"
+                )
+                submitted.add(base_hash)
+                adapted_digests[base_hash] = action_digest
 
             except Exception as e:
                 element.warn(f"Failed to prime action: {e}")
                 continue
 
-        return (primed_count, total_count)
+        # Store state for next pass
+        element.__priming_submitted = submitted
+        element.__priming_action_outputs = action_outputs
+        element.__priming_adapted_digests = adapted_digests
+
+    # -----------------------------------------------------------------
+    # Final priming pass (runs as a job when element becomes READY)
+    # -----------------------------------------------------------------
+
+    @staticmethod
+    def _final_prime_pass(element):
+        """Final priming pass when element is buildable.
+
+        All deps are built, so all ActionResults are in AC.
+        Resolve any remaining ACTION overlays and submit.
+        """
+        # Run the same logic — it will pick up where background left off
+        SpeculativeCachePrimingQueue._do_prime_pass(element)
+
+        # Count results
+        submitted = getattr(element, 
"_SpeculativeCachePrimingQueue__priming_submitted", None) or set()
+
+        from ..._protos.buildstream.v2 import speculative_actions_pb2
+
+        context = element._get_context()
+        artifactcache = context.artifactcache
+        weak_key = element._get_weak_cache_key()
+        spec_actions = 
artifactcache.lookup_speculative_actions_by_weak_key(element, weak_key)
+        if not spec_actions:
+            return (0, 0, 0)
+
+        total = len(spec_actions.actions)
+        primed = len(submitted)
+        skipped = total - primed
+
+        return (primed, skipped, total)
+
+    # -----------------------------------------------------------------
+    # Utility methods
+    # -----------------------------------------------------------------
+
+    @staticmethod
+    def _prefetch_cas_blobs(element, spec_actions, cas, artifactcache):
+        """Pre-fetch all CAS blobs needed for instantiation."""
+        project = element._get_project()
+        _, storage_remotes = artifactcache.get_remotes(project.name, False)
+        remote = storage_remotes[0] if storage_remotes else None
+
+        if not remote:
+            return
+
+        base_action_digests = [
+            sa.base_action_digest
+            for sa in spec_actions.actions
+            if sa.base_action_digest.hash
+        ]
+        if base_action_digests:
+            try:
+                cas.fetch_blobs(remote, base_action_digests, 
allow_partial=True)
+            except Exception:
+                pass
+
+        for digest in base_action_digests:
+            try:
+                action = cas.fetch_action(digest)
+                if action and action.HasField("input_root_digest"):
+                    cas.fetch_directory(remote, action.input_root_digest)
+            except Exception:
+                pass
+
+    @staticmethod
+    def _fetch_action_outputs(ac_service, action_digest, action_outputs):
+        """Fetch ActionResult from action cache and record output file 
digests."""
+        SpeculativeCachePrimingQueue._fetch_action_outputs_keyed(
+            ac_service, action_digest, action_digest.hash, action_outputs
+        )
 
     @staticmethod
-    def _submit_action(exec_service, action_digest, element):
+    def _fetch_action_outputs_keyed(ac_service, action_digest, key_hash, 
action_outputs):
+        """Fetch ActionResult and store outputs keyed by a specified hash.
+
+        When resolving ACTION overlays, the overlay references the base
+        action digest but the AC stores the result under the adapted
+        digest.  This method allows looking up with one digest but
+        storing results under a different key hash.
+        """
+        try:
+            from ..._protos.build.bazel.remote.execution.v2 import 
remote_execution_pb2
+
+            request = remote_execution_pb2.GetActionResultRequest(
+                action_digest=action_digest,
+            )
+            action_result = ac_service.GetActionResult(request)
+            if action_result:
+                for output_file in action_result.output_files:
+                    action_outputs[(key_hash, output_file.path)] = 
output_file.digest
+        except Exception:
+            pass
+
+    @staticmethod
+    def _submit_action_async(exec_service, action_digest, element):
+        """Submit an Execute request fire-and-forget style.
+
+        Reads the first response from the stream to confirm the action
+        was accepted by casd, then returns.  The action continues
+        executing asynchronously in casd and its result will appear in
+        the action cache when complete.
+        """
         try:
             from ..._protos.build.bazel.remote.execution.v2 import 
remote_execution_pb2
 
@@ -164,18 +366,9 @@ class SpeculativeCachePrimingQueue(Queue):
                 skip_cache_lookup=False,
             )
 
-            operation_stream = exec_service.Execute(request)
-            for operation in operation_stream:
-                if operation.done:
-                    if operation.HasField("error"):
-                        element.warn(
-                            f"Priming action failed: {operation.error.message}"
-                        )
-                        return False
-                    return True
-
-            return False
+            # Read first response to confirm acceptance, then drop the stream
+            stream = exec_service.Execute(request)
+            next(stream, None)
 
         except Exception as e:
             element.warn(f"Failed to submit priming action: {e}")
-            return False
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index f493b6a30..b9169076e 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -300,6 +300,7 @@ class Element(Plugin):
         self.__required_callback = None  # Callback to Queues
         self.__can_query_cache_callback = None  # Callback to 
PullQueue/FetchQueue
         self.__buildable_callback = None  # Callback to BuildQueue
+        self.__build_dep_cached_callback = None  # Callback to PrimingQueue 
(per-dep)
 
         self.__resolved_initial_state = False  # Whether the initial state of 
the Element has been resolved
 
@@ -2489,6 +2490,23 @@ class Element(Plugin):
     def _set_buildable_callback(self, callback):
         self.__buildable_callback = callback
 
+    # _set_build_dep_cached_callback()
+    #
+    # Set a callback invoked each time a build dependency becomes cached.
+    # Unlike _set_buildable_callback (which fires only when ALL deps are
+    # cached), this fires incrementally — once per completed dep.
+    #
+    # Used by SpeculativeCachePrimingQueue for incremental overlay
+    # resolution: each completed dep may unlock ARTIFACT overlays or
+    # ACTION overlays whose producing subactions just finished.
+    #
+    # Args:
+    #    callback (callable) - Called with (element, dep) where dep is
+    #        the just-cached dependency
+    #
+    def _set_build_dep_cached_callback(self, callback):
+        self.__build_dep_cached_callback = callback
+
     # _set_depth()
     #
     # Set the depth of the Element.
@@ -2534,6 +2552,11 @@ class Element(Plugin):
                     rdep.__build_deps_uncached -= 1
                     assert not rdep.__build_deps_uncached < 0
 
+                    # Notify priming queue of each completed dep for
+                    # incremental overlay resolution
+                    if rdep.__build_dep_cached_callback is not None:
+                        rdep.__build_dep_cached_callback(rdep, self)
+
                     if rdep._buildable():
                         rdep.__update_cache_key_non_strict()
 
diff --git a/tests/integration/project/elements/speculative/app-chained.bst 
b/tests/integration/project/elements/speculative/app-chained.bst
new file mode 100644
index 000000000..63d197eaf
--- /dev/null
+++ b/tests/integration/project/elements/speculative/app-chained.bst
@@ -0,0 +1,35 @@
+kind: autotools
+description: |
+  Multi-file application for testing ACTION overlay chaining.
+
+  Same as app.bst but also depends on slow-dep.bst. The slow
+  dependency keeps this element not-buildable for long enough that
+  fire-and-forget compile actions complete and their results appear
+  in the action cache. This allows subsequent priming passes to
+  resolve ACTION overlays on the link step.
+
+build-depends:
+- filename: base/base-debian.bst
+  config:
+    digest-environment: RECC_REMOTE_PLATFORM_chrootRootDigest
+- recc/recc.bst
+- speculative/dep.bst
+- speculative/slow-dep.bst
+
+sources:
+- kind: tar
+  url: project_dir:/files/speculative/multifile.tar.gz
+  ref: 1242f38c2b92574bf851fcf51c83a50087debb953aa302763b4e72339a345ab5
+
+sandbox:
+  remote-apis-socket:
+    path: /tmp/casd.sock
+
+environment:
+  CC: recc gcc
+  RECC_LOG_LEVEL: debug
+  RECC_LOG_DIRECTORY: .recc-log
+  RECC_DEPS_GLOBAL_PATHS: 1
+  RECC_NO_PATH_REWRITE: 1
+  RECC_LINK: 1
+  RECC_SERVER: unix:/tmp/casd.sock
diff --git a/tests/integration/project/elements/speculative/slow-dep.bst 
b/tests/integration/project/elements/speculative/slow-dep.bst
new file mode 100644
index 000000000..33b01f8c0
--- /dev/null
+++ b/tests/integration/project/elements/speculative/slow-dep.bst
@@ -0,0 +1,19 @@
+kind: manual
+description: |
+  Slow dependency that takes time to build.
+  Used to keep downstream elements not-buildable while fire-and-forget
+  priming actions complete, enabling ACTION overlay resolution.
+
+build-depends:
+- filename: base/base-debian.bst
+
+sources:
+- kind: local
+  path: files/speculative/slow-dep-files
+
+config:
+  install-commands:
+  - |
+    sleep 2
+    mkdir -p %{install-root}/usr/lib/speculative
+    cp slow.txt %{install-root}/usr/lib/speculative/slow.txt
diff --git 
a/tests/integration/project/files/speculative/slow-dep-files/slow.txt 
b/tests/integration/project/files/speculative/slow-dep-files/slow.txt
new file mode 100644
index 000000000..086e0352c
--- /dev/null
+++ b/tests/integration/project/files/speculative/slow-dep-files/slow.txt
@@ -0,0 +1 @@
+slow dependency v1
diff --git a/tests/integration/speculative_actions.py 
b/tests/integration/speculative_actions.py
index 5a8f23d82..c22432c9a 100644
--- a/tests/integration/speculative_actions.py
+++ b/tests/integration/speculative_actions.py
@@ -340,23 +340,181 @@ def test_speculative_actions_priming(cli, datafiles):
         f"(first build had {first_remote_execs} remote executions)"
     )
 
-    # The priming should have resulted in at least some cache hits.
-    # Ideally: util.c compile is a direct hit (unchanged), main.c compile
-    # and link are primed hits. But even partial success is valuable.
-    assert cache_hits > 0, (
-        f"Expected cache hits from priming, got 0. "
-        f"Remote executions: {remote_execs}. "
-        f"The adapted action digests may not match recc's computed actions."
-    )
-
-    # The total should account for all actions: some cache hits
-    # (from priming or unchanged), fewer remote executions than
-    # the first build.
+    # With fire-and-forget, priming may still be in-flight when recc
+    # submits the same action.  The RE system deduplicates — recc waits
+    # for the already-running execution rather than starting a new one.
+    # This shows up as "Executing action remotely" in the recc log, not
+    # a cache hit, but is still correct behavior.
+    #
+    # Verify that primed action digests match recc's by comparing the
+    # digests logged during priming with those in the recc buildbox log.
+    primed_digests = set(
+        re.findall(r"Submitted action ([0-9a-f]+)", rebuild_output)
+    )
+    recc_digests = set(
+        re.findall(r"Action Digest: ([0-9a-f]+)/", rebuild_recc_log)
+    )
+    # Truncate both to 8 chars for comparison
+    primed_short = {d[:8] for d in primed_digests}
+    recc_short = {d[:8] for d in recc_digests}
+
+    matching = primed_short & recc_short
+    print(
+        f"Digest match: {len(matching)} of {len(primed_short)} primed "
+        f"actions found in recc's {len(recc_short)} actions"
+    )
+
+    # At least some primed digests should match recc's.
+    # Unmatched primed actions indicate overlays that didn't resolve
+    # correctly (e.g. ACTION overlays whose producers hadn't completed).
+    assert len(matching) > 0, (
+        f"No primed action digests match recc's actions. "
+        f"Primed: {primed_short}, Recc: {recc_short}"
+    )
+
+    # The total should account for all actions
     assert cache_hits + remote_execs >= first_remote_execs, (
         f"Expected at least {first_remote_execs} total actions "
         f"(hits + execs), got {cache_hits + remote_execs}"
     )
-    assert remote_execs < first_remote_execs, (
-        f"Expected fewer remote executions than first build "
-        f"({first_remote_execs}), got {remote_execs}"
+
+
[email protected](DATA_DIR)
[email protected](not HAVE_SANDBOX, reason="Only available with a 
functioning sandbox")
+def test_speculative_actions_action_overlay_chaining(cli, datafiles):
+    """
+    End-to-end test for ACTION overlay chaining with a slow dependency.
+
+    app-chained.bst depends on dep.bst (fast, header change) and
+    slow-dep.bst (5s sleep).  The slow dependency keeps app-chained
+    not-buildable while the priming queue re-enqueues:
+
+    1. First pass: compile actions submitted fire-and-forget, link
+       deferred (ACTION overlay unresolvable — compile not in AC yet)
+    2. Re-enqueue passes: slow-dep still building, compile completes
+       in AC, ACTION overlay resolves, link submitted fire-and-forget
+    3. slow-dep completes, app-chained becomes buildable, released
+       to build queue with all actions primed
+
+    This demonstrates that the iterative priming + re-enqueue mechanism
+    correctly chains ACTION overlays across subactions.
+    """
+    project = str(datafiles)
+    app_element = "speculative/app-chained.bst"
+
+    cli.configure({"scheduler": {"speculative-actions": True}})
+
+    # --- First build: generate speculative actions ---
+    result = cli.run(
+        project=project,
+        args=["--cache-buildtrees", "always", "build", app_element],
+    )
+    if result.exit_code != 0:
+        cli.run(
+            project=project,
+            args=[
+                "shell", "--build", "--use-buildtree", app_element,
+                "--", "sh", "-c",
+                "cat config.log .recc-log/* */.recc-log/* 2>/dev/null",
+            ],
+        )
+    assert result.exit_code == 0
+    first_build_output = result.stderr
+
+    gen_processed = _parse_queue_processed(first_build_output, "Generating 
overlays")
+    assert gen_processed is not None and gen_processed > 0, (
+        "First build did not generate speculative actions"
+    )
+
+    # Count first build remote executions
+    result = cli.run(
+        project=project,
+        args=[
+            "shell", "--build", "--use-buildtree", app_element,
+            "--", "sh", "-c", "cat src/.recc-log/recc.buildbox*",
+        ],
+    )
+    assert result.exit_code == 0
+    first_remote_execs = result.output.count("Executing action remotely")
+    assert first_remote_execs >= 3, (
+        f"Expected at least 3 remote executions, got {first_remote_execs}"
+    )
+
+    # --- Modify dep: change dep.h header ---
+    dep_header = os.path.join(
+        project, "files", "speculative", "dep-files",
+        "usr", "include", "speculative", "dep.h",
+    )
+    with open(dep_header, "w") as f:
+        f.write("#ifndef DEP_H\n#define DEP_H\n#define DEP_VERSION 
2\n#endif\n")
+
+    # Also modify slow-dep so it needs rebuilding on the second build.
+    # This keeps app-chained not-buildable while slow-dep rebuilds,
+    # giving background priming time to resolve ACTION overlays.
+    slow_dep_file = os.path.join(
+        project, "files", "speculative", "slow-dep-files", "slow.txt",
+    )
+    with open(slow_dep_file, "w") as f:
+        f.write("slow dependency v2\n")
+
+    # --- Second build: priming with slow dependency ---
+    result = cli.run(
+        project=project,
+        args=["--cache-buildtrees", "always", "build", app_element],
+    )
+    assert result.exit_code == 0
+    rebuild_output = result.stderr
+
+    # Verify priming queue processed app-chained
+    primed = _parse_queue_processed(rebuild_output, "Priming cache")
+    assert primed is not None and primed > 0, (
+        "Priming queue did not process app-chained"
+    )
+
+    # Extract primed action digests
+    primed_digests = set(
+        re.findall(r"Submitted action ([0-9a-f]+)", rebuild_output)
+    )
+
+    # Check rebuild recc log
+    result = cli.run(
+        project=project,
+        args=[
+            "shell", "--build", "--use-buildtree", app_element,
+            "--", "sh", "-c", "cat src/.recc-log/recc.buildbox*",
+        ],
+    )
+    assert result.exit_code == 0
+    rebuild_recc_log = result.output
+    cache_hits = rebuild_recc_log.count("Action Cache hit")
+    remote_execs = rebuild_recc_log.count("Executing action remotely")
+
+    # Extract recc action digests
+    recc_digests = set(
+        re.findall(r"Action Digest: ([0-9a-f]+)/", rebuild_recc_log)
+    )
+    primed_short = {d[:8] for d in primed_digests}
+    recc_short = {d[:8] for d in recc_digests}
+    matching = primed_short & recc_short
+
+    print(
+        f"Chained priming result: {cache_hits} cache hits, "
+        f"{remote_execs} remote executions "
+        f"(first build had {first_remote_execs} remote executions)"
+    )
+    print(
+        f"Digest match: {len(matching)} of {len(primed_short)} primed "
+        f"actions found in recc's {len(recc_short)} actions"
+    )
+    print(f"  Primed: {sorted(primed_short)}")
+    print(f"  Recc:   {sorted(recc_short)}")
+
+    # With slow-dep rebuilding (10s), app-chained enters priming as
+    # PENDING.  Background priming submits the compile fire-and-forget.
+    # Per-dep callback on dep completion may resolve more overlays.
+    # Final pass when buildable resolves remaining ACTION overlays.
+    # We expect at least the compile action digest to match recc's.
+    assert len(matching) >= 1, (
+        f"Expected at least 1 primed action to match recc's, "
+        f"got {len(matching)}. Primed: {primed_short}, Recc: {recc_short}"
     )


Reply via email to