This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5c7f0372cc4 GCSHook: Log NotFound error instead of raise on Blob 
deletion (#62424)
5c7f0372cc4 is described below

commit 5c7f0372cc437b520785171ebe9eb244854e08ba
Author: Michael Doo <[email protected]>
AuthorDate: Mon Apr 6 18:32:15 2026 -0400

    GCSHook: Log NotFound error instead of raise on Blob deletion (#62424)
    
    * GCSHook: Log NotFound error instead of raise on blob delete
    
    `GCSHook.delete()` now logs a warning when the blob does not exist
    (404/NotFound), matching the behavior of `delete_bucket`. The error is
    still re-raised unless `ignore_error=True` is passed.
    
    * GCSDeleteObjectsOperator: Add flag to ignore errors on blob deletion
    
    Exposes the `ignore_error` flag of `GCSHook.delete` on the operator so
    that NotFound errors can be ignored when a blob doesn't exist.
---
 .../airflow/providers/google/cloud/hooks/gcs.py    | 19 ++++++----
 .../providers/google/cloud/operators/gcs.py        |  5 ++-
 .../tests/unit/google/cloud/hooks/test_gcs.py      | 44 ++++++++++++++++++----
 .../tests/unit/google/cloud/operators/test_gcs.py  | 32 ++++++++++++----
 4 files changed, 76 insertions(+), 24 deletions(-)

diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/gcs.py 
b/providers/google/src/airflow/providers/google/cloud/hooks/gcs.py
index 3ec6a23d306..981481fb3b0 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/gcs.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/gcs.py
@@ -705,22 +705,27 @@ class GCSHook(GoogleBaseHook):
                 return True
         return False
 
-    def delete(self, bucket_name: str, object_name: str) -> None:
+    def delete(self, bucket_name: str, object_name: str, ignore_error: bool = 
False) -> None:
         """
         Delete an object from the bucket.
 
         :param bucket_name: name of the bucket, where the object resides
         :param object_name: name of the object to delete
+        :param ignore_error: (Optional) whether to ignore NotFound exceptions. 
Default: False
         """
         client = self.get_conn()
         bucket = client.bucket(bucket_name)
         blob = bucket.blob(blob_name=object_name)
-        blob.delete()
-        get_hook_lineage_collector().add_input_asset(
-            context=self, scheme="gs", asset_kwargs={"bucket": bucket.name, 
"key": blob.name}
-        )
-
-        self.log.info("Blob %s deleted.", object_name)
+        try:
+            blob.delete()
+            get_hook_lineage_collector().add_input_asset(
+                context=self, scheme="gs", asset_kwargs={"bucket": 
bucket.name, "key": blob.name}
+            )
+            self.log.info("Blob %s deleted.", object_name)
+        except NotFound:
+            self.log.warning("Blob %s in bucket %s does not exist.", 
blob.name, bucket.name)
+            if not ignore_error:
+                raise
 
     def get_bucket(self, bucket_name: str) -> storage.Bucket:
         """
diff --git 
a/providers/google/src/airflow/providers/google/cloud/operators/gcs.py 
b/providers/google/src/airflow/providers/google/cloud/operators/gcs.py
index bc5b044b60f..c69898bc7f8 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/gcs.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/gcs.py
@@ -280,6 +280,7 @@ class GCSDeleteObjectsOperator(GoogleCloudBaseOperator):
         of objects in the bucket, not including gs://bucket/
     :param prefix: String or list of strings, which filter objects whose name 
begin with
            it/them. (templated)
+    :param ignore_error: (Optional) whether to ignore NotFound exceptions. 
Default: False
     :param gcp_conn_id: (Optional) The connection ID used to connect to Google 
Cloud.
     :param impersonation_chain: Optional service account to impersonate using 
short-term
         credentials, or chained list of accounts required to get the 
access_token
@@ -305,6 +306,7 @@ class GCSDeleteObjectsOperator(GoogleCloudBaseOperator):
         bucket_name: str,
         objects: list[str] | None = None,
         prefix: str | list[str] | None = None,
+        ignore_error: bool = False,
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
         **kwargs,
@@ -312,6 +314,7 @@ class GCSDeleteObjectsOperator(GoogleCloudBaseOperator):
         self.bucket_name = bucket_name
         self.objects = objects
         self.prefix = prefix
+        self.ignore_error = ignore_error
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
 
@@ -338,7 +341,7 @@ class GCSDeleteObjectsOperator(GoogleCloudBaseOperator):
             objects = hook.list(bucket_name=self.bucket_name, 
prefix=self.prefix)
         self.log.info("Deleting %s objects from %s", len(objects), 
self.bucket_name)
         for object_name in objects:
-            hook.delete(bucket_name=self.bucket_name, object_name=object_name)
+            hook.delete(bucket_name=self.bucket_name, object_name=object_name, 
ignore_error=self.ignore_error)
 
     def get_openlineage_facets_on_start(self):
         from airflow.providers.common.compat.openlineage.facet import (
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_gcs.py 
b/providers/google/tests/unit/google/cloud/hooks/test_gcs.py
index 6040115f220..fd5f6b6bf41 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_gcs.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_gcs.py
@@ -528,21 +528,26 @@ class TestGCSHook:
 
     @mock.patch("google.cloud.storage.Bucket")
     @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
-    def test_delete(self, mock_service, mock_bucket):
+    def test_delete(self, mock_service, mock_bucket, caplog):
         test_bucket = "test_bucket"
         test_object = "test_object"
         blob_to_be_deleted = storage.Blob(name=test_object, bucket=mock_bucket)
 
-        get_bucket_method = mock_service.return_value.get_bucket
-        get_blob_method = get_bucket_method.return_value.get_blob
-        delete_method = get_blob_method.return_value.delete
+        bucket_method = mock_service.return_value.bucket
+        blob = bucket_method.return_value.blob
+        delete_method = blob.return_value.delete
         delete_method.return_value = blob_to_be_deleted
 
-        response = self.gcs_hook.delete(bucket_name=test_bucket, 
object_name=test_object)
-        assert response is None
+        with caplog.at_level(logging.INFO):
+            self.gcs_hook.delete(bucket_name=test_bucket, 
object_name=test_object)
+
+        bucket_method.assert_called_once_with(test_bucket)
+        blob.assert_called_once_with(blob_name=test_object)
+        delete_method.assert_called_once()
+        assert "Blob test_object deleted" in caplog.text
 
     @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
-    def test_delete_nonexisting_object(self, mock_service):
+    def test_delete_nonexisting_object(self, mock_service, caplog):
         test_bucket = "test_bucket"
         test_object = "test_object"
 
@@ -551,9 +556,32 @@ class TestGCSHook:
         delete_method = blob.return_value.delete
         delete_method.side_effect = NotFound(message="Not Found")
 
-        with pytest.raises(NotFound):
+        with pytest.raises(NotFound), caplog.at_level(logging.INFO):
             self.gcs_hook.delete(bucket_name=test_bucket, 
object_name=test_object)
 
+        bucket_method.assert_called_once_with(test_bucket)
+        blob.assert_called_once_with(blob_name=test_object)
+        delete_method.assert_called_once()
+        assert "does not exist" in caplog.text
+
+    @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
+    def test_delete_nonexisting_object_ignore_error(self, mock_service, 
caplog):
+        test_bucket = "test_bucket"
+        test_object = "test_object"
+
+        bucket_method = mock_service.return_value.bucket
+        blob = bucket_method.return_value.blob
+        delete_method = blob.return_value.delete
+        delete_method.side_effect = NotFound(message="Not Found")
+
+        with caplog.at_level(logging.INFO):
+            self.gcs_hook.delete(bucket_name=test_bucket, 
object_name=test_object, ignore_error=True)
+
+        bucket_method.assert_called_once_with(test_bucket)
+        blob.assert_called_once_with(blob_name=test_object)
+        delete_method.assert_called_once()
+        assert "does not exist" in caplog.text
+
     @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
     def test_delete_exposes_lineage(self, mock_service, 
hook_lineage_collector):
         test_bucket = "test_bucket"
diff --git a/providers/google/tests/unit/google/cloud/operators/test_gcs.py 
b/providers/google/tests/unit/google/cloud/operators/test_gcs.py
index 1ecd31754e3..26281bdc7a8 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_gcs.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_gcs.py
@@ -128,8 +128,24 @@ class TestGCSDeleteObjectsOperator:
         mock_hook.return_value.list.assert_not_called()
         mock_hook.return_value.delete.assert_has_calls(
             calls=[
-                mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[0]),
-                mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[1]),
+                mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[0], 
ignore_error=False),
+                mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[1], 
ignore_error=False),
+            ],
+            any_order=True,
+        )
+
+    @mock.patch("airflow.providers.google.cloud.operators.gcs.GCSHook")
+    def test_delete_objects_with_ignore_error(self, mock_hook):
+        operator = GCSDeleteObjectsOperator(
+            task_id=TASK_ID, bucket_name=TEST_BUCKET, objects=MOCK_FILES[0:2], 
ignore_error=True
+        )
+
+        operator.execute(None)
+        mock_hook.return_value.list.assert_not_called()
+        mock_hook.return_value.delete.assert_has_calls(
+            calls=[
+                mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[0], 
ignore_error=True),
+                mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[1], 
ignore_error=True),
             ],
             any_order=True,
         )
@@ -151,8 +167,8 @@ class TestGCSDeleteObjectsOperator:
         
mock_hook.return_value.list.assert_called_once_with(bucket_name=TEST_BUCKET, 
prefix=PREFIX)
         mock_hook.return_value.delete.assert_has_calls(
             calls=[
-                mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[1]),
-                mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[2]),
+                mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[1], 
ignore_error=False),
+                mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[2], 
ignore_error=False),
             ],
             any_order=True,
         )
@@ -166,10 +182,10 @@ class TestGCSDeleteObjectsOperator:
         
mock_hook.return_value.list.assert_called_once_with(bucket_name=TEST_BUCKET, 
prefix="")
         mock_hook.return_value.delete.assert_has_calls(
             calls=[
-                mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[0]),
-                mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[1]),
-                mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[2]),
-                mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[3]),
+                mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[0], 
ignore_error=False),
+                mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[1], 
ignore_error=False),
+                mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[2], 
ignore_error=False),
+                mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[3], 
ignore_error=False),
             ],
             any_order=True,
         )

Reply via email to