BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1242651594


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -270,186 +213,86 @@ def delete(self, path):
     Args:
       path: GCS file path pattern in the form gs://<bucket>/<name>.
     """
-    bucket, object_path = parse_gcs_path(path)
-    request = storage.StorageObjectsDeleteRequest(
-        bucket=bucket, object=object_path)
+    bucket_name, target_name = parse_gcs_path(path)
     try:
-      self.client.objects.Delete(request)
-    except HttpError as http_error:
-      if http_error.status_code == 404:
-        # Return success when the file doesn't exist anymore for idempotency.
-        return
-      raise
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
+      bucket = self.client.get_bucket(bucket_name)
+      bucket.delete_blob(target_name)
+    except NotFound:
+      return
+
   def delete_batch(self, paths):
     """Deletes the objects at the given GCS paths.
 
     Args:
       paths: List of GCS file path patterns in the form gs://<bucket>/<name>,
              not to exceed MAX_BATCH_OPERATION_SIZE in length.
-
-    Returns: List of tuples of (path, exception) in the same order as the paths
-             argument, where exception is None if the operation succeeded or
-             the relevant exception if the operation failed.
     """
-    if not paths:
-      return []
-
-    paths = iter(paths)
-    result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: 
disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
-        exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
-            exception = None
-        result_statuses.append((path, exception))
-    return result_statuses
+    start = 0
+    while start < len(paths):
+      if (start + MAX_BATCH_OPERATION_SIZE) < len(paths):
+        current_paths = paths[start:start + MAX_BATCH_OPERATION_SIZE]
+      else:
+        current_paths = paths[start:]
+      try:
+        with self.client.batch():
+          for path in current_paths:
+            bucket_name, blob_path = parse_gcs_path(path)
+            bucket = self.client.get_bucket(bucket_name)
+            blob = storage.Blob(blob_path, bucket)
+            blob.delete()
+      except NotFound:
+        pass
+      start += MAX_BATCH_OPERATION_SIZE
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(
-      self,
-      src,
-      dest,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+  def copy(self, src, dest):
     """Copies the given GCS object from src to dest.
 
     Args:
       src: GCS file path pattern in the form gs://<bucket>/<name>.
       dest: GCS file path pattern in the form gs://<bucket>/<name>.
-      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
-        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
-        encryption defaults.
-      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
-        guarantees. Each rewrite API call will return after these many bytes.
-        Used for testing.
 
     Raises:
       TimeoutError: on timeout.
     """
-    src_bucket, src_path = parse_gcs_path(src)
-    dest_bucket, dest_path = parse_gcs_path(dest)
-    request = storage.StorageObjectsRewriteRequest(
-        sourceBucket=src_bucket,
-        sourceObject=src_path,
-        destinationBucket=dest_bucket,
-        destinationObject=dest_path,
-        destinationKmsKeyName=dest_kms_key_name,
-        maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
-    response = self.client.objects.Rewrite(request)
-    while not response.done:
-      _LOGGER.debug(
-          'Rewrite progress: %d of %d bytes, %s to %s',
-          response.totalBytesRewritten,
-          response.objectSize,
-          src,
-          dest)
-      request.rewriteToken = response.rewriteToken
-      response = self.client.objects.Rewrite(request)
-      if self._rewrite_cb is not None:
-        self._rewrite_cb(response)
-
-    _LOGGER.debug('Rewrite done: %s to %s', src, dest)
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
-  def copy_batch(
-      self,
-      src_dest_pairs,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+    src_bucket_name, src_path = parse_gcs_path(src)
+    dest_bucket_name, dest_path = parse_gcs_path(dest)

Review Comment:
   I've updated it to do so. No reason to respecify the blob name if it isn't 
changing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to