mik-laj commented on a change in pull request #5770: [AIRFLOW-5162] GCS Hook Upload Method Improvement
URL: https://github.com/apache/airflow/pull/5770#discussion_r324422459
 
 

 ##########
 File path: airflow/gcp/hooks/gcs.py
 ##########
 @@ -185,45 +180,72 @@ def download(self, bucket_name, object_name, 
filename=None):
         else:
             return blob.download_as_string()
 
-    def upload(self, bucket_name, object_name, filename,
-               mime_type='application/octet-stream', gzip=False):
+    def upload(self, bucket_name: str, object_name: str, filename: str = None,
+               data: Union[str, bytes] = None, mime_type: str = None, gzip: 
bool = False,
+               encoding: str = 'utf-8') -> None:
         """
-        Uploads a local file to Google Cloud Storage.
-
+        Uploads a local file or file data as string or bytes to Google Cloud 
Storage.
         :param bucket_name: The bucket to upload to.
         :type bucket_name: str
-        :param object_name: The object name to set when uploading the local 
file.
+        :param object_name: The object name to set when uploading the file.
         :type object_name: str
         :param filename: The local file path to the file to be uploaded.
         :type filename: str
-        :param mime_type: The MIME type to set when uploading the file.
+        :param data: The file's data as a string or bytes to be uploaded.
+        :type data: str
+        :param mime_type: The file's mime type set when uploading the file.
         :type mime_type: str
-        :param gzip: Option to compress file for upload
+        :param gzip: Option to compress local file or file data for upload
         :type gzip: bool
+        :param encoding: bytes encoding for file data if provided as string
+        :type encoding: str
+        :return: none
+        :rtype: None
+        :raises ValueError: if filename and data param are both provided or 
missing
         """
-
-        if gzip:
-            filename_gz = filename + '.gz'
-
-            with open(filename, 'rb') as f_in:
-                with gz.open(filename_gz, 'wb') as f_out:
-                    shutil.copyfileobj(f_in, f_out)
-                    filename = filename_gz
-
         client = self.get_conn()
         bucket = client.bucket(bucket_name)
         blob = bucket.blob(blob_name=object_name)
-        blob.upload_from_filename(filename=filename,
-                                  content_type=mime_type)
-
-        if gzip:
-            os.remove(filename)
-        self.log.info('File %s uploaded to %s in %s bucket', filename, 
object_name, bucket_name)
+        if filename and data:
+            raise ValueError("'filename' and 'data' parameter provided. Please 
"
+                             "specify a single parameter, either 'filename' 
for "
+                             "local file uploads or 'data' for file content 
uploads.")
+        elif filename:
+            if not mime_type:
+                mime_type = 'application/octet-stream'
+            if gzip:
+                filename_gz = filename + '.gz'
+
+                with open(filename, 'rb') as f_in:
+                    with gz.open(filename_gz, 'wb') as f_out:
+                        shutil.copyfileobj(f_in, f_out)
+                        filename = filename_gz
+
+            blob.upload_from_filename(filename=filename,
+                                      content_type=mime_type)
+            if gzip:
+                os.remove(filename)
+            self.log.info('File %s uploaded to %s in %s bucket', filename, 
object_name, bucket_name)
+        elif data:
+            if not mime_type:
+                mime_type = 'text/plain'
+            if gzip:
+                if isinstance(data, str):
+                    data = bytes(data, encoding)
+                out = BytesIO()
+                with gz.GzipFile(fileobj=out, mode="w") as f:
+                    f.write(data)
+                data = out.getvalue()
+            blob.upload_from_string(data,
+                                    content_type=mime_type)
+            self.log.info('Data stream uploaded to %s in %s bucket', 
object_name, bucket_name)
+        else:
+            raise ValueError("'filename' and 'data' parameter missing. "
+                             "One is required to upload to gcs.")
 
     def exists(self, bucket_name, object_name):
         """
         Checks for the existence of a file in Google Cloud Storage.
 
 Review comment:
   This change prevents the documentation from being built properly. Can you 
delete it?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to