This is an automated email from the ASF dual-hosted git repository. tvb pushed a commit to branch tristan/sourcecache-retries in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit ca48e58fd18d918d5840575b27b01e22878be212 Author: Tristan van Berkom <[email protected]> AuthorDate: Sat May 7 18:50:06 2022 +0900 _sourcecache.py: Refactor to handle AssetCacheError properly This file appears to have fallen out of sync and had incorrect assumptions about the errors which can be raised from AssetRemote.fetch_directory() and AssetCache.push_directory(). This patch also concentrates the `try` blocks around the functions which can raise AssetCacheError. --- src/buildstream/_sourcecache.py | 60 +++++++++++++++++++---------------------- 1 file changed, 27 insertions(+), 33 deletions(-) diff --git a/src/buildstream/_sourcecache.py b/src/buildstream/_sourcecache.py index dc706062f..d67d922b0 100644 --- a/src/buildstream/_sourcecache.py +++ b/src/buildstream/_sourcecache.py @@ -117,27 +117,22 @@ class SourceCache(AssetCache): # First fetch the source directory digest so we know what to pull source_digest = None for remote in index_remotes: - try: - remote.init() - source.status("Pulling source {} <- {}".format(display_key, remote)) - - source_digest = self._pull_source(ref, remote) - if source_digest is None: - source.info( - "Remote source service ({}) does not have source {} cached".format(remote, display_key) - ) - continue - except CASError as e: - raise SourceCacheError("Failed to pull source {}: {}".format(display_key, e), temporary=True) from e + remote.init() + source.status("Pulling source {} <- {}".format(display_key, remote)) + + source_digest = self._pull_source(ref, remote) + if source_digest is None: + source.info("Remote source service ({}) does not have source {} cached".format(remote, display_key)) + continue if not source_digest: return False for remote in storage_remotes: - try: - remote.init() - source.status("Pulling data for source {} <- {}".format(display_key, remote)) + remote.init() + source.status("Pulling data for source {} <- {}".format(display_key, remote)) + try: # Fetch source blobs self.cas._fetch_directory(remote, source_digest) @@ -230,33 +225,32 @@ class SourceCache(AssetCache): def _pull_source(self, source_ref, remote): uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(source_ref) + remote.init() try: - remote.init() response = remote.fetch_directory([uri]) - if not response: - return None + except AssetCacheError as e: + raise SourceCacheError( + "Failed to pull source with status {}: {}".format(e.code().name, e.details()), temporary=True + ) from e + return None + + if response: self._store_source(source_ref, response.root_directory_digest) return response.root_directory_digest - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: - raise SourceCacheError( - "Failed to pull source with status {}: {}".format(e.code().name, e.details()), temporary=True - ) - return None + return None def _push_source(self, source_ref, remote): uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(source_ref) + remote.init() + source_proto = self._get_source(source_ref) try: - remote.init() - source_proto = self._get_source(source_ref) remote.push_directory([uri], source_proto.files) - return True - - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED: - raise SourceCacheError( - "Failed to push source with status {}: {}".format(e.code().name, e.details()), temporary=True - ) + except AssetCacheError as e: + raise SourceCacheError( + "Failed to push source with status {}: {}".format(e.code().name, e.details()), temporary=True + ) from e return False + + return True
