Repository: incubator-airflow Updated Branches: refs/heads/master faaf0b8b4 -> 3fe06e9ff
[AIRFLOW-1618] Add feature to create GCS bucket - Added `create_bucket` method to `gcs_hook` and created corresponding operator `GoogleCloudStorageCreateBucket` - Added tests - Added documentation Closes #3044 from kaxil/AIRFLOW-1618 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3fe06e9f Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3fe06e9f Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3fe06e9f Branch: refs/heads/master Commit: 3fe06e9fff795a5f33035d32098074e6303907ab Parents: faaf0b8 Author: Kaxil Naik <kaxiln...@gmail.com> Authored: Mon Feb 19 15:21:04 2018 +0100 Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com> Committed: Mon Feb 19 15:21:04 2018 +0100 ---------------------------------------------------------------------- airflow/contrib/hooks/gcs_hook.py | 85 ++++++++++++++++ airflow/contrib/operators/gcs_operator.py | 117 ++++++++++++++++++++++ docs/code.rst | 1 + docs/integration.rst | 8 ++ tests/contrib/operators/test_gcs_operator.py | 50 +++++++++ 5 files changed, 261 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3fe06e9f/airflow/contrib/hooks/gcs_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py index 5312daa..8c1e7bb 100644 --- a/airflow/contrib/hooks/gcs_hook.py +++ b/airflow/contrib/hooks/gcs_hook.py @@ -354,6 +354,91 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook): if ex.resp['status'] == '404': raise ValueError('Object Not Found') + def create_bucket(self, + bucket_name, + storage_class='MULTI_REGIONAL', + location='US', + project_id=None, + labels=None + ): + """ + Creates a new bucket. 
Google Cloud Storage uses a flat namespace, so + you can't create a bucket with a name that is already in use. + + .. seealso:: + For more information, see Bucket Naming Guidelines: + https://cloud.google.com/storage/docs/bucketnaming.html#requirements + + :param bucket_name: The name of the bucket. + :type bucket_name: string + :param storage_class: This defines how objects in the bucket are stored + and determines the SLA and the cost of storage. Values include + + - ``MULTI_REGIONAL`` + - ``REGIONAL`` + - ``STANDARD`` + - ``NEARLINE`` + - ``COLDLINE``. + If this value is not specified when the bucket is + created, it will default to STANDARD. + :type storage_class: string + :param location: The location of the bucket. + Object data for objects in the bucket resides in physical storage + within this region. Defaults to US. + + .. seealso:: + https://developers.google.com/storage/docs/bucket-locations + + :type location: string + :param project_id: The ID of the GCP Project. + :type project_id: string + :param labels: User-provided labels, in key/value pairs. + :type labels: dict + :return: If successful, it returns the ``id`` of the bucket. + """ + + project_id = project_id if project_id is not None else self.project_id + storage_classes = [ + 'MULTI_REGIONAL', + 'REGIONAL', + 'NEARLINE', + 'COLDLINE', + 'STANDARD', # alias for MULTI_REGIONAL/REGIONAL, based on location + ] + + self.log.info('Creating Bucket: %s; Location: %s; Storage Class: %s', + bucket_name, location, storage_class) + assert storage_class in storage_classes, \ + 'Invalid value ({}) passed to storage_class. 
Value should be ' \ + 'one of {}'.format(storage_class, storage_classes) + + service = self.get_conn() + bucket_resource = { + 'name': bucket_name, + 'location': location, + 'storageClass': storage_class + } + + self.log.info('The Default Project ID is %s', self.project_id) + + if labels is not None: + bucket_resource['labels'] = labels + + try: + response = service.buckets().insert( + project=project_id, + body=bucket_resource + ).execute() + + self.log.info('Bucket: %s created successfully.', bucket_name) + + return response['id'] + + except errors.HttpError as ex: + raise AirflowException( + 'Bucket creation failed. Error was: {}'.format(ex.content) + ) + def _parse_gcs_url(gsurl): """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3fe06e9f/airflow/contrib/operators/gcs_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/gcs_operator.py b/airflow/contrib/operators/gcs_operator.py new file mode 100644 index 0000000..d92d125 --- /dev/null +++ b/airflow/contrib/operators/gcs_operator.py @@ -0,0 +1,117 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults +from airflow.version import version + + +class GoogleCloudStorageCreateBucketOperator(BaseOperator): + """ + Creates a new bucket. 
class GoogleCloudStorageCreateBucketOperator(BaseOperator):
    """
    Creates a new bucket. Google Cloud Storage uses a flat namespace,
    so you can't create a bucket with a name that is already in use.

    .. seealso::
        For more information, see Bucket Naming Guidelines:
        https://cloud.google.com/storage/docs/bucketnaming.html#requirements

    :param bucket_name: The name of the bucket.
    :type bucket_name: string
    :param storage_class: This defines how objects in the bucket are stored
        and determines the SLA and the cost of storage. Values include

        - ``MULTI_REGIONAL``
        - ``REGIONAL``
        - ``STANDARD``
        - ``NEARLINE``
        - ``COLDLINE``.
        If this value is not specified when the bucket is
        created, it will default to STANDARD.
    :type storage_class: string
    :param location: The location of the bucket.
        Object data for objects in the bucket resides in physical storage
        within this region. Defaults to US.

        .. seealso::
            https://developers.google.com/storage/docs/bucket-locations

    :type location: string
    :param project_id: The ID of the GCP Project.
    :type project_id: string
    :param labels: User-provided labels, in key/value pairs.
    :type labels: dict
    :param google_cloud_storage_conn_id: The connection ID to use when
        connecting to Google cloud storage.
    :type google_cloud_storage_conn_id: string
    :param delegate_to: The account to impersonate, if any.
        For this to work, the service account making the request must
        have domain-wide delegation enabled.
    :type delegate_to: string

    **Example**:
        The following Operator would create a new bucket ``test-bucket``
        with ``MULTI_REGIONAL`` storage class in ``EU`` region ::

            CreateBucket = GoogleCloudStorageCreateBucketOperator(
                task_id='CreateNewBucket',
                bucket_name='test-bucket',
                storage_class='MULTI_REGIONAL',
                location='EU',
                labels={'env': 'dev', 'team': 'airflow'},
                google_cloud_storage_conn_id='airflow-service-account'
            )
    """

    template_fields = ('bucket_name', 'storage_class',
                       'location', 'project_id')
    ui_color = '#f0eee4'

    @apply_defaults
    def __init__(self,
                 bucket_name,
                 storage_class='MULTI_REGIONAL',
                 location='US',
                 project_id=None,
                 labels=None,
                 google_cloud_storage_conn_id='google_cloud_storage_default',
                 delegate_to=None,
                 *args,
                 **kwargs):
        super(GoogleCloudStorageCreateBucketOperator, self).__init__(*args, **kwargs)
        self.bucket_name = bucket_name
        self.storage_class = storage_class
        self.location = location
        self.project_id = project_id
        self.labels = labels

        self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
        self.delegate_to = delegate_to

    def execute(self, context):
        labels = self.labels
        if labels is not None:
            # Work on a copy: mutating self.labels in place would alter the
            # dict the caller passed in (and the operator's own state), which
            # leaks between runs since operator instances are re-used.
            labels = dict(labels)
            # Tag the bucket with the Airflow version that created it;
            # GCS label values may not contain '.' or '+'.
            labels.update(
                {'airflow-version': 'v' + version.replace('.', '-').replace('+', '-')}
            )

        hook = GoogleCloudStorageHook(
            google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
            delegate_to=self.delegate_to
        )

        hook.create_bucket(bucket_name=self.bucket_name,
                           storage_class=self.storage_class,
                           location=self.location,
                           project_id=self.project_id,
                           labels=labels)
