Repository: incubator-airflow Updated Branches: refs/heads/master b9c82c040 -> 3e321790d
[AIRFLOW-1855][AIRFLOW-1866] Add GCS Copy Operator to copy multiple files Closes #2819 from kaxil/master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3e321790 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3e321790 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3e321790 Branch: refs/heads/master Commit: 3e321790d537696b8fd1a97dcac2ab7c469fecea Parents: b9c82c0 Author: Kaxil Naik <[email protected]> Authored: Fri Dec 1 15:51:22 2017 -0800 Committer: Chris Riccomini <[email protected]> Committed: Fri Dec 1 15:51:30 2017 -0800 ---------------------------------------------------------------------- airflow/contrib/hooks/gcs_hook.py | 34 +++--- airflow/contrib/operators/gcs_copy_operator.py | 115 ++++++++++++++++++++ 2 files changed, 135 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3e321790/airflow/contrib/hooks/gcs_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py index 546e028..f6ad39f 100644 --- a/airflow/contrib/hooks/gcs_hook.py +++ b/airflow/contrib/hooks/gcs_hook.py @@ -38,7 +38,6 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook): http_authorized = self._authorize() return build('storage', 'v1', http=http_authorized) - # pylint:disable=redefined-builtin def copy(self, source_bucket, source_object, destination_bucket=None, destination_object=None): @@ -48,10 +47,10 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook): destination_bucket or destination_object can be omitted, in which case source bucket/object is used, but not both. - :param bucket: The bucket of the object to copy from. - :type bucket: string - :param object: The object to copy. 
- :type object: string + :param source_bucket: The bucket of the object to copy from. + :type source_bucket: string + :param source_object: The object to copy. + :type source_object: string :param destination_bucket: The destination of the object to copied to. Can be omitted; then the same bucket is used. :type destination_bucket: string @@ -219,7 +218,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook): return False raise - def list(self, bucket, versions=None, maxResults=None, prefix=None): + def list(self, bucket, versions=None, maxResults=None, prefix=None, delimiter=None): """ List all objects from the bucket with the give string prefix in name @@ -231,6 +230,8 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook): :type maxResults: integer :param prefix: prefix string which filters objects whose name begin with this prefix :type prefix: string + :param delimiter: filters objects based on the delimiter (for e.g '.csv') + :type delimiter: string :return: a stream of object names matching the filtering criteria """ service = self.get_conn() @@ -243,16 +244,21 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook): versions=versions, maxResults=maxResults, pageToken=pageToken, - prefix=prefix + prefix=prefix, + delimiter=delimiter ).execute() - if 'items' not in response: - self.log.info("No items found for prefix: %s", prefix) - break - - for item in response['items']: - if item and 'name' in item: - ids.append(item['name']) + if 'prefixes' not in response: + if 'items' not in response: + self.log.info("No items found for prefix: %s", prefix) + break + + for item in response['items']: + if item and 'name' in item: + ids.append(item['name']) + else: + for item in response['prefixes']: + ids.append(item) if 'nextPageToken' not in response: # no further pages of results, so stop the loop http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3e321790/airflow/contrib/operators/gcs_copy_operator.py 
---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/gcs_copy_operator.py b/airflow/contrib/operators/gcs_copy_operator.py new file mode 100644 index 0000000..55d98a3 --- /dev/null +++ b/airflow/contrib/operators/gcs_copy_operator.py @@ -0,0 +1,115 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults + + +class GoogleCloudStorageCopyOperator(BaseOperator): + """ + Copies objects (optionally from a directory) filtered by 'delimiter' (file extension for e.g .json) from a bucket + to another bucket in a different directory, if required. + + :param source_bucket: The source Google cloud storage bucket where the object is. + :type source_bucket: string + :param source_object: The source name of the object to copy in the Google cloud + storage bucket. + :type source_object: string + :param source_files_delimiter: The delimiter by which you want to filter the files to copy. + For e.g to copy the CSV files from a directory in GCS you would use source_files_delimiter='.csv'. + :type source_files_delimiter: string + :param destination_bucket: The destination Google cloud storage bucket where the object should be. 
+ :type destination_bucket: string + :param destination_directory: The destination name of the directory in the destination Google cloud + storage bucket. + :type destination_directory: string + :param google_cloud_storage_conn_id: The connection ID to use when + connecting to Google cloud storage. + :type google_cloud_storage_conn_id: string + :param delegate_to: The account to impersonate, if any. + For this to work, the service account making the request must have domain-wide delegation enabled. + :type delegate_to: string + + Example: The following Operator would copy all the CSV files from `sales/sales-2017` folder in `data` bucket to + `sales` folder in `archive` bucket. + + move_file = GoogleCloudStorageCopyOperator( + task_id='move_file', + source_bucket='data', + source_object='sales/sales-2017/', + source_files_delimiter='.csv', + destination_bucket='archive', + destination_directory='sales', + google_cloud_storage_conn_id='airflow-service-account' + ) + """ + template_fields = ('source_bucket', 'source_object', 'source_files_delimiter', + 'destination_bucket', 'destination_directory') + ui_color = '#f0eee4' + + @apply_defaults + def __init__(self, + source_bucket, + source_object, + source_files_delimiter=None, + destination_bucket=None, + destination_directory='', + google_cloud_storage_conn_id='google_cloud_storage_default', + delegate_to=None, + *args, + **kwargs): + super(GoogleCloudStorageCopyOperator, self).__init__(*args, **kwargs) + self.source_bucket = source_bucket + self.source_object = source_object + self.source_files_delimiter = source_files_delimiter + self.files_to_copy = list() + self.destination_bucket = destination_bucket + self.destination_directory = destination_directory + self.google_cloud_storage_conn_id = google_cloud_storage_conn_id + self.delegate_to = delegate_to + + def execute(self, context): + + self.log.info('Executing copy - Source_Bucket: %s, Source_directory: %s, ' + 'Destination_bucket: %s, Destination_directory: %s', 
self.source_bucket, self.source_object, + self.destination_bucket or self.source_bucket, + self.destination_directory or self.source_object) + + hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.google_cloud_storage_conn_id, + delegate_to=self.delegate_to) + + self.log.info('Getting list of the files to copy. Source Bucket: %s; Source Object: %s', + self.source_bucket, self.source_object) + + # Create a list of objects to copy from Source bucket. The function uses prefix keyword to pass the name of + # the object to copy. + self.files_to_copy = hook.list(bucket=self.source_bucket, prefix=self.source_object, + delimiter=self.source_files_delimiter) + + # Log the names of all objects to be copied + self.log.info('Files to copy: %s', self.files_to_copy) + + if self.files_to_copy is not None: + for file_to_copy in self.files_to_copy: + self.log.info('Source_Bucket: %s, Source_Object: %s, ' + 'Destination_bucket: %s, Destination_Directory: %s', + self.source_bucket, file_to_copy, + self.destination_bucket or self.source_bucket, + self.destination_directory + file_to_copy) + hook.copy(self.source_bucket, file_to_copy, + self.destination_bucket, self.destination_directory + file_to_copy) + else: + self.log.info('No Files to copy.')
