tvalentyn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1237673893


##########
sdks/python/apache_beam/io/gcp/gcsfilesystem.py:
##########
@@ -340,8 +329,7 @@ def metadata(self, path):
     """
     try:
       file_metadata = self._gcsIO()._status(path)
-      return FileMetadata(
-          path, file_metadata['size'], file_metadata['last_updated'])
+      return FileMetadata(path, file_metadata['size'], 
file_metadata['updated'])

Review Comment:
   Did the content of the metadata change after swapping clients, or is it 
still the same info?



##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -270,186 +213,86 @@ def delete(self, path):
     Args:
       path: GCS file path pattern in the form gs://<bucket>/<name>.
     """
-    bucket, object_path = parse_gcs_path(path)
-    request = storage.StorageObjectsDeleteRequest(
-        bucket=bucket, object=object_path)
+    bucket_name, target_name = parse_gcs_path(path)
     try:
-      self.client.objects.Delete(request)
-    except HttpError as http_error:
-      if http_error.status_code == 404:
-        # Return success when the file doesn't exist anymore for idempotency.
-        return
-      raise
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
+      bucket = self.client.get_bucket(bucket_name)
+      bucket.delete_blob(target_name)
+    except NotFound:
+      return
+
   def delete_batch(self, paths):
     """Deletes the objects at the given GCS paths.
 
     Args:
       paths: List of GCS file path patterns in the form gs://<bucket>/<name>,
              not to exceed MAX_BATCH_OPERATION_SIZE in length.
-
-    Returns: List of tuples of (path, exception) in the same order as the paths
-             argument, where exception is None if the operation succeeded or
-             the relevant exception if the operation failed.
     """
-    if not paths:
-      return []
-
-    paths = iter(paths)
-    result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: 
disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
-        exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
-            exception = None
-        result_statuses.append((path, exception))
-    return result_statuses
+    start = 0
+    while start < len(paths):
+      if (start + MAX_BATCH_OPERATION_SIZE) < len(paths):
+        current_paths = paths[start:start + MAX_BATCH_OPERATION_SIZE]
+      else:
+        current_paths = paths[start:]
+      try:
+        with self.client.batch():
+          for path in current_paths:
+            bucket_name, blob_path = parse_gcs_path(path)
+            bucket = self.client.get_bucket(bucket_name)
+            blob = storage.Blob(blob_path, bucket)
+            blob.delete()
+      except NotFound:
+        pass
+      start += MAX_BATCH_OPERATION_SIZE
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(
-      self,
-      src,
-      dest,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+  def copy(self, src, dest):
     """Copies the given GCS object from src to dest.
 
     Args:
       src: GCS file path pattern in the form gs://<bucket>/<name>.
       dest: GCS file path pattern in the form gs://<bucket>/<name>.
-      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
-        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
-        encryption defaults.
-      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
-        guarantees. Each rewrite API call will return after these many bytes.
-        Used for testing.
 
     Raises:
       TimeoutError: on timeout.
     """
-    src_bucket, src_path = parse_gcs_path(src)
-    dest_bucket, dest_path = parse_gcs_path(dest)
-    request = storage.StorageObjectsRewriteRequest(
-        sourceBucket=src_bucket,
-        sourceObject=src_path,
-        destinationBucket=dest_bucket,
-        destinationObject=dest_path,
-        destinationKmsKeyName=dest_kms_key_name,
-        maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
-    response = self.client.objects.Rewrite(request)
-    while not response.done:
-      _LOGGER.debug(
-          'Rewrite progress: %d of %d bytes, %s to %s',
-          response.totalBytesRewritten,
-          response.objectSize,
-          src,
-          dest)
-      request.rewriteToken = response.rewriteToken
-      response = self.client.objects.Rewrite(request)
-      if self._rewrite_cb is not None:
-        self._rewrite_cb(response)
-
-    _LOGGER.debug('Rewrite done: %s to %s', src, dest)
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
-  def copy_batch(
-      self,
-      src_dest_pairs,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+    src_bucket_name, src_path = parse_gcs_path(src)
+    dest_bucket_name, dest_path = parse_gcs_path(dest)

Review Comment:
   Do we need to specify `parse_gcs_path(dest, object_optional=True)`?



##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -59,20 +48,10 @@
 
 _LOGGER = logging.getLogger(__name__)
 
-# Issue a friendlier error message if the storage library is not available.
-# TODO(silviuc): Remove this guard when storage is available everywhere.
 try:
-  # pylint: disable=wrong-import-order, wrong-import-position
-  # pylint: disable=ungrouped-imports
-  from apitools.base.py.batch import BatchApiRequest
-  from apitools.base.py.exceptions import HttpError
-  from apitools.base.py import transfer
   from apache_beam.internal.gcp import auth
-  from apache_beam.io.gcp.internal.clients import storage
 except ImportError:
-  raise ImportError(
-      'Google Cloud Storage I/O not supported for this execution environment '
-      '(could not import storage API client).')
+  raise ImportError('Internal auth library not found')

Review Comment:
   This is not necessary. This import shouldn't be throwing exceptions, see: 
https://github.com/apache/beam/blob/88383b940434ec1d97efab3cf9b5d2c9fd54a511/sdks/python/apache_beam/internal/gcp/auth.py#L37



##########
sdks/python/apache_beam/io/gcp/bigquery_test.py:
##########
@@ -796,6 +796,7 @@ def noop(table, **kwargs):
           exception_type=exceptions.ServiceUnavailable if exceptions else None,
           error_message='backendError')
   ])
