janowskijak commented on code in PR #27471:
URL: https://github.com/apache/beam/pull/27471#discussion_r1280206427


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -657,173 +478,18 @@ def _updated_to_seconds(updated):
         updated.microsecond / 1000000.0)
 
 
-class GcsDownloader(Downloader):
-  def __init__(self, client, path, buffer_size, get_project_number):
-    self._client = client
-    self._path = path
-    self._bucket, self._name = parse_gcs_path(path)
-    self._buffer_size = buffer_size
-    self._get_project_number = get_project_number
-
-    # Create a request count metric
-    resource = resource_identifiers.GoogleCloudStorageBucket(self._bucket)
-    labels = {
-        monitoring_infos.SERVICE_LABEL: 'Storage',
-        monitoring_infos.METHOD_LABEL: 'Objects.get',
-        monitoring_infos.RESOURCE_LABEL: resource,
-        monitoring_infos.GCS_BUCKET_LABEL: self._bucket
-    }
-    project_number = self._get_project_number(self._bucket)
-    if project_number:
-      labels[monitoring_infos.GCS_PROJECT_ID_LABEL] = str(project_number)
-    else:
-      _LOGGER.debug(
-          'Possibly missing storage.buckets.get permission to '
-          'bucket %s. Label %s is not added to the counter because it '
-          'cannot be identified.',
-          self._bucket,
-          monitoring_infos.GCS_PROJECT_ID_LABEL)
-
-    service_call_metric = ServiceCallMetric(
-        request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN,
-        base_labels=labels)
-
-    # Get object state.
-    self._get_request = (
-        storage.StorageObjectsGetRequest(
-            bucket=self._bucket, object=self._name))
-    try:
-      metadata = self._get_object_metadata(self._get_request)
-    except HttpError as http_error:
-      service_call_metric.call(http_error)
-      if http_error.status_code == 404:
-        raise IOError(errno.ENOENT, 'Not found: %s' % self._path)
-      else:
-        _LOGGER.error(
-            'HTTP error while requesting file %s: %s', self._path, http_error)
-        raise
-    else:
-      service_call_metric.call('ok')
+class BeamBlobWriter(BlobWriter):
+  def __init__(
+      self, blob, content_type, chunk_size=16 * 1024 * 1024, 
ignore_flush=True):
+    super().__init__(
+        blob,
+        content_type=content_type,
+        chunk_size=chunk_size,
+        ignore_flush=ignore_flush)

Review Comment:
   Potentially yes; however, retry behaviour is not explicitly stated in the BlobWriter documentation.
   
   https://cloud.google.com/storage/docs/retry-strategy#tools
   
https://cloud.google.com/python/docs/reference/storage/latest/google.cloud.storage.fileio.BlobWriter
   
   Regardless of the default behaviour, it would be helpful to allow callers to
pass a custom retry policy.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to