Repository: incubator-airflow Updated Branches: refs/heads/master bd9127ebb -> 518e0073a
[AIRFLOW-274] Add XCom functionality to GoogleCloudStorageDownloadOperator Updated GoogleCloudStorageDownloadOperator so that it has the option to store the downloaded file's contents in an XCom instead of saving to disk. It now also takes filename as an optional argument instead of a required argument, so it is not necessary to save to disk if you do not need to. Closes #1618 from illop/risk_analytics Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/518e0073 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/518e0073 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/518e0073 Branch: refs/heads/master Commit: 518e0073a81585b39757afb4c0d8d65d400a88e2 Parents: bd9127e Author: Ilya Rakoshes <il...@wepay.com> Authored: Thu Jun 23 20:30:11 2016 -0700 Committer: Chris Riccomini <chr...@wepay.com> Committed: Thu Jun 23 20:30:11 2016 -0700 ---------------------------------------------------------------------- .../contrib/operators/gcs_download_operator.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/518e0073/airflow/contrib/operators/gcs_download_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/gcs_download_operator.py b/airflow/contrib/operators/gcs_download_operator.py index 7e19e13..511248e 100644 --- a/airflow/contrib/operators/gcs_download_operator.py +++ b/airflow/contrib/operators/gcs_download_operator.py @@ -13,6 +13,7 @@ # limitations under the License. 
import logging +import sys from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook from airflow.models import BaseOperator @@ -31,7 +32,8 @@ class GoogleCloudStorageDownloadOperator(BaseOperator): self, bucket, object, - filename, + filename=False, + store_to_xcom_key=False, google_cloud_storage_conn_id='google_cloud_storage_default', delegate_to=None, *args, @@ -46,7 +48,13 @@ class GoogleCloudStorageDownloadOperator(BaseOperator): :type object: string :param filename: The file path on the local file system (where the operator is being executed) that the file should be downloaded to. + If false, the downloaded data will not be stored on the local file + system. :type filename: string + :param store_to_xcom_key: If this param is set, the operator will push + the contents of the downloaded file to XCom with the key set in this + parameter. If false, the downloaded data will not be pushed to XCom. + :type store_to_xcom_key: string :param google_cloud_storage_conn_id: The connection ID to use when connecting to Google cloud storage. 
:type google_cloud_storage_conn_id: string @@ -58,6 +66,7 @@ class GoogleCloudStorageDownloadOperator(BaseOperator): self.bucket = bucket self.object = object self.filename = filename + self.store_to_xcom_key = store_to_xcom_key self.google_cloud_storage_conn_id = google_cloud_storage_conn_id self.delegate_to = delegate_to @@ -65,4 +74,10 @@ class GoogleCloudStorageDownloadOperator(BaseOperator): logging.info('Executing download: %s, %s, %s', self.bucket, self.object, self.filename) hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.google_cloud_storage_conn_id, delegate_to=self.delegate_to) - print(hook.download(self.bucket, self.object, self.filename)) + file_bytes = hook.download(self.bucket, self.object, self.filename) + if self.store_to_xcom_key: + if sys.getsizeof(file_bytes) < 48000: + context['ti'].xcom_push(key=self.store_to_xcom_key, value=file_bytes) + else: + raise RuntimeError('The size of the downloaded file is too large to push to XCom!') + print(file_bytes)