janowskijak commented on code in PR #27471:
URL: https://github.com/apache/beam/pull/27471#discussion_r1277191991


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -657,173 +478,18 @@ def _updated_to_seconds(updated):
         updated.microsecond / 1000000.0)
 
 
-class GcsDownloader(Downloader):
-  def __init__(self, client, path, buffer_size, get_project_number):
-    self._client = client
-    self._path = path
-    self._bucket, self._name = parse_gcs_path(path)
-    self._buffer_size = buffer_size
-    self._get_project_number = get_project_number
-
-    # Create a request count metric
-    resource = resource_identifiers.GoogleCloudStorageBucket(self._bucket)
-    labels = {
-        monitoring_infos.SERVICE_LABEL: 'Storage',
-        monitoring_infos.METHOD_LABEL: 'Objects.get',
-        monitoring_infos.RESOURCE_LABEL: resource,
-        monitoring_infos.GCS_BUCKET_LABEL: self._bucket
-    }
-    project_number = self._get_project_number(self._bucket)
-    if project_number:
-      labels[monitoring_infos.GCS_PROJECT_ID_LABEL] = str(project_number)
-    else:
-      _LOGGER.debug(
-          'Possibly missing storage.buckets.get permission to '
-          'bucket %s. Label %s is not added to the counter because it '
-          'cannot be identified.',
-          self._bucket,
-          monitoring_infos.GCS_PROJECT_ID_LABEL)
-
-    service_call_metric = ServiceCallMetric(
-        request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN,
-        base_labels=labels)
-
-    # Get object state.
-    self._get_request = (
-        storage.StorageObjectsGetRequest(
-            bucket=self._bucket, object=self._name))
-    try:
-      metadata = self._get_object_metadata(self._get_request)
-    except HttpError as http_error:
-      service_call_metric.call(http_error)
-      if http_error.status_code == 404:
-        raise IOError(errno.ENOENT, 'Not found: %s' % self._path)
-      else:
-        _LOGGER.error(
-            'HTTP error while requesting file %s: %s', self._path, http_error)
-        raise
-    else:
-      service_call_metric.call('ok')
+class BeamBlobWriter(BlobWriter):
+  def __init__(
+      self, blob, content_type, chunk_size=16 * 1024 * 1024, 
ignore_flush=True):
+    super().__init__(
+        blob,
+        content_type=content_type,
+        chunk_size=chunk_size,
+        ignore_flush=ignore_flush)

Review Comment:
   Why not allow setting the `retry` argument of `BlobWriter`? This would allow 
retrying in case of hitting the GCS write limit. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to