+  @unittest.skip('Not compatible with new GCS client. See GH issue #26334.')

Review Comment:
   left a comment on the issue. seems like we should find a way to run the 
necessary test scenario or delete the test if the coverage it provides is not 
meaningful. 



##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -270,186 +213,86 @@ def delete(self, path):
     Args:
       path: GCS file path pattern in the form gs://<bucket>/<name>.
     """
-    bucket, object_path = parse_gcs_path(path)
-    request = storage.StorageObjectsDeleteRequest(
-        bucket=bucket, object=object_path)
+    bucket_name, target_name = parse_gcs_path(path)
     try:
-      self.client.objects.Delete(request)
-    except HttpError as http_error:
-      if http_error.status_code == 404:
-        # Return success when the file doesn't exist anymore for idempotency.
-        return
-      raise
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
+      bucket = self.client.get_bucket(bucket_name)
+      bucket.delete_blob(target_name)
+    except NotFound:
+      return
+
   def delete_batch(self, paths):
     """Deletes the objects at the given GCS paths.
 
     Args:
       paths: List of GCS file path patterns in the form gs://<bucket>/<name>,
              not to exceed MAX_BATCH_OPERATION_SIZE in length.
-
-    Returns: List of tuples of (path, exception) in the same order as the paths
-             argument, where exception is None if the operation succeeded or
-             the relevant exception if the operation failed.
     """
-    if not paths:
-      return []
-
-    paths = iter(paths)
-    result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: 
disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
-        exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
-            exception = None
-        result_statuses.append((path, exception))
-    return result_statuses
+    start = 0
+    while start < len(paths):
+      if (start + MAX_BATCH_OPERATION_SIZE) < len(paths):
+        current_paths = paths[start:start + MAX_BATCH_OPERATION_SIZE]
+      else:
+        current_paths = paths[start:]
+      try:
+        with self.client.batch():
+          for path in current_paths:
+            bucket_name, blob_path = parse_gcs_path(path)
+            bucket = self.client.get_bucket(bucket_name)
+            blob = storage.Blob(blob_path, bucket)
+            blob.delete()
+      except NotFound:
+        pass
+      start += MAX_BATCH_OPERATION_SIZE
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(
-      self,
-      src,
-      dest,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+  def copy(self, src, dest):
     """Copies the given GCS object from src to dest.
 
     Args:
       src: GCS file path pattern in the form gs://<bucket>/<name>.
       dest: GCS file path pattern in the form gs://<bucket>/<name>.
