This is an automated email from the ASF dual-hosted git repository.
xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6f2f960b359 use batch delete for GCS IO (#34835)
6f2f960b359 is described below
commit 6f2f960b35957a7227286634d96108d6c4eb3003
Author: liferoad <[email protected]>
AuthorDate: Mon May 5 10:59:37 2025 -0400
use batch delete for GCS IO (#34835)
* use batch delete for GCS IO
* fixed the gcsio test
* fix format
* fixed format
* fixed the lint
---
sdks/python/apache_beam/io/gcp/gcsio.py | 21 +++++++++++++----
sdks/python/apache_beam/io/gcp/gcsio_test.py | 34 ++++++++++++++++++++++++++--
2 files changed, 49 insertions(+), 6 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 25ec664e009..69d45c229d1 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -265,10 +265,23 @@ class GcsIO(object):
bucket_name, blob_name = parse_gcs_path(path)
bucket = self.client.bucket(bucket_name)
if recursive:
- # List and delete all blobs under the prefix.
- blobs = bucket.list_blobs(prefix=blob_name)
- for blob in blobs:
- self._delete_blob(bucket, blob.name)
+ # List all blobs under the prefix.
+ blobs_to_delete = bucket.list_blobs(
+ prefix=blob_name, retry=self._storage_client_retry)
+ # Collect full paths for batch deletion.
+ paths_to_delete = [
+ f'gs://{bucket_name}/{blob.name}' for blob in blobs_to_delete
+ ]
+ if paths_to_delete:
+ # Delete them in batches.
+ results = self.delete_batch(paths_to_delete)
+ # Log any errors encountered during batch deletion.
+ errors = [f'{path}: {err}' for path, err in results if err is not None]
+ if errors:
+ _LOGGER.warning(
'Failed to delete some objects during recursive delete of %s: %s',
+ path,
+ ', '.join(errors))
else:
# Delete only the specific blob.
self._delete_blob(bucket, blob_name)
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index d3155130d51..c8ded104b29 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -75,8 +75,38 @@ class FakeGcsClient(object):
else:
return self.create_bucket(name)
- def batch(self):
- pass
+ def batch(self, raise_exception=True):
+ # Return a mock object configured to act as a context manager
+ # and provide the necessary _responses attribute after __exit__.
+ # test_delete performs 3 deletions.
+ num_expected_responses = 3
+ mock_batch = mock.Mock()
+
+ # Configure the mock responses (assuming success for test_delete)
+ # These need to be available *after* the 'with' block finishes.
+ # We'll store them temporarily and assign in __exit__.
+ successful_responses = [
+ mock.Mock(status_code=204) for _ in range(num_expected_responses)
+ ]
+
+ # Define the exit logic
+ def mock_exit_logic(exc_type, exc_val, exc_tb):
+ # Assign responses to the mock instance itself
+ # so they are available after the 'with' block.
+ mock_batch._responses = successful_responses
+
+ # Configure the mock to behave like a context manager
+ mock_batch.configure_mock(
+ __enter__=mock.Mock(return_value=mock_batch),
+ __exit__=mock.Mock(side_effect=mock_exit_logic))
+
+ # The loop inside _batch_with_retry calls fn(request) for each item.
+ # The real batch object might have methods like add() or similar,
+ # but the core logic in gcsio.py calls the passed function `fn` directly
+ # within the `with` block. So, no specific action methods seem needed
+ # on the mock_batch itself for this test case.
+
+ return mock_batch
def add_file(self, bucket, blob, contents):
folder = self.lookup_bucket(bucket)