BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1194341172


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -296,160 +234,87 @@ def delete_batch(self, paths):
     """
     if not paths:
       return []
-
-    paths = iter(paths)
+    if len(paths) > MAX_BATCH_OPERATION_SIZE:
+      raise TooManyRequests("Batch larger than %s", MAX_BATCH_OPERATION_SIZE)
     result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: 
disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
+    with self.client.batch():
+      for path in paths:
+        bucket_name, blob_path = parse_gcs_path(path)
+        bucket = self.client.get_bucket(bucket_name)
+        blob = storage.Blob(blob_path, bucket)
         exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
+        try:
+          blob.delete()
+        except Exception as err:
+          if err is NotFound:
             exception = None
-        result_statuses.append((path, exception))
-    return result_statuses
+          else:
+            exception = err
+        finally:
+          result_statuses.append((path, exception))
+
+      return result_statuses
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(
-      self,
-      src,
-      dest,
-      dest_kms_key_name=None,

Review Comment:
   The KMS key for a bucket is applied to the objects within it by default, so 
when a blob is copied over, the new blob inherits the KMS key of the 
destination bucket unless it is explicitly told to do otherwise.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to