-      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
-        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
-        encryption defaults.
-      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
-        guarantees. Each rewrite API call will return after these many bytes.
-        Used for testing.
 
     Raises:
       TimeoutError: on timeout.
     """
-    src_bucket, src_path = parse_gcs_path(src)
-    dest_bucket, dest_path = parse_gcs_path(dest)
-    request = storage.StorageObjectsRewriteRequest(
-        sourceBucket=src_bucket,
-        sourceObject=src_path,
-        destinationBucket=dest_bucket,
-        destinationObject=dest_path,
-        destinationKmsKeyName=dest_kms_key_name,
-        maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
-    response = self.client.objects.Rewrite(request)
-    while not response.done:
-      _LOGGER.debug(
-          'Rewrite progress: %d of %d bytes, %s to %s',
-          response.totalBytesRewritten,
-          response.objectSize,
-          src,
-          dest)
-      request.rewriteToken = response.rewriteToken
-      response = self.client.objects.Rewrite(request)
-      if self._rewrite_cb is not None:
-        self._rewrite_cb(response)
-
-    _LOGGER.debug('Rewrite done: %s to %s', src, dest)
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
-  def copy_batch(
-      self,
-      src_dest_pairs,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+    src_bucket_name, src_path = parse_gcs_path(src)

Review Comment:
   src_path / dest_path are confusing names. Let's use src_object_name or 
src_blob_name. It would be good to use the same name consistently throughout 
this file.
   
   Note that in GCS there is no concept of paths or subdirectories. There are 
buckets and objects. Forward slashes are simply characters in object names.
   



##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -185,40 +144,30 @@ def get_project_number(self, bucket):
       bucket_metadata = self.get_bucket(bucket_name=bucket)
       if bucket_metadata:
         self.bucket_to_project_number[bucket] = bucket_metadata.projectNumber
-      #  else failed to load the bucket metadata due to HttpError
 
     return self.bucket_to_project_number.get(bucket, None)
 
-  def _set_rewrite_response_callback(self, callback):
-    """For testing purposes only. No backward compatibility guarantees.
-
-    Args:
-      callback: A function that receives ``storage.RewriteResponse``.
-    """
-    self._rewrite_cb = callback
-
   def get_bucket(self, bucket_name):
     """Returns an object bucket from its name, or None if it does not exist."""
     try:
-      request = storage.StorageBucketsGetRequest(bucket=bucket_name)
-      return self.client.buckets.Get(request)
-    except HttpError:
+      return self.client.lookup_bucket(bucket_name)
+    except NotFound:
       return None
 
   def create_bucket(self, bucket_name, project, kms_key=None, location=None):
     """Create and return a GCS bucket in a specific project."""
-    encryption = None
-    if kms_key:
-      encryption = storage.Bucket.EncryptionValue(kms_key)
-
-    request = storage.StorageBucketsInsertRequest(
-        bucket=storage.Bucket(
-            name=bucket_name, location=location, encryption=encryption),
-        project=project,
-    )
+
     try:
-      return self.client.buckets.Insert(request)
-    except HttpError:
+      bucket = self.client.create_bucket(
+          bucket_or_name=bucket_name,
+          project=project,
+          location=location,
+      )
+      if kms_key:
+        bucket.default_kms_key_name(kms_key)
+        return self.get_bucket(bucket_name)

Review Comment:
   fwiw, seeing bucket.patch() in 
https://cloud.google.com/storage/docs/samples/storage-set-bucket-default-kms-key
 in this scenario, but not an expert here. wondering if both options are the 
same.



##########
sdks/python/apache_beam/runners/portability/sdk_container_builder.py:
##########
@@ -307,16 +306,16 @@ def _invoke_docker_build_and_push(self, 
container_image_name):
         "Python SDK container built and pushed as %s." % container_image_name)
 
   def _upload_to_gcs(self, local_file_path, gcs_location):
-    gcs_bucket, gcs_object = self._get_gcs_bucket_and_name(gcs_location)
-    request = storage.StorageObjectsInsertRequest(
-        bucket=gcs_bucket, name=gcs_object)
+    bucket_name, blob_name = self._get_gcs_bucket_and_name(gcs_location)
     _LOGGER.info('Starting GCS upload to %s...', gcs_location)
-    total_size = os.path.getsize(local_file_path)
     from apitools.base.py import exceptions
+    from google.cloud import storage

Review Comment:
   should this code use FileSystems api? 
   
   Also, have you tested the prebuilding codepath after these changes? You can 
run the Dataflow Python ValidatesContainer postcommit suite.



##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -270,186 +213,86 @@ def delete(self, path):
     Args:
       path: GCS file path pattern in the form gs://<bucket>/<name>.
     """
-    bucket, object_path = parse_gcs_path(path)
-    request = storage.StorageObjectsDeleteRequest(
-        bucket=bucket, object=object_path)
+    bucket_name, target_name = parse_gcs_path(path)
     try:
-      self.client.objects.Delete(request)
-    except HttpError as http_error:
-      if http_error.status_code == 404:
-        # Return success when the file doesn't exist anymore for idempotency.
-        return
-      raise
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
+      bucket = self.client.get_bucket(bucket_name)
+      bucket.delete_blob(target_name)
+    except NotFound:
+      return
+
   def delete_batch(self, paths):
     """Deletes the objects at the given GCS paths.
 
     Args:
       paths: List of GCS file path patterns in the form gs://<bucket>/<name>,
              not to exceed MAX_BATCH_OPERATION_SIZE in length.
-
-    Returns: List of tuples of (path, exception) in the same order as the paths
-             argument, where exception is None if the operation succeeded or
-             the relevant exception if the operation failed.
     """
