Repository: incubator-airflow Updated Branches: refs/heads/master 55f267492 -> f9ddb36df
[AIRFLOW-2033] Add Google Cloud Storage List Operator Added an operator to get object names in a GCS bucket filtered by prefix and delimiter with example. Closes #2974 from kaxil/gcs_list_op Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f9ddb36d Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f9ddb36d Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f9ddb36d Branch: refs/heads/master Commit: f9ddb36df1676afea22125644c241c8ba65598db Parents: 55f2674 Author: Kaxil Naik <[email protected]> Authored: Fri Jan 26 11:28:14 2018 +0100 Committer: Fokko Driesprong <[email protected]> Committed: Fri Jan 26 11:28:14 2018 +0100 ---------------------------------------------------------------------- airflow/contrib/operators/gcs_list_operator.py | 85 ++++++++++++++++++++ .../contrib/operators/test_gcs_list_operator.py | 45 +++++++++++ 2 files changed, 130 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f9ddb36d/airflow/contrib/operators/gcs_list_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/gcs_list_operator.py b/airflow/contrib/operators/gcs_list_operator.py new file mode 100644 index 0000000..e991766 --- /dev/null +++ b/airflow/contrib/operators/gcs_list_operator.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults + + +class GoogleCloudStorageListOperator(BaseOperator): + """ + List all objects from the bucket with the given string prefix and delimiter in name. + + This operator returns a python list with the names of the objects, which can be used by + `xcom` in the downstream task. + + :param bucket: The Google Cloud Storage bucket in which to find the objects. + :type bucket: string + :param prefix: Prefix string which filters objects whose names begin with this prefix + :type prefix: string + :param delimiter: The delimiter by which you want to filter the objects. + For example, to list the CSV files in a directory in GCS you would use + delimiter='.csv'. + :type delimiter: string + :param google_cloud_storage_conn_id: The connection ID to use when + connecting to Google Cloud Storage. + :type google_cloud_storage_conn_id: string + :param delegate_to: The account to impersonate, if any. + For this to work, the service account making the request must have + domain-wide delegation enabled. + :type delegate_to: string + + Example: The following Operator would list all the Avro files from `sales/sales-2017` + folder in the `data` bucket.
+ + GCS_Files = GoogleCloudStorageListOperator( + task_id='GCS_Files', + bucket='data', + prefix='sales/sales-2017/', + delimiter='.avro', + google_cloud_storage_conn_id=google_cloud_conn_id + ) + """ + template_fields = ('bucket', 'prefix', 'delimiter') + ui_color = '#f0eee4' + + @apply_defaults + def __init__(self, + bucket, + prefix=None, + delimiter=None, + google_cloud_storage_conn_id='google_cloud_storage_default', + delegate_to=None, + *args, + **kwargs): + super(GoogleCloudStorageListOperator, self).__init__(*args, **kwargs) + self.bucket = bucket + self.prefix = prefix + self.delimiter = delimiter + self.google_cloud_storage_conn_id = google_cloud_storage_conn_id + self.delegate_to = delegate_to + + def execute(self, context): + + hook = GoogleCloudStorageHook( + google_cloud_storage_conn_id=self.google_cloud_storage_conn_id, + delegate_to=self.delegate_to + ) + + self.log.info('Getting list of the files. Bucket: %s; Delimiter: %s; Prefix: %s', + self.bucket, self.delimiter, self.prefix) + + return hook.list(bucket=self.bucket, + prefix=self.prefix, + delimiter=self.delimiter) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f9ddb36d/tests/contrib/operators/test_gcs_list_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_gcs_list_operator.py b/tests/contrib/operators/test_gcs_list_operator.py new file mode 100644 index 0000000..cc2c253 --- /dev/null +++ b/tests/contrib/operators/test_gcs_list_operator.py @@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +from airflow.contrib.operators.gcs_list_operator import GoogleCloudStorageListOperator + +try: + from unittest import mock +except ImportError: + try: + import mock + except ImportError: + mock = None  # NOTE(review): if this fallback is taken, the class-level @mock.patch below raises AttributeError at import time + +TASK_ID = 'test-gcs-list-operator' +TEST_BUCKET = 'test-bucket' +DELIMITER = '.csv' +PREFIX = 'TEST' + + +class GoogleCloudStorageListOperatorTest(unittest.TestCase): + + @mock.patch('airflow.contrib.operators.gcs_list_operator.GoogleCloudStorageHook') + def test_execute(self, mock_hook): + operator = GoogleCloudStorageListOperator(task_id=TASK_ID, + bucket=TEST_BUCKET, + prefix=PREFIX, + delimiter=DELIMITER) + + operator.execute(None) + mock_hook.return_value.list.assert_called_once_with( + bucket=TEST_BUCKET, prefix=PREFIX, delimiter=DELIMITER + )
