This is an automated email from the ASF dual-hosted git repository.

xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f2f960b359 use batch delete for GCS IO (#34835)
6f2f960b359 is described below

commit 6f2f960b35957a7227286634d96108d6c4eb3003
Author: liferoad <[email protected]>
AuthorDate: Mon May 5 10:59:37 2025 -0400

    use batch delete for GCS IO (#34835)
    
    * use batch delete for GCS IO
    
    * fixed the gcsio test
    
    * fix format
    
    * fixed format
    
    * fixed the lint
---
 sdks/python/apache_beam/io/gcp/gcsio.py      | 21 +++++++++++++----
 sdks/python/apache_beam/io/gcp/gcsio_test.py | 34 ++++++++++++++++++++++++++--
 2 files changed, 49 insertions(+), 6 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 25ec664e009..69d45c229d1 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -265,10 +265,23 @@ class GcsIO(object):
     bucket_name, blob_name = parse_gcs_path(path)
     bucket = self.client.bucket(bucket_name)
     if recursive:
-      # List and delete all blobs under the prefix.
-      blobs = bucket.list_blobs(prefix=blob_name)
-      for blob in blobs:
-        self._delete_blob(bucket, blob.name)
+      # List all blobs under the prefix.
+      blobs_to_delete = bucket.list_blobs(
+          prefix=blob_name, retry=self._storage_client_retry)
+      # Collect full paths for batch deletion.
+      paths_to_delete = [
+          f'gs://{bucket_name}/{blob.name}' for blob in blobs_to_delete
+      ]
+      if paths_to_delete:
+        # Delete them in batches.
+        results = self.delete_batch(paths_to_delete)
+        # Log any errors encountered during batch deletion.
+        errors = [f'{path}: {err}' for path, err in results if err is not None]
+        if errors:
+          _LOGGER.warning(
+              'Failed to delete some objects during recursive delete of %s: %s',
+              path,
+              ', '.join(errors))
     else:
       # Delete only the specific blob.
       self._delete_blob(bucket, blob_name)
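
For context, a minimal usage sketch of the path this hunk changes (the method
shown appears to be GcsIO's delete with a recursive flag; the bucket and
prefix below are hypothetical). With this change, a recursive delete lists
the blobs under the prefix and removes them through delete_batch() instead of
issuing one delete call per blob.

    # Sketch with hypothetical names; requires GCP credentials at runtime.
    from apache_beam.io.gcp.gcsio import GcsIO

    gcs = GcsIO()
    # Lists everything under the prefix and deletes it in batches via
    # delete_batch(), logging any per-path failures instead of raising.
    gcs.delete('gs://my-bucket/tmp/output/', recursive=True)
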
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index d3155130d51..c8ded104b29 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -75,8 +75,38 @@ class FakeGcsClient(object):
     else:
       return self.create_bucket(name)
 
-  def batch(self):
-    pass
+  def batch(self, raise_exception=True):
+    # Return a mock object configured to act as a context manager
+    # and provide the necessary _responses attribute after __exit__.
+    # test_delete performs 3 deletions.
+    num_expected_responses = 3
+    mock_batch = mock.Mock()
+
+    # Configure the mock responses (assuming success for test_delete)
+    # These need to be available *after* the 'with' block finishes.
+    # We'll store them temporarily and assign in __exit__.
+    successful_responses = [
+        mock.Mock(status_code=204) for _ in range(num_expected_responses)
+    ]
+
+    # Define the exit logic
+    def mock_exit_logic(exc_type, exc_val, exc_tb):
+      # Assign responses to the mock instance itself
+      # so they are available after the 'with' block.
+      mock_batch._responses = successful_responses
+
+    # Configure the mock to behave like a context manager
+    mock_batch.configure_mock(
+        __enter__=mock.Mock(return_value=mock_batch),
+        __exit__=mock.Mock(side_effect=mock_exit_logic))
+
+    # The loop inside _batch_with_retry calls fn(request) for each item
+    # directly within the `with` block rather than through action methods
+    # on the batch object, so the mock needs nothing beyond the context
+    # manager protocol and _responses for this test case.
+
+    return mock_batch
 
   def add_file(self, bucket, blob, contents):
     folder = self.lookup_bucket(bucket)

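For orientation, a hedged sketch of the caller-side pattern the fake above is
built to satisfy, reconstructed from the comments in the hunk (the helper
name _batch_with_retry and the exact control flow in gcsio.py are taken from
those comments and may differ in detail): the batch object is used as a
context manager, each request is issued through the passed-in function inside
the `with` block, and the per-request HTTP responses are read from
batch._responses once __exit__ has run.

    # Sketch only; fn and requests stand in for whatever _batch_with_retry
    # receives, and raise_exception=False is an assumption that lets the
    # caller inspect failures instead of raising.
    def batch_delete_sketch(client, fn, requests):
      batch = client.batch(raise_exception=False)
      with batch:
        for request in requests:
          fn(request)  # queue each delete inside the batch
      # After __exit__ the batch has executed; a 204 status code means
      # the object was deleted successfully.
      return [resp.status_code for resp in batch._responses]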