-    if not paths:
-      return []
-
-    paths = iter(paths)
-    result_statuses = []
-    while True:
-      paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
-      if not paths_chunk:
-        return result_statuses
-      batch_request = BatchApiRequest(
-          batch_url=GCS_BATCH_ENDPOINT,
-          retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES,
-          response_encoding='utf-8')
-      for path in paths_chunk:
-        bucket, object_path = parse_gcs_path(path)
-        request = storage.StorageObjectsDeleteRequest(
-            bucket=bucket, object=object_path)
-        batch_request.Add(self.client.objects, 'Delete', request)
-      api_calls = batch_request.Execute(self.client._http)  # pylint: 
disable=protected-access
-      for i, api_call in enumerate(api_calls):
-        path = paths_chunk[i]
-        exception = None
-        if api_call.is_error:
-          exception = api_call.exception
-          # Return success when the file doesn't exist anymore for idempotency.
-          if isinstance(exception, HttpError) and exception.status_code == 404:
-            exception = None
-        result_statuses.append((path, exception))
-    return result_statuses
+    start = 0
+    while start < len(paths):
+      if (start + MAX_BATCH_OPERATION_SIZE) < len(paths):
+        current_paths = paths[start:start + MAX_BATCH_OPERATION_SIZE]
+      else:
+        current_paths = paths[start:]
+      try:
+        with self.client.batch():
+          for path in current_paths:
+            bucket_name, blob_path = parse_gcs_path(path)
+            bucket = self.client.get_bucket(bucket_name)
+            blob = storage.Blob(blob_path, bucket)
+            blob.delete()
+      except NotFound:
+        pass
+      start += MAX_BATCH_OPERATION_SIZE
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(
-      self,
-      src,
-      dest,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+  def copy(self, src, dest):
     """Copies the given GCS object from src to dest.
 
     Args:
       src: GCS file path pattern in the form gs://<bucket>/<name>.
       dest: GCS file path pattern in the form gs://<bucket>/<name>.
-      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
-        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
-        encryption defaults.
-      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
-        guarantees. Each rewrite API call will return after these many bytes.
-        Used for testing.
 
     Raises:
       TimeoutError: on timeout.
     """
-    src_bucket, src_path = parse_gcs_path(src)
-    dest_bucket, dest_path = parse_gcs_path(dest)
-    request = storage.StorageObjectsRewriteRequest(
-        sourceBucket=src_bucket,
-        sourceObject=src_path,
-        destinationBucket=dest_bucket,
-        destinationObject=dest_path,
-        destinationKmsKeyName=dest_kms_key_name,
-        maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
-    response = self.client.objects.Rewrite(request)
-    while not response.done:
-      _LOGGER.debug(
-          'Rewrite progress: %d of %d bytes, %s to %s',
-          response.totalBytesRewritten,
-          response.objectSize,
-          src,
-          dest)
-      request.rewriteToken = response.rewriteToken
-      response = self.client.objects.Rewrite(request)
-      if self._rewrite_cb is not None:
-        self._rewrite_cb(response)
-
-    _LOGGER.debug('Rewrite done: %s to %s', src, dest)
-
-  # We intentionally do not decorate this method with a retry, as retrying is
-  # handled in BatchApiRequest.Execute().
-  def copy_batch(
-      self,
-      src_dest_pairs,
-      dest_kms_key_name=None,
-      max_bytes_rewritten_per_call=None):
+    src_bucket_name, src_path = parse_gcs_path(src)
+    dest_bucket_name, dest_path = parse_gcs_path(dest)
+    src_bucket = self.get_bucket(src_bucket_name)
+    src_blob = src_bucket.get_blob(src_path)
+    if not src_blob:
+      raise NotFound("Source %s not found", src)
+    dest_bucket = self.get_bucket(dest_bucket_name)
+    if not dest_path:
+      dest_path = None
+    src_bucket.copy_blob(src_blob, dest_bucket, new_name=dest_path)
+
+  def copy_batch(self, src_dest_pairs):
     """Copies the given GCS object from src to dest.

Review Comment:
   > Copies the given GCS object from src to dest.
   
   should this be:     Copies given GCS objects from src to dest.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to