This is an automated email from the ASF dual-hosted git repository. striker pushed a commit to branch striker/speculative-actions in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 50ba3253a653c9f18f72aebc609bcfe36bcc005e Author: Sander Striker <[email protected]> AuthorDate: Mon Mar 16 18:28:12 2026 +0100 speculative actions: Add proto definitions, generator, and instantiator Proto definitions: - speculative_actions.proto: SpeculativeActions, SpeculativeAction, Overlay messages for storing build action overlays - ActionResult.subactions (field 99): repeated Digest for nested execution action digests recorded by recc/trexe - Artifact.speculative_actions: Digest field for SA proto storage Generator (generator.py): - Analyzes completed builds to extract subaction digests from Actions - Builds digest cache mapping file hashes to source elements (SOURCE priority > ARTIFACT priority) - Creates overlays linking each input file to its origin element/path - Generates artifact_overlays for downstream dependency tracing Instantiator (instantiator.py): - Fetches base actions from CAS and resolves SOURCE/ARTIFACT overlays - Replaces file digests in action input trees recursively - Stores adapted actions back to CAS - Optimization: skips overlays for unchanged dependencies Config: - scheduler.speculative-actions flag (default false) in userconfig.yaml Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]> --- .../remote/execution/v2/remote_execution_pb2.py | 188 +++++----- .../remote/execution/v2/remote_execution_pb2.pyi | 16 +- .../_protos/buildstream/v2/artifact.proto | 3 + .../_protos/buildstream/v2/artifact_pb2.py | 16 +- .../_protos/buildstream/v2/artifact_pb2.pyi | 6 +- .../buildstream/v2/speculative_actions.proto | 62 ++++ .../buildstream/v2/speculative_actions_pb2.py | 43 +++ .../buildstream/v2/speculative_actions_pb2.pyi | 40 +++ .../buildstream/v2/speculative_actions_pb2_grpc.py | 24 ++ src/buildstream/_speculative_actions/__init__.py | 30 ++ src/buildstream/_speculative_actions/generator.py | 392 ++++++++++++++++++++ .../_speculative_actions/instantiator.py | 393 +++++++++++++++++++++ src/buildstream/data/userconfig.yaml | 6 + 
13 files changed, 1105 insertions(+), 114 deletions(-) diff --git a/src/buildstream/_protos/build/bazel/remote/execution/v2/remote_execution_pb2.py b/src/buildstream/_protos/build/bazel/remote/execution/v2/remote_execution_pb2.py index 569ec22aa..357b3f7fa 100644 --- a/src/buildstream/_protos/build/bazel/remote/execution/v2/remote_execution_pb2.py +++ b/src/buildstream/_protos/build/bazel/remote/execution/v2/remote_execution_pb2.py @@ -32,7 +32,7 @@ from google.protobuf import wrappers_pb2 as google_dot_protobuf_dot_wrappers__pb from buildstream._protos.google.rpc import status_pb2 as google_dot_rpc_dot_status__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n6build/bazel/remote/execution/v2/remote_execution.proto\x12\x1f\x62uild.bazel.remote.execution.v2\x1a\x1f\x62uild/bazel/semver/semver.proto\x1a\x1cgoogle/api/annotations.proto\x1a#google/longrunning/operations.proto\x1a\x19google/protobuf/any.proto\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1egoogle/protobuf/wrappers.proto\x1a\x17google/rpc/status.proto\"\xa6\x02\n\x06\x41\x63tion\x12?\n\ [...] +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n6build/bazel/remote/execution/v2/remote_execution.proto\x12\x1f\x62uild.bazel.remote.execution.v2\x1a\x1f\x62uild/bazel/semver/semver.proto\x1a\x1cgoogle/api/annotations.proto\x1a#google/longrunning/operations.proto\x1a\x19google/protobuf/any.proto\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1egoogle/protobuf/wrappers.proto\x1a\x17google/rpc/status.proto\"\xa6\x02\n\x06\x41\x63tion\x12?\n\ [...] 
_globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -99,97 +99,97 @@ if not _descriptor._USE_C_DESCRIPTORS: _globals['_EXECUTEDACTIONMETADATA']._serialized_start=2282 _globals['_EXECUTEDACTIONMETADATA']._serialized_end=3015 _globals['_ACTIONRESULT']._serialized_start=3018 - _globals['_ACTIONRESULT']._serialized_end=3697 - _globals['_OUTPUTFILE']._serialized_start=3700 - _globals['_OUTPUTFILE']._serialized_end=3910 - _globals['_TREE']._serialized_start=3912 - _globals['_TREE']._serialized_end=4038 - _globals['_OUTPUTDIRECTORY']._serialized_start=4041 - _globals['_OUTPUTDIRECTORY']._serialized_end=4245 - _globals['_OUTPUTSYMLINK']._serialized_start=4247 - _globals['_OUTPUTSYMLINK']._serialized_end=4372 - _globals['_EXECUTIONPOLICY']._serialized_start=4374 - _globals['_EXECUTIONPOLICY']._serialized_end=4409 - _globals['_RESULTSCACHEPOLICY']._serialized_start=4411 - _globals['_RESULTSCACHEPOLICY']._serialized_end=4449 - _globals['_EXECUTEREQUEST']._serialized_start=4452 - _globals['_EXECUTEREQUEST']._serialized_end=4914 - _globals['_LOGFILE']._serialized_start=4916 - _globals['_LOGFILE']._serialized_end=5006 - _globals['_EXECUTERESPONSE']._serialized_start=5009 - _globals['_EXECUTERESPONSE']._serialized_end=5345 - _globals['_EXECUTERESPONSE_SERVERLOGSENTRY']._serialized_start=5254 - _globals['_EXECUTERESPONSE_SERVERLOGSENTRY']._serialized_end=5345 - _globals['_EXECUTIONSTAGE']._serialized_start=5347 - _globals['_EXECUTIONSTAGE']._serialized_end=5444 - _globals['_EXECUTIONSTAGE_VALUE']._serialized_start=5365 - _globals['_EXECUTIONSTAGE_VALUE']._serialized_end=5444 - _globals['_EXECUTEOPERATIONMETADATA']._serialized_start=5447 - _globals['_EXECUTEOPERATIONMETADATA']._serialized_end=5756 - _globals['_WAITEXECUTIONREQUEST']._serialized_start=5758 - _globals['_WAITEXECUTIONREQUEST']._serialized_end=5794 - _globals['_GETACTIONRESULTREQUEST']._serialized_start=5797 - _globals['_GETACTIONRESULTREQUEST']._serialized_end=6063 - 
_globals['_UPDATEACTIONRESULTREQUEST']._serialized_start=6066 - _globals['_UPDATEACTIONRESULTREQUEST']._serialized_end=6413 - _globals['_FINDMISSINGBLOBSREQUEST']._serialized_start=6416 - _globals['_FINDMISSINGBLOBSREQUEST']._serialized_end=6607 - _globals['_FINDMISSINGBLOBSRESPONSE']._serialized_start=6609 - _globals['_FINDMISSINGBLOBSRESPONSE']._serialized_end=6706 - _globals['_BATCHUPDATEBLOBSREQUEST']._serialized_start=6709 - _globals['_BATCHUPDATEBLOBSREQUEST']._serialized_end=7075 - _globals['_BATCHUPDATEBLOBSREQUEST_REQUEST']._serialized_start=6924 - _globals['_BATCHUPDATEBLOBSREQUEST_REQUEST']._serialized_end=7075 - _globals['_BATCHUPDATEBLOBSRESPONSE']._serialized_start=7078 - _globals['_BATCHUPDATEBLOBSRESPONSE']._serialized_end=7296 - _globals['_BATCHUPDATEBLOBSRESPONSE_RESPONSE']._serialized_start=7193 - _globals['_BATCHUPDATEBLOBSRESPONSE_RESPONSE']._serialized_end=7296 - _globals['_BATCHREADBLOBSREQUEST']._serialized_start=7299 - _globals['_BATCHREADBLOBSREQUEST']._serialized_end=7566 - _globals['_BATCHREADBLOBSRESPONSE']._serialized_start=7569 - _globals['_BATCHREADBLOBSRESPONSE']._serialized_end=7869 - _globals['_BATCHREADBLOBSRESPONSE_RESPONSE']._serialized_start=7681 - _globals['_BATCHREADBLOBSRESPONSE_RESPONSE']._serialized_end=7869 - _globals['_GETTREEREQUEST']._serialized_start=7872 - _globals['_GETTREEREQUEST']._serialized_end=8092 - _globals['_GETTREERESPONSE']._serialized_start=8094 - _globals['_GETTREERESPONSE']._serialized_end=8201 - _globals['_GETCAPABILITIESREQUEST']._serialized_start=8203 - _globals['_GETCAPABILITIESREQUEST']._serialized_end=8250 - _globals['_SERVERCAPABILITIES']._serialized_start=8253 - _globals['_SERVERCAPABILITIES']._serialized_end=8608 - _globals['_DIGESTFUNCTION']._serialized_start=8611 - _globals['_DIGESTFUNCTION']._serialized_end=8754 - _globals['_DIGESTFUNCTION_VALUE']._serialized_start=8629 - _globals['_DIGESTFUNCTION_VALUE']._serialized_end=8754 - 
_globals['_ACTIONCACHEUPDATECAPABILITIES']._serialized_start=8756 - _globals['_ACTIONCACHEUPDATECAPABILITIES']._serialized_end=8811 - _globals['_PRIORITYCAPABILITIES']._serialized_start=8814 - _globals['_PRIORITYCAPABILITIES']._serialized_end=8986 - _globals['_PRIORITYCAPABILITIES_PRIORITYRANGE']._serialized_start=8927 - _globals['_PRIORITYCAPABILITIES_PRIORITYRANGE']._serialized_end=8986 - _globals['_SYMLINKABSOLUTEPATHSTRATEGY']._serialized_start=8988 - _globals['_SYMLINKABSOLUTEPATHSTRATEGY']._serialized_end=9068 - _globals['_SYMLINKABSOLUTEPATHSTRATEGY_VALUE']._serialized_start=9019 - _globals['_SYMLINKABSOLUTEPATHSTRATEGY_VALUE']._serialized_end=9068 - _globals['_COMPRESSOR']._serialized_start=9070 - _globals['_COMPRESSOR']._serialized_end=9140 - _globals['_COMPRESSOR_VALUE']._serialized_start=9084 - _globals['_COMPRESSOR_VALUE']._serialized_end=9140 - _globals['_CACHECAPABILITIES']._serialized_start=9143 - _globals['_CACHECAPABILITIES']._serialized_end=9762 - _globals['_EXECUTIONCAPABILITIES']._serialized_start=9765 - _globals['_EXECUTIONCAPABILITIES']._serialized_end=10102 - _globals['_TOOLDETAILS']._serialized_start=10104 - _globals['_TOOLDETAILS']._serialized_end=10158 - _globals['_REQUESTMETADATA']._serialized_start=10161 - _globals['_REQUESTMETADATA']._serialized_end=10398 - _globals['_EXECUTION']._serialized_start=10401 - _globals['_EXECUTION']._serialized_end=10714 - _globals['_ACTIONCACHE']._serialized_start=10717 - _globals['_ACTIONCACHE']._serialized_end=11187 - _globals['_CONTENTADDRESSABLESTORAGE']._serialized_start=11190 - _globals['_CONTENTADDRESSABLESTORAGE']._serialized_end=11985 - _globals['_CAPABILITIES']._serialized_start=11988 - _globals['_CAPABILITIES']._serialized_end=12177 + _globals['_ACTIONRESULT']._serialized_end=3758 + _globals['_OUTPUTFILE']._serialized_start=3761 + _globals['_OUTPUTFILE']._serialized_end=3971 + _globals['_TREE']._serialized_start=3973 + _globals['_TREE']._serialized_end=4099 + 
_globals['_OUTPUTDIRECTORY']._serialized_start=4102 + _globals['_OUTPUTDIRECTORY']._serialized_end=4306 + _globals['_OUTPUTSYMLINK']._serialized_start=4308 + _globals['_OUTPUTSYMLINK']._serialized_end=4433 + _globals['_EXECUTIONPOLICY']._serialized_start=4435 + _globals['_EXECUTIONPOLICY']._serialized_end=4470 + _globals['_RESULTSCACHEPOLICY']._serialized_start=4472 + _globals['_RESULTSCACHEPOLICY']._serialized_end=4510 + _globals['_EXECUTEREQUEST']._serialized_start=4513 + _globals['_EXECUTEREQUEST']._serialized_end=4900 + _globals['_LOGFILE']._serialized_start=4902 + _globals['_LOGFILE']._serialized_end=4992 + _globals['_EXECUTERESPONSE']._serialized_start=4995 + _globals['_EXECUTERESPONSE']._serialized_end=5331 + _globals['_EXECUTERESPONSE_SERVERLOGSENTRY']._serialized_start=5240 + _globals['_EXECUTERESPONSE_SERVERLOGSENTRY']._serialized_end=5331 + _globals['_EXECUTIONSTAGE']._serialized_start=5333 + _globals['_EXECUTIONSTAGE']._serialized_end=5430 + _globals['_EXECUTIONSTAGE_VALUE']._serialized_start=5351 + _globals['_EXECUTIONSTAGE_VALUE']._serialized_end=5430 + _globals['_EXECUTEOPERATIONMETADATA']._serialized_start=5433 + _globals['_EXECUTEOPERATIONMETADATA']._serialized_end=5742 + _globals['_WAITEXECUTIONREQUEST']._serialized_start=5744 + _globals['_WAITEXECUTIONREQUEST']._serialized_end=5780 + _globals['_GETACTIONRESULTREQUEST']._serialized_start=5783 + _globals['_GETACTIONRESULTREQUEST']._serialized_end=6049 + _globals['_UPDATEACTIONRESULTREQUEST']._serialized_start=6052 + _globals['_UPDATEACTIONRESULTREQUEST']._serialized_end=6399 + _globals['_FINDMISSINGBLOBSREQUEST']._serialized_start=6402 + _globals['_FINDMISSINGBLOBSREQUEST']._serialized_end=6593 + _globals['_FINDMISSINGBLOBSRESPONSE']._serialized_start=6595 + _globals['_FINDMISSINGBLOBSRESPONSE']._serialized_end=6692 + _globals['_BATCHUPDATEBLOBSREQUEST']._serialized_start=6695 + _globals['_BATCHUPDATEBLOBSREQUEST']._serialized_end=7061 + 
_globals['_BATCHUPDATEBLOBSREQUEST_REQUEST']._serialized_start=6910 + _globals['_BATCHUPDATEBLOBSREQUEST_REQUEST']._serialized_end=7061 + _globals['_BATCHUPDATEBLOBSRESPONSE']._serialized_start=7064 + _globals['_BATCHUPDATEBLOBSRESPONSE']._serialized_end=7282 + _globals['_BATCHUPDATEBLOBSRESPONSE_RESPONSE']._serialized_start=7179 + _globals['_BATCHUPDATEBLOBSRESPONSE_RESPONSE']._serialized_end=7282 + _globals['_BATCHREADBLOBSREQUEST']._serialized_start=7285 + _globals['_BATCHREADBLOBSREQUEST']._serialized_end=7552 + _globals['_BATCHREADBLOBSRESPONSE']._serialized_start=7555 + _globals['_BATCHREADBLOBSRESPONSE']._serialized_end=7855 + _globals['_BATCHREADBLOBSRESPONSE_RESPONSE']._serialized_start=7667 + _globals['_BATCHREADBLOBSRESPONSE_RESPONSE']._serialized_end=7855 + _globals['_GETTREEREQUEST']._serialized_start=7858 + _globals['_GETTREEREQUEST']._serialized_end=8078 + _globals['_GETTREERESPONSE']._serialized_start=8080 + _globals['_GETTREERESPONSE']._serialized_end=8187 + _globals['_GETCAPABILITIESREQUEST']._serialized_start=8189 + _globals['_GETCAPABILITIESREQUEST']._serialized_end=8236 + _globals['_SERVERCAPABILITIES']._serialized_start=8239 + _globals['_SERVERCAPABILITIES']._serialized_end=8594 + _globals['_DIGESTFUNCTION']._serialized_start=8597 + _globals['_DIGESTFUNCTION']._serialized_end=8740 + _globals['_DIGESTFUNCTION_VALUE']._serialized_start=8615 + _globals['_DIGESTFUNCTION_VALUE']._serialized_end=8740 + _globals['_ACTIONCACHEUPDATECAPABILITIES']._serialized_start=8742 + _globals['_ACTIONCACHEUPDATECAPABILITIES']._serialized_end=8797 + _globals['_PRIORITYCAPABILITIES']._serialized_start=8800 + _globals['_PRIORITYCAPABILITIES']._serialized_end=8972 + _globals['_PRIORITYCAPABILITIES_PRIORITYRANGE']._serialized_start=8913 + _globals['_PRIORITYCAPABILITIES_PRIORITYRANGE']._serialized_end=8972 + _globals['_SYMLINKABSOLUTEPATHSTRATEGY']._serialized_start=8974 + _globals['_SYMLINKABSOLUTEPATHSTRATEGY']._serialized_end=9054 + 
_globals['_SYMLINKABSOLUTEPATHSTRATEGY_VALUE']._serialized_start=9005 + _globals['_SYMLINKABSOLUTEPATHSTRATEGY_VALUE']._serialized_end=9054 + _globals['_COMPRESSOR']._serialized_start=9056 + _globals['_COMPRESSOR']._serialized_end=9126 + _globals['_COMPRESSOR_VALUE']._serialized_start=9070 + _globals['_COMPRESSOR_VALUE']._serialized_end=9126 + _globals['_CACHECAPABILITIES']._serialized_start=9129 + _globals['_CACHECAPABILITIES']._serialized_end=9748 + _globals['_EXECUTIONCAPABILITIES']._serialized_start=9751 + _globals['_EXECUTIONCAPABILITIES']._serialized_end=10088 + _globals['_TOOLDETAILS']._serialized_start=10090 + _globals['_TOOLDETAILS']._serialized_end=10144 + _globals['_REQUESTMETADATA']._serialized_start=10147 + _globals['_REQUESTMETADATA']._serialized_end=10384 + _globals['_EXECUTION']._serialized_start=10387 + _globals['_EXECUTION']._serialized_end=10700 + _globals['_ACTIONCACHE']._serialized_start=10703 + _globals['_ACTIONCACHE']._serialized_end=11173 + _globals['_CONTENTADDRESSABLESTORAGE']._serialized_start=11176 + _globals['_CONTENTADDRESSABLESTORAGE']._serialized_end=11971 + _globals['_CAPABILITIES']._serialized_start=11974 + _globals['_CAPABILITIES']._serialized_end=12163 # @@protoc_insertion_point(module_scope) diff --git a/src/buildstream/_protos/build/bazel/remote/execution/v2/remote_execution_pb2.pyi b/src/buildstream/_protos/build/bazel/remote/execution/v2/remote_execution_pb2.pyi index 14badbac9..99a57f913 100644 --- a/src/buildstream/_protos/build/bazel/remote/execution/v2/remote_execution_pb2.pyi +++ b/src/buildstream/_protos/build/bazel/remote/execution/v2/remote_execution_pb2.pyi @@ -177,7 +177,7 @@ class ExecutedActionMetadata(_message.Message): def __init__(self, worker: _Optional[str] = ..., queued_timestamp: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ..., worker_start_timestamp: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ..., worker_completed_timestamp: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = 
..., input_fetch_start_timestamp: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ..., input_fetch_completed_timestamp: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ..., ex [...] class ActionResult(_message.Message): - __slots__ = ("output_files", "output_file_symlinks", "output_symlinks", "output_directories", "output_directory_symlinks", "exit_code", "stdout_raw", "stdout_digest", "stderr_raw", "stderr_digest", "execution_metadata") + __slots__ = ("output_files", "output_file_symlinks", "output_symlinks", "output_directories", "output_directory_symlinks", "exit_code", "stdout_raw", "stdout_digest", "stderr_raw", "stderr_digest", "execution_metadata", "subactions") OUTPUT_FILES_FIELD_NUMBER: _ClassVar[int] OUTPUT_FILE_SYMLINKS_FIELD_NUMBER: _ClassVar[int] OUTPUT_SYMLINKS_FIELD_NUMBER: _ClassVar[int] @@ -189,6 +189,7 @@ class ActionResult(_message.Message): STDERR_RAW_FIELD_NUMBER: _ClassVar[int] STDERR_DIGEST_FIELD_NUMBER: _ClassVar[int] EXECUTION_METADATA_FIELD_NUMBER: _ClassVar[int] + SUBACTIONS_FIELD_NUMBER: _ClassVar[int] output_files: _containers.RepeatedCompositeFieldContainer[OutputFile] output_file_symlinks: _containers.RepeatedCompositeFieldContainer[OutputSymlink] output_symlinks: _containers.RepeatedCompositeFieldContainer[OutputSymlink] @@ -200,7 +201,8 @@ class ActionResult(_message.Message): stderr_raw: bytes stderr_digest: Digest execution_metadata: ExecutedActionMetadata - def __init__(self, output_files: _Optional[_Iterable[_Union[OutputFile, _Mapping]]] = ..., output_file_symlinks: _Optional[_Iterable[_Union[OutputSymlink, _Mapping]]] = ..., output_symlinks: _Optional[_Iterable[_Union[OutputSymlink, _Mapping]]] = ..., output_directories: _Optional[_Iterable[_Union[OutputDirectory, _Mapping]]] = ..., output_directory_symlinks: _Optional[_Iterable[_Union[OutputSymlink, _Mapping]]] = ..., exit_code: _Optional[int] = ..., stdout_raw: _Optional[bytes] = . [...] 
+ subactions: _containers.RepeatedCompositeFieldContainer[Digest] + def __init__(self, output_files: _Optional[_Iterable[_Union[OutputFile, _Mapping]]] = ..., output_file_symlinks: _Optional[_Iterable[_Union[OutputSymlink, _Mapping]]] = ..., output_symlinks: _Optional[_Iterable[_Union[OutputSymlink, _Mapping]]] = ..., output_directories: _Optional[_Iterable[_Union[OutputDirectory, _Mapping]]] = ..., output_directory_symlinks: _Optional[_Iterable[_Union[OutputSymlink, _Mapping]]] = ..., exit_code: _Optional[int] = ..., stdout_raw: _Optional[bytes] = . [...] class OutputFile(_message.Message): __slots__ = ("path", "digest", "is_executable", "contents", "node_properties") @@ -259,26 +261,20 @@ class ResultsCachePolicy(_message.Message): def __init__(self, priority: _Optional[int] = ...) -> None: ... class ExecuteRequest(_message.Message): - __slots__ = ("instance_name", "skip_cache_lookup", "action_digest", "execution_policy", "results_cache_policy", "digest_function", "inline_stdout", "inline_stderr", "inline_output_files") + __slots__ = ("instance_name", "skip_cache_lookup", "action_digest", "execution_policy", "results_cache_policy", "digest_function") INSTANCE_NAME_FIELD_NUMBER: _ClassVar[int] SKIP_CACHE_LOOKUP_FIELD_NUMBER: _ClassVar[int] ACTION_DIGEST_FIELD_NUMBER: _ClassVar[int] EXECUTION_POLICY_FIELD_NUMBER: _ClassVar[int] RESULTS_CACHE_POLICY_FIELD_NUMBER: _ClassVar[int] DIGEST_FUNCTION_FIELD_NUMBER: _ClassVar[int] - INLINE_STDOUT_FIELD_NUMBER: _ClassVar[int] - INLINE_STDERR_FIELD_NUMBER: _ClassVar[int] - INLINE_OUTPUT_FILES_FIELD_NUMBER: _ClassVar[int] instance_name: str skip_cache_lookup: bool action_digest: Digest execution_policy: ExecutionPolicy results_cache_policy: ResultsCachePolicy digest_function: DigestFunction.Value - inline_stdout: bool - inline_stderr: bool - inline_output_files: _containers.RepeatedScalarFieldContainer[str] - def __init__(self, instance_name: _Optional[str] = ..., skip_cache_lookup: bool = ..., action_digest: 
_Optional[_Union[Digest, _Mapping]] = ..., execution_policy: _Optional[_Union[ExecutionPolicy, _Mapping]] = ..., results_cache_policy: _Optional[_Union[ResultsCachePolicy, _Mapping]] = ..., digest_function: _Optional[_Union[DigestFunction.Value, str]] = ..., inline_stdout: bool = ..., inline_stderr: bool = ..., inline_output_files: _Optional[_Iterable[str]] = ...) -> None: ... + def __init__(self, instance_name: _Optional[str] = ..., skip_cache_lookup: bool = ..., action_digest: _Optional[_Union[Digest, _Mapping]] = ..., execution_policy: _Optional[_Union[ExecutionPolicy, _Mapping]] = ..., results_cache_policy: _Optional[_Union[ResultsCachePolicy, _Mapping]] = ..., digest_function: _Optional[_Union[DigestFunction.Value, str]] = ...) -> None: ... class LogFile(_message.Message): __slots__ = ("digest", "human_readable") diff --git a/src/buildstream/_protos/buildstream/v2/artifact.proto b/src/buildstream/_protos/buildstream/v2/artifact.proto index 28d006f0f..57628faa7 100644 --- a/src/buildstream/_protos/buildstream/v2/artifact.proto +++ b/src/buildstream/_protos/buildstream/v2/artifact.proto @@ -93,4 +93,7 @@ message Artifact { repeated string marked_directories = 4; }; SandboxState buildsandbox = 18; // optional + + // digest of a SpeculativeActions message (from speculative_actions.proto) + build.bazel.remote.execution.v2.Digest speculative_actions = 19; // optional } diff --git a/src/buildstream/_protos/buildstream/v2/artifact_pb2.py b/src/buildstream/_protos/buildstream/v2/artifact_pb2.py index a81006af5..757781a18 100644 --- a/src/buildstream/_protos/buildstream/v2/artifact_pb2.py +++ b/src/buildstream/_protos/buildstream/v2/artifact_pb2.py @@ -26,7 +26,7 @@ from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution from buildstream._protos.google.api import annotations_pb2 as google_dot_api_dot_annotations__pb2 -DESCRIPTOR = 
_descriptor_pool.Default().AddSerializedFile(b'\n\x1d\x62uildstream/v2/artifact.proto\x12\x0e\x62uildstream.v2\x1a\x36\x62uild/bazel/remote/execution/v2/remote_execution.proto\x1a\x1cgoogle/api/annotations.proto\"\xa6\t\n\x08\x41rtifact\x12\x0f\n\x07version\x18\x01 \x01(\x05\x12\x15\n\rbuild_success\x18\x02 \x01(\x08\x12\x13\n\x0b\x62uild_error\x18\x03 \x01(\t\x12\x1b\n\x13\x62uild_error_details\x18\x04 \x01(\t\x12\x12\n\nstrong_key\x18\x05 \x01(\t\x12\x10\n\x08weak_key\x18\ [...] +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1d\x62uildstream/v2/artifact.proto\x12\x0e\x62uildstream.v2\x1a\x36\x62uild/bazel/remote/execution/v2/remote_execution.proto\x1a\x1cgoogle/api/annotations.proto\"\xec\t\n\x08\x41rtifact\x12\x0f\n\x07version\x18\x01 \x01(\x05\x12\x15\n\rbuild_success\x18\x02 \x01(\x08\x12\x13\n\x0b\x62uild_error\x18\x03 \x01(\t\x12\x1b\n\x13\x62uild_error_details\x18\x04 \x01(\t\x12\x12\n\nstrong_key\x18\x05 \x01(\t\x12\x10\n\x08weak_key\x18\ [...] _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -34,11 +34,11 @@ _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'buildstream.v2.artifact_pb2 if not _descriptor._USE_C_DESCRIPTORS: DESCRIPTOR._loaded_options = None _globals['_ARTIFACT']._serialized_start=136 - _globals['_ARTIFACT']._serialized_end=1326 - _globals['_ARTIFACT_DEPENDENCY']._serialized_start=921 - _globals['_ARTIFACT_DEPENDENCY']._serialized_end=1020 - _globals['_ARTIFACT_LOGFILE']._serialized_start=1022 - _globals['_ARTIFACT_LOGFILE']._serialized_end=1102 - _globals['_ARTIFACT_SANDBOXSTATE']._serialized_start=1105 - _globals['_ARTIFACT_SANDBOXSTATE']._serialized_end=1326 + _globals['_ARTIFACT']._serialized_end=1396 + _globals['_ARTIFACT_DEPENDENCY']._serialized_start=991 + _globals['_ARTIFACT_DEPENDENCY']._serialized_end=1090 + _globals['_ARTIFACT_LOGFILE']._serialized_start=1092 + _globals['_ARTIFACT_LOGFILE']._serialized_end=1172 + _globals['_ARTIFACT_SANDBOXSTATE']._serialized_start=1175 + 
_globals['_ARTIFACT_SANDBOXSTATE']._serialized_end=1396 # @@protoc_insertion_point(module_scope) diff --git a/src/buildstream/_protos/buildstream/v2/artifact_pb2.pyi b/src/buildstream/_protos/buildstream/v2/artifact_pb2.pyi index 7eb4d7550..7681772b2 100644 --- a/src/buildstream/_protos/buildstream/v2/artifact_pb2.pyi +++ b/src/buildstream/_protos/buildstream/v2/artifact_pb2.pyi @@ -8,7 +8,7 @@ from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Map DESCRIPTOR: _descriptor.FileDescriptor class Artifact(_message.Message): - __slots__ = ("version", "build_success", "build_error", "build_error_details", "strong_key", "weak_key", "was_workspaced", "files", "build_deps", "public_data", "logs", "buildtree", "sources", "low_diversity_meta", "high_diversity_meta", "strict_key", "buildroot", "buildsandbox") + __slots__ = ("version", "build_success", "build_error", "build_error_details", "strong_key", "weak_key", "was_workspaced", "files", "build_deps", "public_data", "logs", "buildtree", "sources", "low_diversity_meta", "high_diversity_meta", "strict_key", "buildroot", "buildsandbox", "speculative_actions") class Dependency(_message.Message): __slots__ = ("project_name", "element_name", "cache_key", "was_workspaced") PROJECT_NAME_FIELD_NUMBER: _ClassVar[int] @@ -56,6 +56,7 @@ class Artifact(_message.Message): STRICT_KEY_FIELD_NUMBER: _ClassVar[int] BUILDROOT_FIELD_NUMBER: _ClassVar[int] BUILDSANDBOX_FIELD_NUMBER: _ClassVar[int] + SPECULATIVE_ACTIONS_FIELD_NUMBER: _ClassVar[int] version: int build_success: bool build_error: str @@ -74,4 +75,5 @@ class Artifact(_message.Message): strict_key: str buildroot: _remote_execution_pb2.Digest buildsandbox: Artifact.SandboxState - def __init__(self, version: _Optional[int] = ..., build_success: bool = ..., build_error: _Optional[str] = ..., build_error_details: _Optional[str] = ..., strong_key: _Optional[str] = ..., weak_key: _Optional[str] = ..., was_workspaced: bool = ..., files: 
_Optional[_Union[_remote_execution_pb2.Digest, _Mapping]] = ..., build_deps: _Optional[_Iterable[_Union[Artifact.Dependency, _Mapping]]] = ..., public_data: _Optional[_Union[_remote_execution_pb2.Digest, _Mapping]] = ..., logs: _Opt [...] + speculative_actions: _remote_execution_pb2.Digest + def __init__(self, version: _Optional[int] = ..., build_success: bool = ..., build_error: _Optional[str] = ..., build_error_details: _Optional[str] = ..., strong_key: _Optional[str] = ..., weak_key: _Optional[str] = ..., was_workspaced: bool = ..., files: _Optional[_Union[_remote_execution_pb2.Digest, _Mapping]] = ..., build_deps: _Optional[_Iterable[_Union[Artifact.Dependency, _Mapping]]] = ..., public_data: _Optional[_Union[_remote_execution_pb2.Digest, _Mapping]] = ..., logs: _Opt [...] diff --git a/src/buildstream/_protos/buildstream/v2/speculative_actions.proto b/src/buildstream/_protos/buildstream/v2/speculative_actions.proto new file mode 100644 index 000000000..aea6d4f7b --- /dev/null +++ b/src/buildstream/_protos/buildstream/v2/speculative_actions.proto @@ -0,0 +1,62 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package buildstream.v2; + +import "build/bazel/remote/execution/v2/remote_execution.proto"; + +// SpeculativeActions: Metadata for cache priming via speculative execution +// +// This message stores overlay information that allows BuildStream to: +// 1. 
Instantiate previously-recorded Actions with current dependency versions +// 2. Submit these adapted Actions to prime the Remote Execution ActionCache +// 3. Speed up builds when dependencies change but not the element itself +message SpeculativeActions { + // Speculative actions for this element's build + repeated SpeculativeAction actions = 1; + + // Overlays mapping artifact file digests to their sources + // Enables downstream elements to resolve dependencies without fetching sources + repeated Overlay artifact_overlays = 2; + + message SpeculativeAction { + // Original action digest from the recorded build + build.bazel.remote.execution.v2.Digest base_action_digest = 1; + + // Overlays to apply when instantiating this action + repeated Overlay overlays = 2; + } + + message Overlay { + enum OverlayType { + SOURCE = 0; // From element's source tree + ARTIFACT = 1; // From dependency element's artifact output + } + + OverlayType type = 1; + + // Element name providing the source + // Empty string means the element itself (self-reference) + string source_element = 2; + + // Path within source (source tree or artifact) + string source_path = 4; + + // The digest that should be replaced in the action's input tree + // When instantiating, find all occurrences of this digest and replace + // with the current digest of the file at source_path + build.bazel.remote.execution.v2.Digest target_digest = 5; + } +} diff --git a/src/buildstream/_protos/buildstream/v2/speculative_actions_pb2.py b/src/buildstream/_protos/buildstream/v2/speculative_actions_pb2.py new file mode 100644 index 000000000..e9233c7da --- /dev/null +++ b/src/buildstream/_protos/buildstream/v2/speculative_actions_pb2.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! 
+# NO CHECKED-IN PROTOBUF GENCODE +# source: buildstream/v2/speculative_actions.proto +# Protobuf Python Version: 5.29.0 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 5, + 29, + 0, + '', + 'buildstream/v2/speculative_actions.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2 + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n(buildstream/v2/speculative_actions.proto\x12\x0e\x62uildstream.v2\x1a\x36\x62uild/bazel/remote/execution/v2/remote_execution.proto\"\xa3\x04\n\x12SpeculativeActions\x12\x45\n\x07\x61\x63tions\x18\x01 \x03(\x0b\x32\x34.buildstream.v2.SpeculativeActions.SpeculativeAction\x12\x45\n\x11\x61rtifact_overlays\x18\x02 \x03(\x0b\x32*.buildstream.v2.SpeculativeActions.Overlay\x1a\x96\x01\n\x11SpeculativeAction\x12\x43\n\x12\x62\x61se_a [...] 
+ +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'buildstream.v2.speculative_actions_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None + _globals['_SPECULATIVEACTIONS']._serialized_start=117 + _globals['_SPECULATIVEACTIONS']._serialized_end=664 + _globals['_SPECULATIVEACTIONS_SPECULATIVEACTION']._serialized_start=282 + _globals['_SPECULATIVEACTIONS_SPECULATIVEACTION']._serialized_end=432 + _globals['_SPECULATIVEACTIONS_OVERLAY']._serialized_start=435 + _globals['_SPECULATIVEACTIONS_OVERLAY']._serialized_end=664 + _globals['_SPECULATIVEACTIONS_OVERLAY_OVERLAYTYPE']._serialized_start=625 + _globals['_SPECULATIVEACTIONS_OVERLAY_OVERLAYTYPE']._serialized_end=664 +# @@protoc_insertion_point(module_scope) diff --git a/src/buildstream/_protos/buildstream/v2/speculative_actions_pb2.pyi b/src/buildstream/_protos/buildstream/v2/speculative_actions_pb2.pyi new file mode 100644 index 000000000..a9dff52dc --- /dev/null +++ b/src/buildstream/_protos/buildstream/v2/speculative_actions_pb2.pyi @@ -0,0 +1,40 @@ +from build.bazel.remote.execution.v2 import remote_execution_pb2 as _remote_execution_pb2 +from google.protobuf.internal import containers as _containers +from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union + +DESCRIPTOR: _descriptor.FileDescriptor + +class SpeculativeActions(_message.Message): + __slots__ = ("actions", "artifact_overlays") + class SpeculativeAction(_message.Message): + __slots__ = ("base_action_digest", "overlays") + BASE_ACTION_DIGEST_FIELD_NUMBER: _ClassVar[int] + OVERLAYS_FIELD_NUMBER: _ClassVar[int] + base_action_digest: _remote_execution_pb2.Digest + overlays: 
_containers.RepeatedCompositeFieldContainer[SpeculativeActions.Overlay] + def __init__(self, base_action_digest: _Optional[_Union[_remote_execution_pb2.Digest, _Mapping]] = ..., overlays: _Optional[_Iterable[_Union[SpeculativeActions.Overlay, _Mapping]]] = ...) -> None: ... + class Overlay(_message.Message): + __slots__ = ("type", "source_element", "source_path", "target_digest") + class OverlayType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = () + SOURCE: _ClassVar[SpeculativeActions.Overlay.OverlayType] + ARTIFACT: _ClassVar[SpeculativeActions.Overlay.OverlayType] + SOURCE: SpeculativeActions.Overlay.OverlayType + ARTIFACT: SpeculativeActions.Overlay.OverlayType + TYPE_FIELD_NUMBER: _ClassVar[int] + SOURCE_ELEMENT_FIELD_NUMBER: _ClassVar[int] + SOURCE_PATH_FIELD_NUMBER: _ClassVar[int] + TARGET_DIGEST_FIELD_NUMBER: _ClassVar[int] + type: SpeculativeActions.Overlay.OverlayType + source_element: str + source_path: str + target_digest: _remote_execution_pb2.Digest + def __init__(self, type: _Optional[_Union[SpeculativeActions.Overlay.OverlayType, str]] = ..., source_element: _Optional[str] = ..., source_path: _Optional[str] = ..., target_digest: _Optional[_Union[_remote_execution_pb2.Digest, _Mapping]] = ...) -> None: ... + ACTIONS_FIELD_NUMBER: _ClassVar[int] + ARTIFACT_OVERLAYS_FIELD_NUMBER: _ClassVar[int] + actions: _containers.RepeatedCompositeFieldContainer[SpeculativeActions.SpeculativeAction] + artifact_overlays: _containers.RepeatedCompositeFieldContainer[SpeculativeActions.Overlay] + def __init__(self, actions: _Optional[_Iterable[_Union[SpeculativeActions.SpeculativeAction, _Mapping]]] = ..., artifact_overlays: _Optional[_Iterable[_Union[SpeculativeActions.Overlay, _Mapping]]] = ...) -> None: ... 
diff --git a/src/buildstream/_protos/buildstream/v2/speculative_actions_pb2_grpc.py b/src/buildstream/_protos/buildstream/v2/speculative_actions_pb2_grpc.py new file mode 100644 index 000000000..7c3546642 --- /dev/null +++ b/src/buildstream/_protos/buildstream/v2/speculative_actions_pb2_grpc.py @@ -0,0 +1,24 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc +import warnings + + +GRPC_GENERATED_VERSION = '1.69.0' +GRPC_VERSION = grpc.__version__ +_version_not_supported = False + +try: + from grpc._utilities import first_version_is_lower + _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) +except ImportError: + _version_not_supported = True + +if _version_not_supported: + raise RuntimeError( + f'The grpc package installed is at version {GRPC_VERSION},' + + f' but the generated code in buildstream/v2/speculative_actions_pb2_grpc.py depends on' + + f' grpcio>={GRPC_GENERATED_VERSION}.' + + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' + + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' + ) diff --git a/src/buildstream/_speculative_actions/__init__.py b/src/buildstream/_speculative_actions/__init__.py new file mode 100644 index 000000000..a94e55216 --- /dev/null +++ b/src/buildstream/_speculative_actions/__init__.py @@ -0,0 +1,30 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Speculative Actions - Cache Priming Infrastructure +=================================================== + +This module implements the Speculative Actions feature for BuildStream, +which enables predictive cache priming by recording and replaying compiler +invocations with updated dependency versions. + +Key Components: +- generator: Generates SpeculativeActions and artifact overlays after builds +- instantiator: Applies overlays to instantiate actions before builds +""" + +from .generator import SpeculativeActionsGenerator +from .instantiator import SpeculativeActionInstantiator + +__all__ = ["SpeculativeActionsGenerator", "SpeculativeActionInstantiator"] diff --git a/src/buildstream/_speculative_actions/generator.py b/src/buildstream/_speculative_actions/generator.py new file mode 100644 index 000000000..aaf326fd2 --- /dev/null +++ b/src/buildstream/_speculative_actions/generator.py @@ -0,0 +1,392 @@ +# +# Copyright 2025 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +SpeculativeActionsGenerator +============================ + +Generates SpeculativeActions and artifact overlays after element builds. + +This module is responsible for: +1. Extracting subaction digests from ActionResult +2. Traversing action input trees to find all file digests +3. 
from typing import Dict, Tuple


class SpeculativeActionsGenerator:
    """
    Generates SpeculativeActions from element builds.

    This class analyzes completed element builds to extract subactions and
    generate overlay metadata that describes how to adapt inputs for future
    builds.
    """

    def __init__(self, cas):
        """
        Initialize the generator.

        Args:
            cas: The CAS cache for fetching actions and directories
        """
        self._cas = cas
        # Maps digest.hash -> (element_name, relative_path, overlay_type),
        # where overlay_type is "SOURCE" or "ARTIFACT".  SOURCE entries
        # take priority over ARTIFACT entries for the same hash.
        self._digest_cache: Dict[str, Tuple[str, str, str]] = {}

    def generate_speculative_actions(self, element, subaction_digests, dependencies):
        """
        Generate SpeculativeActions for an element build.

        This is the main entry point for overlay generation.  It processes
        all subactions from the element's build and generates overlays
        for each, plus artifact overlays for the element's own output files.

        Args:
            element: The element that was built
            subaction_digests: List of Action digests from the build
                (from ActionResult.subactions)
            dependencies: List of dependency elements (for resolving
                artifact overlays)

        Returns:
            A SpeculativeActions message containing:
            - actions: SpeculativeAction entries with overlays per subaction
            - artifact_overlays: Overlays mapping artifact file digests
              back to their sources
        """
        # Imported lazily so proto modules are only loaded when the
        # speculative-actions feature is actually used.
        from .._protos.buildstream.v2 import speculative_actions_pb2

        spec_actions = speculative_actions_pb2.SpeculativeActions()

        # Build digest lookup tables from element sources and dependencies
        self._build_digest_cache(element, dependencies)

        # Generate overlays for each subaction
        for subaction_digest in subaction_digests:
            spec_action = self._generate_action_overlays(element, subaction_digest)
            if spec_action:
                spec_actions.actions.append(spec_action)

        # Generate artifact overlays for the element's output files
        artifact_overlays = self._generate_artifact_overlays(element)
        spec_actions.artifact_overlays.extend(artifact_overlays)

        return spec_actions

    def _build_digest_cache(self, element, dependencies):
        """
        Build a cache mapping file digests to their source elements.

        Args:
            element: The element being processed
            dependencies: List of dependency elements
        """
        self._digest_cache.clear()

        # Index element's own sources (highest priority)
        self._index_element_sources(element, element)

        # Index dependency artifacts (lower priority)
        for dep in dependencies:
            self._index_element_artifact(dep)

    def _index_element_sources(self, element, source_element):
        """
        Index all file digests in an element's source tree.

        Args:
            element: The element whose sources to index
            source_element: The element to record as the source
        """
        try:
            # Check if element has any sources
            if not any(element.sources()):
                return

            # Access the private __sources attribute to get ElementSources
            sources = element._Element__sources
            if not sources or not sources.cached():
                return

            source_dir = sources.get_files()
            if not source_dir:
                return

            # Traverse the source directory and index all files with full
            # paths, starting from an empty relative path at the root.
            self._traverse_directory_with_paths(
                source_dir._get_digest(), source_element.name, "SOURCE", ""
            )
        except Exception:
            # Best-effort: elements with missing or uncached sources are
            # simply not indexed.
            pass

    def _index_element_artifact(self, element):
        """
        Index all file digests in an element's artifact output.

        Args:
            element: The element whose artifact to index
        """
        try:
            # Check if element is cached
            if not element._cached():
                return

            # Get the artifact object
            artifact = element._get_artifact()
            if not artifact or not artifact.cached():
                return

            # Get the artifact files directory
            files_dir = artifact.get_files()
            if not files_dir:
                return

            # Traverse the artifact files directory with full paths,
            # starting from an empty relative path at the root.
            self._traverse_directory_with_paths(
                files_dir._get_digest(), element.name, "ARTIFACT", ""
            )
        except Exception:
            # Best-effort: elements with missing or uncached artifacts are
            # simply not indexed.
            pass

    def _traverse_directory_with_paths(self, directory_digest, element_name, overlay_type, current_path):
        """
        Recursively traverse a Directory tree and index all file digests
        with full paths.

        Args:
            directory_digest: The Directory digest to traverse
            element_name: The element name to associate with found files
            overlay_type: Either "SOURCE" or "ARTIFACT"
            current_path: Current relative path from root (e.g., "src/foo")
        """
        try:
            directory = self._cas.fetch_directory_proto(directory_digest)
            if not directory:
                return

            # Index all files in this directory with full paths
            for file_node in directory.files:
                digest_hash = file_node.digest.hash
                # Build full relative path
                file_path = file_node.name if not current_path else f"{current_path}/{file_node.name}"

                # Priority: SOURCE > ARTIFACT.  Only store if not already
                # present, or if upgrading an ARTIFACT entry to SOURCE.
                if digest_hash not in self._digest_cache:
                    self._digest_cache[digest_hash] = (element_name, file_path, overlay_type)
                elif overlay_type == "SOURCE" and self._digest_cache[digest_hash][2] == "ARTIFACT":
                    # Upgrade ARTIFACT to SOURCE
                    self._digest_cache[digest_hash] = (element_name, file_path, overlay_type)

            # Recursively traverse subdirectories
            for dir_node in directory.directories:
                # Build path for subdirectory
                subdir_path = dir_node.name if not current_path else f"{current_path}/{dir_node.name}"
                self._traverse_directory_with_paths(dir_node.digest, element_name, overlay_type, subdir_path)
        except Exception:
            # Best-effort: unreadable directories are skipped silently.
            pass

    def _generate_action_overlays(self, element, action_digest):
        """
        Generate overlays for a single subaction.

        Args:
            element: The element being processed
            action_digest: The Action digest to generate overlays for

        Returns:
            SpeculativeAction proto, or None if the action was not found
            or no overlays could be resolved for it
        """
        from .._protos.buildstream.v2 import speculative_actions_pb2

        # Fetch the action from CAS
        action = self._cas.fetch_action(action_digest)
        if not action:
            return None

        spec_action = speculative_actions_pb2.SpeculativeActions.SpeculativeAction()
        spec_action.base_action_digest.CopyFrom(action_digest)

        # Extract all file digests from the action's input tree
        input_digests = self._extract_digests_from_action(action)

        # Resolve each digest to an overlay
        for digest in input_digests:
            overlay = self._resolve_digest_to_overlay(digest, element)
            if overlay:
                spec_action.overlays.append(overlay)

        return spec_action if spec_action.overlays else None

    def _extract_digests_from_action(self, action):
        """
        Extract all unique file digests from an Action's input tree.

        Args:
            action: Action proto

        Returns:
            Set of (hash, size_bytes) tuples, one per unique input file
        """
        digests = set()

        if not action.HasField("input_root_digest"):
            return digests

        # Traverse the input root directory tree
        self._collect_file_digests(action.input_root_digest, digests)

        return digests

    def _collect_file_digests(self, directory_digest, digests_set):
        """
        Recursively collect all file digests from a directory tree.

        Args:
            directory_digest: Directory digest to traverse
            digests_set: Set to add found (hash, size_bytes) tuples to
        """
        try:
            directory = self._cas.fetch_directory_proto(directory_digest)
            if not directory:
                return

            # Collect file digests as (hash, size) tuples so they are
            # hashable for set uniqueness.
            for file_node in directory.files:
                digests_set.add((file_node.digest.hash, file_node.digest.size_bytes))

            # Recursively traverse subdirectories
            for dir_node in directory.directories:
                self._collect_file_digests(dir_node.digest, digests_set)
        except Exception:
            # Best-effort: unreadable directories are skipped silently.
            pass

    def _resolve_digest_to_overlay(self, digest_tuple, element, artifact_file_path=None):
        """
        Resolve a file digest to an Overlay proto.

        Args:
            digest_tuple: Tuple of (hash, size_bytes)
            element: The element being processed
            artifact_file_path: Path in the artifact (passed by
                artifact_overlays callers); currently informational only —
                the overlay records the origin path from the digest cache

        Returns:
            Overlay proto or None if digest cannot be resolved
        """
        from .._protos.buildstream.v2 import speculative_actions_pb2

        digest_hash, digest_size = digest_tuple

        # Look up in our digest cache
        if digest_hash not in self._digest_cache:
            return None

        element_name, file_path, overlay_type = self._digest_cache[digest_hash]

        # Create overlay
        overlay = speculative_actions_pb2.SpeculativeActions.Overlay()
        overlay.target_digest.hash = digest_hash
        overlay.target_digest.size_bytes = digest_size
        overlay.source_path = file_path  # Path in the source/artifact where it originated

        if overlay_type == "SOURCE":
            overlay.type = speculative_actions_pb2.SpeculativeActions.Overlay.SOURCE
            # Empty string means self-reference for this element
            overlay.source_element = "" if element_name == element.name else element_name
        elif overlay_type == "ARTIFACT":
            overlay.type = speculative_actions_pb2.SpeculativeActions.Overlay.ARTIFACT
            overlay.source_element = element_name
        else:
            return None

        return overlay

    def _generate_artifact_overlays(self, element):
        """
        Generate artifact_overlays for the element's output files.

        This creates a mapping from artifact file digests back to their
        sources, enabling downstream elements to trace dependencies.

        Args:
            element: The element with the artifact

        Returns:
            List of Overlay protos (empty if the artifact is unavailable)
        """
        overlays = []

        try:
            # Check if element is cached
            if not element._cached():
                return overlays

            # Get the artifact object
            artifact = element._get_artifact()
            if not artifact or not artifact.cached():
                return overlays

            # Get the artifact files directory
            files_dir = artifact.get_files()
            if not files_dir:
                return overlays

            # Traverse artifact files and create overlays for each,
            # starting from an empty relative path at the root.
            self._generate_overlays_for_directory(
                files_dir._get_digest(), element, overlays, ""
            )
        except Exception:
            # Best-effort: return whatever overlays were collected.
            pass

        return overlays

    def _generate_overlays_for_directory(self, directory_digest, element, overlays, current_path):
        """
        Recursively generate overlays for files in a directory.

        Args:
            directory_digest: Directory to process
            element: The element being processed
            overlays: List to append overlays to
            current_path: Current relative path from root
        """
        try:
            directory = self._cas.fetch_directory_proto(directory_digest)
            if not directory:
                return

            # Process each file with full path
            for file_node in directory.files:
                file_path = file_node.name if not current_path else f"{current_path}/{file_node.name}"
                overlay = self._resolve_digest_to_overlay(
                    (file_node.digest.hash, file_node.digest.size_bytes), element, file_path
                )
                if overlay:
                    overlays.append(overlay)

            # Recursively process subdirectories
            for dir_node in directory.directories:
                subdir_path = dir_node.name if not current_path else f"{current_path}/{dir_node.name}"
                self._generate_overlays_for_directory(dir_node.digest, element, overlays, subdir_path)
        except Exception:
            # Best-effort: unreadable directories are skipped silently.
            pass
class SpeculativeActionInstantiator:
    """
    Instantiate SpeculativeActions by applying overlays.

    This class takes speculative actions and adapts them to current
    dependency versions by replacing file digests according to overlays.
    """

    def __init__(self, cas, artifactcache):
        """
        Initialize the instantiator.

        Args:
            cas: The CAS cache
            artifactcache: The artifact cache
        """
        self._cas = cas
        self._artifactcache = artifactcache

    def instantiate_action(self, spec_action, element, element_lookup):
        """
        Instantiate a SpeculativeAction by applying overlays.

        Args:
            spec_action: SpeculativeAction proto
            element: Element being primed
            element_lookup: Dict mapping element names to Element objects

        Returns:
            Digest of instantiated action, or None if overlays cannot
            be applied
        """
        # Fetch the base action
        base_action = self._cas.fetch_action(spec_action.base_action_digest)
        if not base_action:
            return None

        # Get cached build dependency cache keys for optimization:
        # skip overlays for dependencies that haven't changed.
        cached_dep_keys = self._get_cached_dependency_keys(element)

        # Start with a copy of the base action
        from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

        action = remote_execution_pb2.Action()
        action.CopyFrom(base_action)

        # Track if we made any modifications
        modified = False
        digest_replacements = {}  # old_hash -> new_digest
        skipped_count = 0
        applied_count = 0

        # Resolve all overlays first
        for overlay in spec_action.overlays:
            # Optimization: skip overlays for dependencies with unchanged
            # cache keys.
            if overlay.source_element and self._should_skip_overlay(overlay, element, cached_dep_keys):
                skipped_count += 1
                continue

            replacement = self._resolve_overlay(overlay, element, element_lookup)
            if replacement:
                # replacement is (old_digest, new_digest)
                digest_replacements[replacement[0].hash] = replacement[1]
                if replacement[0].hash != replacement[1].hash:
                    modified = True
                applied_count += 1

        # Log optimization results
        if skipped_count > 0:
            element.info(f"Skipped {skipped_count} overlays (unchanged dependencies), applied {applied_count}")

        if not modified:
            # No changes needed, return base action digest
            return spec_action.base_action_digest

        # Apply digest replacements to the action's input tree
        if action.HasField("input_root_digest"):
            new_root_digest = self._replace_digests_in_tree(action.input_root_digest, digest_replacements)
            if new_root_digest:
                action.input_root_digest.CopyFrom(new_root_digest)

        # Store the modified action and return its digest
        return self._cas.store_action(action)

    def _get_cached_dependency_keys(self, element):
        """
        Get cache keys for build dependencies from the cached artifact.

        Args:
            element: The element being primed

        Returns:
            Dict mapping element_name -> cache_key from artifact.build_deps;
            empty when no cached artifact is available
        """
        dep_keys = {}

        try:
            artifact = element._get_artifact()
            if not artifact or not artifact.cached():
                return dep_keys

            artifact_proto = artifact._get_proto()
            if not artifact_proto:
                return dep_keys

            # Extract cache keys from build_deps
            for build_dep in artifact_proto.build_deps:
                dep_keys[build_dep.element_name] = build_dep.cache_key

        except Exception:
            # If we can't get the keys, just continue without optimization
            pass

        return dep_keys

    def _should_skip_overlay(self, overlay, element, cached_dep_keys):
        """
        Check if an overlay can be skipped because the dependency hasn't
        changed.

        Args:
            overlay: Overlay proto
            element: Element being primed
            cached_dep_keys: Dict of element_name -> cache_key from cached
                artifact

        Returns:
            bool: True if overlay can be skipped
        """
        # Only skip for dependency overlays (source_element not empty/self)
        if not overlay.source_element or overlay.source_element == element.name:
            return False

        # Check if we have a cached key for this dependency
        cached_key = cached_dep_keys.get(overlay.source_element)
        if not cached_key:
            return False

        # Get the current dependency element
        from ..types import _Scope

        for dep in element._dependencies(_Scope.BUILD, recurse=False):
            if dep.name == overlay.source_element:
                current_key = dep._get_cache_key()
                # Skip overlay if cache keys match (dependency unchanged)
                if current_key == cached_key:
                    return True
                break

        return False

    def _resolve_overlay(self, overlay, element, element_lookup):
        """
        Resolve an overlay to get the current file digest.

        Args:
            overlay: Overlay proto
            element: Current element
            element_lookup: Dict mapping element names to Element objects

        Returns:
            Tuple of (old_digest, new_digest) or None
        """
        from .._protos.buildstream.v2 import speculative_actions_pb2

        if overlay.type == speculative_actions_pb2.SpeculativeActions.Overlay.SOURCE:
            return self._resolve_source_overlay(overlay, element, element_lookup)
        elif overlay.type == speculative_actions_pb2.SpeculativeActions.Overlay.ARTIFACT:
            return self._resolve_artifact_overlay(overlay, element, element_lookup)

        return None

    def _resolve_source_overlay(self, overlay, element, element_lookup):
        """
        Resolve a SOURCE overlay to get the current source file digest.

        Args:
            overlay: Overlay proto
            element: Current element
            element_lookup: Dict mapping element names to Element objects

        Returns:
            Tuple of (old_digest, new_digest) or None
        """
        # Determine source element (empty = self)
        if overlay.source_element == "":
            source_element = element
        else:
            # Look up the source element by name
            source_element = element_lookup.get(overlay.source_element)
            if not source_element:
                return None

        # Get current digest from source files
        try:
            # Check if element has any sources
            if not any(source_element.sources()):
                return None

            # Access the private __sources attribute
            sources = source_element._Element__sources
            if not sources or not sources.cached():
                return None

            source_dir = sources.get_files()
            if not source_dir:
                return None

            # Find the file in the source tree by full path
            current_digest = self._find_file_by_path(source_dir._get_digest(), overlay.source_path)

            if current_digest:
                return (overlay.target_digest, current_digest)
        except Exception:
            # Best-effort: fall through to the unresolved case.
            pass

        return None

    def _resolve_artifact_overlay(self, overlay, element, element_lookup):
        """
        Resolve an ARTIFACT overlay to get the current artifact file digest.

        Args:
            overlay: Overlay proto
            element: Current element
            element_lookup: Dict mapping element names to Element objects

        Returns:
            Tuple of (old_digest, new_digest) or None
        """
        # Look up the artifact element
        artifact_element = element_lookup.get(overlay.source_element)
        if not artifact_element:
            return None

        try:
            # Check if element is cached
            if not artifact_element._cached():
                return None

            # Get the artifact object
            artifact = artifact_element._get_artifact()
            if not artifact or not artifact.cached():
                return None

            # Get speculative actions to trace back to source
            spec_actions = self._artifactcache.get_speculative_actions(artifact)
            if spec_actions and spec_actions.artifact_overlays:
                # Trace through artifact_overlays to find the ultimate source
                for art_overlay in spec_actions.artifact_overlays:
                    if art_overlay.target_digest.hash == overlay.target_digest.hash:
                        # Found the mapping — resolve the source overlay
                        return self._resolve_overlay(art_overlay, artifact_element, element_lookup)

            # Fallback: directly look up file in artifact
            files_dir = artifact.get_files()
            if not files_dir:
                return None

            current_digest = self._find_file_by_path(files_dir._get_digest(), overlay.source_path)

            if current_digest:
                return (overlay.target_digest, current_digest)

        except Exception:
            # Best-effort: fall through to the unresolved case.
            pass

        return None

    def _find_file_by_path(self, directory_digest, file_path):
        """
        Find a file in a directory tree by full relative path.

        Args:
            directory_digest: Directory to search
            file_path: Full relative path (e.g., "src/foo/bar.c")

        Returns:
            File digest or None
        """
        try:
            # Split path into components
            if not file_path:
                return None

            parts = file_path.split("/")
            current_digest = directory_digest

            # Navigate through directories (all but the last component,
            # which is the filename).
            for part in parts[:-1]:
                directory = self._cas.fetch_directory_proto(current_digest)
                if not directory:
                    return None

                # Find the subdirectory
                found = False
                for dir_node in directory.directories:
                    if dir_node.name == part:
                        current_digest = dir_node.digest
                        found = True
                        break

                if not found:
                    return None

            # Now find the file
            filename = parts[-1]
            directory = self._cas.fetch_directory_proto(current_digest)
            if not directory:
                return None

            for file_node in directory.files:
                if file_node.name == filename:
                    return file_node.digest

        except Exception:
            # Best-effort: treat lookup failures as "not found".
            pass

        return None

    def _replace_digests_in_tree(self, directory_digest, replacements):
        """
        Replace file digests in a directory tree.

        Args:
            directory_digest: Root directory digest
            replacements: Dict of old_hash -> new_digest

        Returns:
            New directory digest, the original digest when nothing changed,
            or None on error
        """
        try:
            directory = self._cas.fetch_directory_proto(directory_digest)
            if not directory:
                return None

            from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

            new_directory = remote_execution_pb2.Directory()
            new_directory.CopyFrom(directory)

            modified = False

            # Replace file digests
            for i, file_node in enumerate(new_directory.files):
                if file_node.digest.hash in replacements:
                    new_directory.files[i].digest.CopyFrom(replacements[file_node.digest.hash])
                    modified = True

            # Recursively process subdirectories
            for i, dir_node in enumerate(new_directory.directories):
                new_subdir_digest = self._replace_digests_in_tree(dir_node.digest, replacements)
                if new_subdir_digest and new_subdir_digest.hash != dir_node.digest.hash:
                    new_directory.directories[i].digest.CopyFrom(new_subdir_digest)
                    modified = True

            if modified:
                # Store the modified directory
                return self._cas.store_directory_proto(new_directory)
            else:
                # No changes, return original
                return directory_digest
        except Exception:
            # Narrowed from a bare "except:" — never swallow SystemExit /
            # KeyboardInterrupt; any other failure means "could not adapt".
            return None
