[
https://issues.apache.org/jira/browse/AIRFLOW-2932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16613144#comment-16613144
]
ASF GitHub Bot commented on AIRFLOW-2932:
-----------------------------------------
neil90 closed pull request #3870: [AIRFLOW-2932] GoogleCloudStorageHook - allow compression of file
URL: https://github.com/apache/incubator-airflow/pull/3870
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py
index 6cfa1cf565..1663cae1ba 100644
--- a/airflow/contrib/hooks/gcs_hook.py
+++ b/airflow/contrib/hooks/gcs_hook.py
@@ -24,7 +24,10 @@
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
from airflow.exceptions import AirflowException
+import gzip as gz
+import shutil
import re
+import os
class GoogleCloudStorageHook(GoogleCloudBaseHook):
@@ -172,7 +175,8 @@ def download(self, bucket, object, filename=None):
return downloaded_file_bytes
# pylint:disable=redefined-builtin
- def upload(self, bucket, object, filename,
- mime_type='application/octet-stream'):
+ def upload(self, bucket, object, filename,
+ gzip=False, mime_type='application/octet-stream'):
"""
Uploads a local file to Google Cloud Storage.
@@ -182,16 +186,32 @@ def upload(self, bucket, object, filename,
mime_type='application/octet-stream')
:type object: str
:param filename: The local file path to the file to be uploaded.
:type filename: str
+ :param gzip: Option to compress file for upload
+ :type gzip: boolean
:param mime_type: The MIME type to set when uploading the file.
:type mime_type: str
"""
service = self.get_conn()
+
+ if gzip:
+ filename_gz = filename + '.gz'
+
+ with open(filename, 'rb') as f_in:
+ with gz.open(filename_gz, 'wb') as f_out:
+ shutil.copyfileobj(f_in, f_out)
+ filename = filename_gz
+
media = MediaFileUpload(filename, mime_type)
+
try:
service \
.objects() \
.insert(bucket=bucket, name=object, media_body=media) \
.execute()
+
+ # Clean up gzip file
+ if gzip:
+ os.remove(filename)
return True
except errors.HttpError as ex:
if ex.resp['status'] == '404':
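For illustration only, a minimal sketch of calling the patched hook directly; the connection id, bucket and paths here are hypothetical, not taken from the PR:

    from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

    # Hypothetical connection id, bucket and paths - adjust to your environment.
    hook = GoogleCloudStorageHook(google_cloud_storage_conn_id='google_cloud_default')
    hook.upload(bucket='my-bucket',           # destination bucket
                object='data/2008.csv.gz',    # object name to create in GCS
                filename='/tmp/2008.csv',     # local file; a /tmp/2008.csv.gz copy is made first
                gzip=True)                    # exercises the new compression path

Note that with gzip=True the patch compresses to a temporary filename + '.gz' on local disk, uploads that, and the object is still stored under whatever name is passed as object.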
diff --git a/airflow/contrib/operators/file_to_gcs.py b/airflow/contrib/operators/file_to_gcs.py
index a392a16891..5b85ff7b48 100644
--- a/airflow/contrib/operators/file_to_gcs.py
+++ b/airflow/contrib/operators/file_to_gcs.py
@@ -37,6 +37,8 @@ class FileToGoogleCloudStorageOperator(BaseOperator):
:type google_cloud_storage_conn_id: str
:param mime_type: The mime-type string
:type mime_type: str
+ :param gzip: Allows for the file to be uploaded as gzip
+ :type gzip: boolean
:param delegate_to: The account to impersonate, if any
:type delegate_to: str
"""
@@ -49,6 +51,7 @@ def __init__(self,
bucket,
google_cloud_storage_conn_id='google_cloud_default',
mime_type='application/octet-stream',
+ gzip=False,
delegate_to=None,
*args,
**kwargs):
@@ -58,6 +61,7 @@ def __init__(self,
self.bucket = bucket
self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
self.mime_type = mime_type
+ self.gzip = gzip
self.delegate_to = delegate_to
def execute(self, context):
@@ -72,4 +76,5 @@ def execute(self, context):
bucket=self.bucket,
object=self.dst,
mime_type=self.mime_type,
+ gzip=self.gzip,
filename=self.src)
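For illustration, a sketch of the operator with the new flag turned on, assuming a dag object is already defined; the task id, paths and bucket are hypothetical (the test DAG below exercises gzip=False):

    # Hypothetical task id, paths and bucket - adjust to your environment.
    upload_gz = FileToGoogleCloudStorageOperator(
        task_id='gcs_upload_gz',
        src='/tmp/2008.csv',     # local source file
        dst='2008.csv.gz',       # object name in the destination bucket
        bucket='my-bucket',
        gzip=True,               # compress before upload via the new flag
        dag=dag)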
diff --git a/dags/test_gcs_dag.py b/dags/test_gcs_dag.py
new file mode 100644
index 0000000000..e41314af27
--- /dev/null
+++ b/dags/test_gcs_dag.py
@@ -0,0 +1,36 @@
+"""
+Code that goes along with the Airflow tutorial located at:
+https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/tutorial.py
+"""
+from airflow import DAG
+from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator
+from datetime import datetime, timedelta
+
+
+default_args = {
+ 'owner': 'airflow',
+ 'depends_on_past': False,
+ 'start_date': datetime(2015, 6, 1),
+ 'email': ['[email protected]'],
+ 'email_on_failure': False,
+ 'email_on_retry': False,
+ 'retries': 1,
+ 'retry_delay': timedelta(minutes=5),
+ # 'queue': 'bash_queue',
+ # 'pool': 'backfill',
+ # 'priority_weight': 10,
+ # 'end_date': datetime(2016, 1, 1),
+}
+
+dag = DAG('gcs_upload_gzip', default_args=default_args)
+
+t1 = FileToGoogleCloudStorageOperator(task_id='gcs_upload', src='/app/2008.csv',
+dst='2008.csv1', bucket='gcsnotes-neil', gzip=False, dag=dag)
+
+#export AIRFLOW_HOME=/airflow
+#airflow initdb
+#airflow webserver -p 8080
+#pip install -e ".[hdfs,hive,druid,devel,gcs]"
+#airflow test gcs_upload_gzip gcs_upload 2015-01-01
+#https://www.googleapis.com/auth/cloud-platform
+#pip install --upgrade google-api-python-client
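One caveat in the hook hunk above: os.remove(filename) sits on the success path only, so a failed insert would leave the temporary .gz file behind. A try/finally variant of the same method body (a sketch, not part of this PR; it assumes the same imports and surrounding context as the patch) would guarantee cleanup:

    if gzip:
        filename_gz = filename + '.gz'
        with open(filename, 'rb') as f_in:
            with gz.open(filename_gz, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        filename = filename_gz

    media = MediaFileUpload(filename, mime_type)
    try:
        service \
            .objects() \
            .insert(bucket=bucket, name=object, media_body=media) \
            .execute()
        return True
    except errors.HttpError as ex:
        if ex.resp['status'] == '404':
            return False
        raise
    finally:
        # Runs on success and failure alike, unlike the patch above.
        if gzip:
            os.remove(filename)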
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> GoogleCloudStorageHook - allow compression of file
> --------------------------------------------------
>
> Key: AIRFLOW-2932
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2932
> Project: Apache Airflow
> Issue Type: Improvement
> Affects Versions: 1.10.0
> Reporter: jack
> Assignee: neil90
> Priority: Major
>
> The *upload*(_bucket_, _object_, _filename_, _mime_type='application/octet-stream'_)
> function allows uploading a file from local disk.
> Google Cloud Storage supports GZIP, and BigQuery can read GZIP files, so most
> people upload compressed files in order to save space.
> It would be nice if the upload function could perform the compression on its
> own (if asked by the user). This would save users the trouble of having to
> compress the file themselves.
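For context, the manual pre-compression step that this improvement would fold into upload() looks roughly like the following sketch (paths are hypothetical):

    import gzip
    import shutil

    # Today: compress by hand, then pass the .gz file to upload().
    with open('/tmp/2008.csv', 'rb') as f_in:
        with gzip.open('/tmp/2008.csv.gz', 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
    # ...then call hook.upload(bucket, object, '/tmp/2008.csv.gz').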
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)