This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5c7f0372cc4 GCSHook: Log NotFound error instead of raise on Blob
deletion (#62424)
5c7f0372cc4 is described below
commit 5c7f0372cc437b520785171ebe9eb244854e08ba
Author: Michael Doo <[email protected]>
AuthorDate: Mon Apr 6 18:32:15 2026 -0400
GCSHook: Log NotFound error instead of raise on Blob deletion (#62424)
* GCSHook: Log NotFound error on blob delete and optionally suppress it
When `GCSHook.delete()` encounters a 404/NotFound error, log a warning,
matching the behavior of `delete_bucket`. The error is still re-raised
unless `ignore_error=True` is passed.
* GCSDeleteObjectsOperator: Add flag to ignore NotFound errors on blob deletion
Exposes the `ignore_error` flag of `GCSHook.delete` on the operator, so that
objects which no longer exist are skipped instead of raising NotFound.
---
.../airflow/providers/google/cloud/hooks/gcs.py | 19 ++++++----
.../providers/google/cloud/operators/gcs.py | 5 ++-
.../tests/unit/google/cloud/hooks/test_gcs.py | 44 ++++++++++++++++++----
.../tests/unit/google/cloud/operators/test_gcs.py | 32 ++++++++++++----
4 files changed, 76 insertions(+), 24 deletions(-)
diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/gcs.py
b/providers/google/src/airflow/providers/google/cloud/hooks/gcs.py
index 3ec6a23d306..981481fb3b0 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/gcs.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/gcs.py
@@ -705,22 +705,27 @@ class GCSHook(GoogleBaseHook):
return True
return False
- def delete(self, bucket_name: str, object_name: str) -> None:
+ def delete(self, bucket_name: str, object_name: str, ignore_error: bool =
False) -> None:
"""
Delete an object from the bucket.
:param bucket_name: name of the bucket, where the object resides
:param object_name: name of the object to delete
+ :param ignore_error: (Optional) whether to ignore NotFound exceptions.
Default: False
"""
client = self.get_conn()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name=object_name)
- blob.delete()
- get_hook_lineage_collector().add_input_asset(
- context=self, scheme="gs", asset_kwargs={"bucket": bucket.name,
"key": blob.name}
- )
-
- self.log.info("Blob %s deleted.", object_name)
+ try:
+ blob.delete()
+ get_hook_lineage_collector().add_input_asset(
+ context=self, scheme="gs", asset_kwargs={"bucket":
bucket.name, "key": blob.name}
+ )
+ self.log.info("Blob %s deleted.", object_name)
+ except NotFound:
+ self.log.warning("Blob %s in bucket %s does not exist.",
blob.name, bucket.name)
+ if not ignore_error:
+ raise
def get_bucket(self, bucket_name: str) -> storage.Bucket:
"""
diff --git
a/providers/google/src/airflow/providers/google/cloud/operators/gcs.py
b/providers/google/src/airflow/providers/google/cloud/operators/gcs.py
index bc5b044b60f..c69898bc7f8 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/gcs.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/gcs.py
@@ -280,6 +280,7 @@ class GCSDeleteObjectsOperator(GoogleCloudBaseOperator):
of objects in the bucket, not including gs://bucket/
:param prefix: String or list of strings, which filter objects whose name
begin with
it/them. (templated)
+ :param ignore_error: (Optional) whether to ignore NotFound exceptions.
Default: False
:param gcp_conn_id: (Optional) The connection ID used to connect to Google
Cloud.
:param impersonation_chain: Optional service account to impersonate using
short-term
credentials, or chained list of accounts required to get the
access_token
@@ -305,6 +306,7 @@ class GCSDeleteObjectsOperator(GoogleCloudBaseOperator):
bucket_name: str,
objects: list[str] | None = None,
prefix: str | list[str] | None = None,
+ ignore_error: bool = False,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
@@ -312,6 +314,7 @@ class GCSDeleteObjectsOperator(GoogleCloudBaseOperator):
self.bucket_name = bucket_name
self.objects = objects
self.prefix = prefix
+ self.ignore_error = ignore_error
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
@@ -338,7 +341,7 @@ class GCSDeleteObjectsOperator(GoogleCloudBaseOperator):
objects = hook.list(bucket_name=self.bucket_name,
prefix=self.prefix)
self.log.info("Deleting %s objects from %s", len(objects),
self.bucket_name)
for object_name in objects:
- hook.delete(bucket_name=self.bucket_name, object_name=object_name)
+ hook.delete(bucket_name=self.bucket_name, object_name=object_name,
ignore_error=self.ignore_error)
def get_openlineage_facets_on_start(self):
from airflow.providers.common.compat.openlineage.facet import (
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_gcs.py
b/providers/google/tests/unit/google/cloud/hooks/test_gcs.py
index 6040115f220..fd5f6b6bf41 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_gcs.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_gcs.py
@@ -528,21 +528,26 @@ class TestGCSHook:
@mock.patch("google.cloud.storage.Bucket")
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
- def test_delete(self, mock_service, mock_bucket):
+ def test_delete(self, mock_service, mock_bucket, caplog):
test_bucket = "test_bucket"
test_object = "test_object"
blob_to_be_deleted = storage.Blob(name=test_object, bucket=mock_bucket)
- get_bucket_method = mock_service.return_value.get_bucket
- get_blob_method = get_bucket_method.return_value.get_blob
- delete_method = get_blob_method.return_value.delete
+ bucket_method = mock_service.return_value.bucket
+ blob = bucket_method.return_value.blob
+ delete_method = blob.return_value.delete
delete_method.return_value = blob_to_be_deleted
- response = self.gcs_hook.delete(bucket_name=test_bucket,
object_name=test_object)
- assert response is None
+ with caplog.at_level(logging.INFO):
+ self.gcs_hook.delete(bucket_name=test_bucket,
object_name=test_object)
+
+ bucket_method.assert_called_once_with(test_bucket)
+ blob.assert_called_once_with(blob_name=test_object)
+ delete_method.assert_called_once()
+ assert "Blob test_object deleted" in caplog.text
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
- def test_delete_nonexisting_object(self, mock_service):
+ def test_delete_nonexisting_object(self, mock_service, caplog):
test_bucket = "test_bucket"
test_object = "test_object"
@@ -551,9 +556,32 @@ class TestGCSHook:
delete_method = blob.return_value.delete
delete_method.side_effect = NotFound(message="Not Found")
- with pytest.raises(NotFound):
+ with pytest.raises(NotFound), caplog.at_level(logging.INFO):
self.gcs_hook.delete(bucket_name=test_bucket,
object_name=test_object)
+ bucket_method.assert_called_once_with(test_bucket)
+ blob.assert_called_once_with(blob_name=test_object)
+ delete_method.assert_called_once()
+ assert "does not exist" in caplog.text
+
+ @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
+ def test_delete_nonexisting_object_ignore_error(self, mock_service,
caplog):
+ test_bucket = "test_bucket"
+ test_object = "test_object"
+
+ bucket_method = mock_service.return_value.bucket
+ blob = bucket_method.return_value.blob
+ delete_method = blob.return_value.delete
+ delete_method.side_effect = NotFound(message="Not Found")
+
+ with caplog.at_level(logging.INFO):
+ self.gcs_hook.delete(bucket_name=test_bucket,
object_name=test_object, ignore_error=True)
+
+ bucket_method.assert_called_once_with(test_bucket)
+ blob.assert_called_once_with(blob_name=test_object)
+ delete_method.assert_called_once()
+ assert "does not exist" in caplog.text
+
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
def test_delete_exposes_lineage(self, mock_service,
hook_lineage_collector):
test_bucket = "test_bucket"
diff --git a/providers/google/tests/unit/google/cloud/operators/test_gcs.py
b/providers/google/tests/unit/google/cloud/operators/test_gcs.py
index 1ecd31754e3..26281bdc7a8 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_gcs.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_gcs.py
@@ -128,8 +128,24 @@ class TestGCSDeleteObjectsOperator:
mock_hook.return_value.list.assert_not_called()
mock_hook.return_value.delete.assert_has_calls(
calls=[
- mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[0]),
- mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[1]),
+ mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[0],
ignore_error=False),
+ mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[1],
ignore_error=False),
+ ],
+ any_order=True,
+ )
+
+ @mock.patch("airflow.providers.google.cloud.operators.gcs.GCSHook")
+ def test_delete_objects_with_ignore_error(self, mock_hook):
+ operator = GCSDeleteObjectsOperator(
+ task_id=TASK_ID, bucket_name=TEST_BUCKET, objects=MOCK_FILES[0:2],
ignore_error=True
+ )
+
+ operator.execute(None)
+ mock_hook.return_value.list.assert_not_called()
+ mock_hook.return_value.delete.assert_has_calls(
+ calls=[
+ mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[0],
ignore_error=True),
+ mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[1],
ignore_error=True),
],
any_order=True,
)
@@ -151,8 +167,8 @@ class TestGCSDeleteObjectsOperator:
mock_hook.return_value.list.assert_called_once_with(bucket_name=TEST_BUCKET,
prefix=PREFIX)
mock_hook.return_value.delete.assert_has_calls(
calls=[
- mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[1]),
- mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[2]),
+ mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[1],
ignore_error=False),
+ mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[2],
ignore_error=False),
],
any_order=True,
)
@@ -166,10 +182,10 @@ class TestGCSDeleteObjectsOperator:
mock_hook.return_value.list.assert_called_once_with(bucket_name=TEST_BUCKET,
prefix="")
mock_hook.return_value.delete.assert_has_calls(
calls=[
- mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[0]),
- mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[1]),
- mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[2]),
- mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[3]),
+ mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[0],
ignore_error=False),
+ mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[1],
ignore_error=False),
+ mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[2],
ignore_error=False),
+ mock.call(bucket_name=TEST_BUCKET, object_name=MOCK_FILES[3],
ignore_error=False),
],
any_order=True,
)