autoclass:: airflow.contrib.operators.gcs_copy_operator.GoogleCloudStorageCopyOperator .. autoclass:: airflow.contrib.operators.gcs_download_operator.GoogleCloudStorageDownloadOperator .. autoclass:: airflow.contrib.operators.gcs_list_operator.GoogleCloudStorageListOperator +.. autoclass:: airflow.contrib.operators.gcs_operator.GoogleCloudStorageCreateBucketOperator .. autoclass:: airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator .. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubTopicCreateOperator .. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubTopicDeleteOperator http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3fe06e9f/docs/integration.rst ---------------------------------------------------------------------- diff --git a/docs/integration.rst b/docs/integration.rst index 9755136..92f59b6 100644 --- a/docs/integration.rst +++ b/docs/integration.rst @@ -687,6 +687,7 @@ Storage Operators - :ref:`FileToGoogleCloudStorageOperator` : Uploads a file to Google Cloud Storage. - :ref:`GoogleCloudStorageCopyOperator` : Copies objects (optionally from a directory) filtered by 'delimiter' (file extension for e.g .json) from a bucket to another bucket in a different directory, if required. +- :ref:`GoogleCloudStorageCreateBucketOperator` : Creates a new cloud storage bucket. - :ref:`GoogleCloudStorageListOperator` : List all objects from the bucket with the give string prefix and delimiter in name. - :ref:`GoogleCloudStorageDownloadOperator` : Downloads a file from Google Cloud Storage. - :ref:`GoogleCloudStorageToBigQueryOperator` : Loads files from Google cloud storage into BigQuery. @@ -706,6 +707,13 @@ GoogleCloudStorageCopyOperator .. autoclass:: airflow.contrib.operators.gcs_copy_operator.GoogleCloudStorageCopyOperator +.. _GoogleCloudStorageCreateBucketOperator: + +GoogleCloudStorageCreateBucketOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. 
autoclass:: airflow.contrib.operators.gcs_operator.GoogleCloudStorageCreateBucketOperator + .. _GoogleCloudStorageDownloadOperator: GoogleCloudStorageDownloadOperator http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3fe06e9f/tests/contrib/operators/test_gcs_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_gcs_operator.py b/tests/contrib/operators/test_gcs_operator.py new file mode 100644 index 0000000..fed0ee0 --- /dev/null +++ b/tests/contrib/operators/test_gcs_operator.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import unittest

from airflow.contrib.operators.gcs_operator import GoogleCloudStorageCreateBucketOperator

try:
    from unittest import mock
except ImportError:
    try:
        import mock
    except ImportError:
        mock = None

TASK_ID = 'test-gcs-operator'
TEST_BUCKET = 'test-bucket'
TEST_PROJECT = 'test-project'


class GoogleCloudStorageCreateBucketTest(unittest.TestCase):

    @mock.patch('airflow.contrib.operators.gcs_operator.GoogleCloudStorageHook')
    def test_execute(self, mock_hook):
        """The operator must forward its arguments verbatim to the hook's
        ``create_bucket``, with the airflow-version label merged into the
        user-supplied labels."""
        task = GoogleCloudStorageCreateBucketOperator(
            task_id=TASK_ID,
            bucket_name=TEST_BUCKET,
            storage_class='MULTI_REGIONAL',
            location='EU',
            labels={'env': 'prod'},
            project_id=TEST_PROJECT
        )

        task.execute(None)

        expected_labels = {'airflow-version': 'v1-10-0dev0-incubating',
                           'env': 'prod'}
        mock_hook.return_value.create_bucket.assert_called_once_with(
            bucket_name=TEST_BUCKET,
            storage_class='MULTI_REGIONAL',
            location='EU',
            labels=expected_labels,
            project_id=TEST_PROJECT
        )