This is an automated email from the ASF dual-hosted git repository. striker pushed a commit to branch striker/speculative-actions in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 46d56f9ec4ebba9f8ea772f19bc88cc203b3aaed Author: Sander Striker <[email protected]> AuthorDate: Sat Mar 21 22:33:08 2026 +0100 speculative-actions: Deduplicate and parallelize FetchTree in prefetch Optimize _prefetch_cas_blobs to reduce remote FetchTree latency: 1. Deduplicate input root digests — many subactions share the same input trees, so redundant FetchTree calls are eliminated. 2. Issue FetchTree calls concurrently via ThreadPoolExecutor (up to 16 workers) instead of sequentially. Individual FetchTree calls are preserved (no synthetic root) to maintain remote cache hit rates — input roots from actual builds are likely already cached on the remote. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]> --- .../queues/speculativecacheprimingqueue.py | 29 ++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/src/buildstream/_scheduler/queues/speculativecacheprimingqueue.py b/src/buildstream/_scheduler/queues/speculativecacheprimingqueue.py index d99eba663..9550402e3 100644 --- a/src/buildstream/_scheduler/queues/speculativecacheprimingqueue.py +++ b/src/buildstream/_scheduler/queues/speculativecacheprimingqueue.py @@ -412,7 +412,13 @@ class SpeculativeCachePrimingQueue(Queue): @staticmethod def _prefetch_cas_blobs(element, spec_actions, cas, artifactcache): - """Pre-fetch all CAS blobs needed for instantiation.""" + """Pre-fetch all CAS blobs needed for instantiation. + + Fetches base action blobs in a single batch, then deduplicates + input root digests and fetches directory trees concurrently. + """ + from concurrent.futures import ThreadPoolExecutor, as_completed + project = element._get_project() _, storage_remotes = artifactcache.get_remotes(project.name, False) remote = storage_remotes[0] if storage_remotes else None @@ -431,14 +437,33 @@ class SpeculativeCachePrimingQueue(Queue): except Exception: pass + # Collect and deduplicate input root digests + unique_roots = {} # hash -> digest for digest in base_action_digests: try: action = cas.fetch_action(digest) if action and action.HasField("input_root_digest"): - cas.fetch_directory(remote, action.input_root_digest) + root = action.input_root_digest + if root.hash not in unique_roots: + unique_roots[root.hash] = root + except Exception: + pass + + if not unique_roots: + return + + # Fetch directory trees concurrently + def _fetch_tree(root_digest): + try: + cas.fetch_directory(remote, root_digest) except Exception: pass + with ThreadPoolExecutor(max_workers=min(16, len(unique_roots))) as pool: + futures = [pool.submit(_fetch_tree, d) for d in unique_roots.values()] + for f in as_completed(futures): + pass # Errors handled inside _fetch_tree + @staticmethod def _submit_action_async(exec_service, action_digest, element): """Submit an Execute request fire-and-forget style.
