This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch tristan/sourcecache-retries
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit ca48e58fd18d918d5840575b27b01e22878be212
Author: Tristan van Berkom <[email protected]>
AuthorDate: Sat May 7 18:50:06 2022 +0900

    _sourcecache.py: Refactor to handle AssetCacheError properly
    
    This file appears to have fallen out of sync and had incorrect assumptions
    about the errors which can be raised from AssetRemote.fetch_directory()
    and AssetCache.push_directory().
    
    This patch also concentrates the `try` blocks around the functions which
    can raise AssetCacheError.
---
 src/buildstream/_sourcecache.py | 60 +++++++++++++++++++----------------------
 1 file changed, 27 insertions(+), 33 deletions(-)

diff --git a/src/buildstream/_sourcecache.py b/src/buildstream/_sourcecache.py
index dc706062f..d67d922b0 100644
--- a/src/buildstream/_sourcecache.py
+++ b/src/buildstream/_sourcecache.py
@@ -117,27 +117,22 @@ class SourceCache(AssetCache):
         # First fetch the source directory digest so we know what to pull
         source_digest = None
         for remote in index_remotes:
-            try:
-                remote.init()
-                source.status("Pulling source {} <- {}".format(display_key, remote))
-
-                source_digest = self._pull_source(ref, remote)
-                if source_digest is None:
-                    source.info(
-                        "Remote source service ({}) does not have source {} cached".format(remote, display_key)
-                    )
-                    continue
-            except CASError as e:
-                raise SourceCacheError("Failed to pull source {}: {}".format(display_key, e), temporary=True) from e
+            remote.init()
+            source.status("Pulling source {} <- {}".format(display_key, remote))
+
+            source_digest = self._pull_source(ref, remote)
+            if source_digest is None:
+                source.info("Remote source service ({}) does not have source {} cached".format(remote, display_key))
+                continue
 
         if not source_digest:
             return False
 
         for remote in storage_remotes:
-            try:
-                remote.init()
-                source.status("Pulling data for source {} <- {}".format(display_key, remote))
+            remote.init()
+            source.status("Pulling data for source {} <- {}".format(display_key, remote))
 
+            try:
                 # Fetch source blobs
                 self.cas._fetch_directory(remote, source_digest)
 
@@ -230,33 +225,32 @@ class SourceCache(AssetCache):
     def _pull_source(self, source_ref, remote):
         uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(source_ref)
 
+        remote.init()
         try:
-            remote.init()
             response = remote.fetch_directory([uri])
-            if not response:
-                return None
+        except AssetCacheError as e:
+            raise SourceCacheError(
+                "Failed to pull source with status {}: {}".format(e.code().name, e.details()), temporary=True
+            ) from e
+            return None
+
+        if response:
             self._store_source(source_ref, response.root_directory_digest)
             return response.root_directory_digest
 
-        except grpc.RpcError as e:
-            if e.code() != grpc.StatusCode.NOT_FOUND:
-                raise SourceCacheError(
-                    "Failed to pull source with status {}: {}".format(e.code().name, e.details()), temporary=True
-                )
-            return None
+        return None
 
     def _push_source(self, source_ref, remote):
         uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(source_ref)
 
+        remote.init()
+        source_proto = self._get_source(source_ref)
         try:
-            remote.init()
-            source_proto = self._get_source(source_ref)
             remote.push_directory([uri], source_proto.files)
-            return True
-
-        except grpc.RpcError as e:
-            if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
-                raise SourceCacheError(
-                    "Failed to push source with status {}: {}".format(e.code().name, e.details()), temporary=True
-                )
+        except AssetCacheError as e:
+            raise SourceCacheError(
+                "Failed to push source with status {}: {}".format(e.code().name, e.details()), temporary=True
+            ) from e
             return False
+
+        return True

Reply via email to