BjornPrime commented on code in PR #28079:
URL: https://github.com/apache/beam/pull/28079#discussion_r1336010856


##########
sdks/python/apache_beam/runners/dataflow/internal/apiclient.py:
##########
@@ -654,38 +658,45 @@ def stage_file(
       mime_type='application/octet-stream',
       total_size=None):
     """Stages a file at a GCS or local path with stream-supplied contents."""
+    from google.cloud.exceptions import Forbidden
+    from google.cloud.exceptions import NotFound
     if not gcs_or_local_path.startswith('gs://'):
       local_path = FileSystems.join(gcs_or_local_path, file_name)
       _LOGGER.info('Staging file locally to %s', local_path)
       with open(local_path, 'wb') as f:
         f.write(stream.read())
       return
     gcs_location = FileSystems.join(gcs_or_local_path, file_name)
-    bucket, name = gcs_location[5:].split('/', 1)
-
-    request = storage.StorageObjectsInsertRequest(bucket=bucket, name=name)
+    bucket_name, blob_name = gcs_location[5:].split('/', 1)
     start_time = time.time()
     _LOGGER.info('Starting GCS upload to %s...', gcs_location)
-    upload = storage.Upload(stream, mime_type, total_size)
     try:
-      response = self._storage_client.objects.Insert(request, upload=upload)
-    except exceptions.HttpError as e:
-      reportable_errors = {
-          403: 'access denied',
-          404: 'bucket not found',
-      }
-      if e.status_code in reportable_errors:
+      from google.cloud.storage import Blob
+      from google.cloud.storage.fileio import BlobWriter
+      bucket = self._storage_client.get_bucket(bucket_name)
+      blob = bucket.get_blob(blob_name)
+      if not blob:
+        blob = Blob(blob_name, bucket)
+      with BlobWriter(blob) as f:
+        f.write(stream.read())
+      return
+    except Exception as e:
+      reportable_errors = [
+          Forbidden,
+          NotFound,
+      ]
+      if type(e) in reportable_errors:
         raise IOError((
             'Could not upload to GCS path %s: %s. Please verify '
-            'that credentials are valid and that you have write '
-            'access to the specified path.') %
-                      (gcs_or_local_path, reportable_errors[e.status_code]))
+            'that credentials are valid, that the specified path '
+            'exists, and that you have write access to it.') %
+                      (gcs_or_local_path, e))
       raise
-    _LOGGER.info(
-        'Completed GCS upload to %s in %s seconds.',
-        gcs_location,
-        int(time.time() - start_time))
-    return response
+    finally:

Review Comment:
   Good catch, I'll fix that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to