tvalentyn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1237725603
##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -270,186 +213,86 @@ def delete(self, path):
    Args:
      path: GCS file path pattern in the form gs://<bucket>/<name>.
    """
-    bucket, object_path = parse_gcs_path(path)
-    request = storage.StorageObjectsDeleteRequest(
-        bucket=bucket, object=object_path)
+    bucket_name, target_name = parse_gcs_path(path)
    try:
-      self.client.objects.Delete(request)
-    except HttpError as http_error:
-      if http_error.status_code == 404:
-        # Return success when the file doesn't exist anymore for idempotency.
-        return
-      raise
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
+      bucket = self.client.get_bucket(bucket_name)
+      bucket.delete_blob(target_name)
+    except NotFound:
+      return
+
  def delete_batch(self, paths):
    """Deletes the objects at the given GCS paths.

    Args:
      paths: List of GCS file path patterns in the form gs://<bucket>/<name>,
        not to exceed MAX_BATCH_OPERATION_SIZE in length.
-
-    Returns: List of tuples of (path, exception) in the same order as the paths
-      argument, where exception is None if the operation succeeded or
-      the relevant exception if the operation failed.
    """
-    if not paths:
-      return []
-
-    paths = iter(paths)
-    result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
-        exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
-            exception = None
-        result_statuses.append((path, exception))
-      return result_statuses
+    start = 0
+    while start < len(paths):
+      if (start + MAX_BATCH_OPERATION_SIZE) < len(paths):
+        current_paths = paths[start:start + MAX_BATCH_OPERATION_SIZE]
+      else:
+        current_paths = paths[start:]
+      try:
+        with self.client.batch():
+          for path in current_paths:
+            bucket_name, blob_path = parse_gcs_path(path)
+            bucket = self.client.get_bucket(bucket_name)
+            blob = storage.Blob(blob_path, bucket)
+            blob.delete()
+      except NotFound:
+        pass
+      start += MAX_BATCH_OPERATION_SIZE

  @retry.with_exponential_backoff(
      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(
-      self,
-      src,
-      dest,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+  def copy(self, src, dest):
    """Copies the given GCS object from src to dest.

    Args:
      src: GCS file path pattern in the form gs://<bucket>/<name>.
      dest: GCS file path pattern in the form gs://<bucket>/<name>.
-      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
-        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
-        encryption defaults.
-      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
-        guarantees. Each rewrite API call will return after these many bytes.
-        Used for testing.

    Raises:
      TimeoutError: on timeout.
    """
-    src_bucket, src_path = parse_gcs_path(src)
-    dest_bucket, dest_path = parse_gcs_path(dest)
-    request = storage.StorageObjectsRewriteRequest(
-        sourceBucket=src_bucket,
-        sourceObject=src_path,
-        destinationBucket=dest_bucket,
-        destinationObject=dest_path,
-        destinationKmsKeyName=dest_kms_key_name,
-        maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
-    response = self.client.objects.Rewrite(request)
-    while not response.done:
-      _LOGGER.debug(
-          'Rewrite progress: %d of %d bytes, %s to %s',
-          response.totalBytesRewritten,
-          response.objectSize,
-          src,
-          dest)
-      request.rewriteToken = response.rewriteToken
-      response = self.client.objects.Rewrite(request)
-      if self._rewrite_cb is not None:
-        self._rewrite_cb(response)
-
-    _LOGGER.debug('Rewrite done: %s to %s', src, dest)
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
-  def copy_batch(
-      self,
-      src_dest_pairs,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+    src_bucket_name, src_path = parse_gcs_path(src)
Review Comment:
   src_path / dest_path are confusing names. Let's use src_object_name or
   src_blob_name; it would be good to use the same name for consistency
   throughout this file.
   Note that in GCS there is no concept of paths or subdirectories. There are
   buckets and objects; forward slashes are simply characters in object names.
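   For illustration, a minimal sketch of the suggested naming, reusing the
   parse_gcs_path helper already defined in this file (the bucket and object
   values below are made up):

       # GCS has only buckets and objects; '/' is an ordinary character in
       # an object name, not a directory separator.
       src_bucket_name, src_blob_name = parse_gcs_path(
           'gs://my-bucket/logs/2023/part-0.txt')
       # src_bucket_name == 'my-bucket'
       # src_blob_name == 'logs/2023/part-0.txt'  (a single object name)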
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]