This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 23e03db2d7 Add method to get metadata from GCS blob in GCSHook (#38398)
23e03db2d7 is described below

commit 23e03db2d79829f91afe29db757a4e4a26b77874
Author: jalengg <[email protected]>
AuthorDate: Mon May 27 04:48:19 2024 -0500

    Add method to get metadata from GCS blob in GCSHook (#38398)
    
    * Adding get metadata to gcs hook
    
    * unit test
    
    * Spelling and rm fstrings
    
    * test for blob not found
    
    * fix pytest raises, add match regex
---
 airflow/providers/google/cloud/hooks/gcs.py    | 21 +++++++++++++++++++++
 tests/providers/google/cloud/hooks/test_gcs.py | 26 ++++++++++++++++++++++++++
 2 files changed, 47 insertions(+)

diff --git a/airflow/providers/google/cloud/hooks/gcs.py b/airflow/providers/google/cloud/hooks/gcs.py
index fad5662468..83118e7079 100644
--- a/airflow/providers/google/cloud/hooks/gcs.py
+++ b/airflow/providers/google/cloud/hooks/gcs.py
@@ -1010,6 +1010,27 @@ class GCSHook(GoogleBaseHook):
         self.log.info("The md5Hash of %s is %s", object_name, blob_md5hash)
         return blob_md5hash
 
+    def get_metadata(self, bucket_name: str, object_name: str) -> dict | None:
+        """
+        Get the metadata of an object in Google Cloud Storage.
+
+        :param bucket_name: Name of the Google Cloud Storage bucket where the object is.
+        :param object_name: The name of the object containing the desired metadata
+        :return: The metadata associated with the object
+        """
+        self.log.info("Retrieving the metadata dict of object (%s) in bucket (%s)", object_name, bucket_name)
+        client = self.get_conn()
+        bucket = client.bucket(bucket_name)
+        blob = bucket.get_blob(blob_name=object_name)
+        if blob is None:
+            raise ValueError("Object (%s) not found in bucket (%s)", object_name, bucket_name)
+        blob_metadata = blob.metadata
+        if blob_metadata:
+            self.log.info("Retrieved metadata of object (%s) with %s fields", object_name, len(blob_metadata))
+        else:
+            self.log.info("Metadata of object (%s) is empty or it does not exist", object_name)
+        return blob_metadata
+
     @GoogleBaseHook.fallback_to_default_project_id
     def create_bucket(
         self,
diff --git a/tests/providers/google/cloud/hooks/test_gcs.py b/tests/providers/google/cloud/hooks/test_gcs.py
index d9531e3273..5dee8cbf8c 100644
--- a/tests/providers/google/cloud/hooks/test_gcs.py
+++ b/tests/providers/google/cloud/hooks/test_gcs.py
@@ -565,6 +565,32 @@ class TestGCSHook:
 
         assert response == returned_file_md5hash
 
+    @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
+    def test_object_get_metadata(self, mock_service):
+        test_bucket = "test_bucket"
+        test_object = "test_object"
+        returned_file_metadata = {"test_metadata_key": "test_metadata_val"}
+
+        bucket_method = mock_service.return_value.bucket
+        get_blob_method = bucket_method.return_value.get_blob
+        get_blob_method.return_value.metadata = returned_file_metadata
+
+        response = self.gcs_hook.get_metadata(bucket_name=test_bucket, object_name=test_object)
+
+        assert response == returned_file_metadata
+
+    @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
+    def test_nonexisting_object_get_metadata(self, mock_service):
+        test_bucket = "test_bucket"
+        test_object = "test_object"
+
+        bucket_method = mock_service.return_value.bucket
+        get_blob_method = bucket_method.return_value.get_blob
+        get_blob_method.return_value = None
+
+        with pytest.raises(ValueError, match=r"Object \((.*?)\) not found in bucket \((.*?)\)"):
+            self.gcs_hook.get_metadata(bucket_name=test_bucket, object_name=test_object)
+
     @mock.patch("google.cloud.storage.Bucket")
     @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
     def test_create_bucket(self, mock_service, mock_bucket):

Reply via email to