Repository: incubator-airflow
Updated Branches:
  refs/heads/master faaf0b8b4 -> 3fe06e9ff


[AIRFLOW-1618] Add feature to create GCS bucket

- Added `create_bucket` method to `gcs_hook` and
created the corresponding operator
`GoogleCloudStorageCreateBucketOperator`
- Added tests
- Added documentation

Closes #3044 from kaxil/AIRFLOW-1618
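
For readers skimming the thread, here is a minimal sketch of how the new
operator is meant to be wired into a DAG, adapted from the operator's
docstring example in the diff below (the DAG id, schedule, and start date
are illustrative placeholders, not part of this commit)::

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.gcs_operator import (
        GoogleCloudStorageCreateBucketOperator
    )

    # Illustrative scaffolding only.
    dag = DAG(dag_id='example_gcs_create_bucket',
              schedule_interval=None,
              start_date=datetime(2018, 2, 1))

    # Bucket names are globally unique across GCS.
    create_bucket = GoogleCloudStorageCreateBucketOperator(
        task_id='CreateNewBucket',
        bucket_name='test-bucket',
        storage_class='MULTI_REGIONAL',
        location='EU',
        labels={'env': 'dev', 'team': 'airflow'},
        google_cloud_storage_conn_id='airflow-service-account',
        dag=dag)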


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3fe06e9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3fe06e9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3fe06e9f

Branch: refs/heads/master
Commit: 3fe06e9fff795a5f33035d32098074e6303907ab
Parents: faaf0b8
Author: Kaxil Naik <kaxiln...@gmail.com>
Authored: Mon Feb 19 15:21:04 2018 +0100
Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Committed: Mon Feb 19 15:21:04 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/hooks/gcs_hook.py            |  85 ++++++++++++++++
 airflow/contrib/operators/gcs_operator.py    | 117 ++++++++++++++++++++++
 docs/code.rst                                |   1 +
 docs/integration.rst                         |   8 ++
 tests/contrib/operators/test_gcs_operator.py |  50 +++++++++
 5 files changed, 261 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3fe06e9f/airflow/contrib/hooks/gcs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py
index 5312daa..8c1e7bb 100644
--- a/airflow/contrib/hooks/gcs_hook.py
+++ b/airflow/contrib/hooks/gcs_hook.py
@@ -354,6 +354,91 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
             if ex.resp['status'] == '404':
                 raise ValueError('Object Not Found')
 
+    def create_bucket(self,
+                      bucket_name,
+                      storage_class='MULTI_REGIONAL',
+                      location='US',
+                      project_id=None,
+                      labels=None
+                      ):
+        """
+        Creates a new bucket. Google Cloud Storage uses a flat namespace, so
+        you can't create a bucket with a name that is already in use.
+
+        .. seealso::
+            For more information, see Bucket Naming Guidelines:
+            https://cloud.google.com/storage/docs/bucketnaming.html#requirements
+
+        :param bucket_name: The name of the bucket.
+        :type bucket_name: string
+        :param storage_class: This defines how objects in the bucket are stored
+            and determines the SLA and the cost of storage. Values include
+
+            - ``MULTI_REGIONAL``
+            - ``REGIONAL``
+            - ``STANDARD``
+            - ``NEARLINE``
+            - ``COLDLINE``.
+            If this value is not specified when the bucket is
+            created, it will default to STANDARD.
+        :type storage_class: string
+        :param location: The location of the bucket.
+            Object data for objects in the bucket resides in physical storage
+            within this region. Defaults to US.
+
+            .. seealso::
+                https://developers.google.com/storage/docs/bucket-locations
+
+        :type location: string
+        :param project_id: The ID of the GCP Project.
+        :type project_id: string
+        :param labels: User-provided labels, in key/value pairs.
+        :type labels: dict
+        :return: If successful, it returns the ``id`` of the bucket.
+        """
+
+        project_id = project_id if project_id is not None else self.project_id
+        storage_classes = [
+            'MULTI_REGIONAL',
+            'REGIONAL',
+            'NEARLINE',
+            'COLDLINE',
+            'STANDARD',  # alias for MULTI_REGIONAL/REGIONAL, based on location
+        ]
+
+        self.log.info('Creating Bucket: %s; Location: %s; Storage Class: %s',
+                      bucket_name, location, storage_class)
+        assert storage_class in storage_classes, \
+            'Invalid value ({}) passed to storage_class. Value should be ' \
+            'one of {}'.format(storage_class, storage_classes)
+
+        service = self.get_conn()
+        bucket_resource = {
+            'name': bucket_name,
+            'location': location,
+            'storageClass': storage_class
+        }
+
+        self.log.info('The Default Project ID is %s', self.project_id)
+
+        if labels is not None:
+            bucket_resource['labels'] = labels
+
+        try:
+            response = service.buckets().insert(
+                project=project_id,
+                body=bucket_resource
+            ).execute()
+
+            self.log.info('Bucket: %s created successfully.', bucket_name)
+
+            return response['id']
+
+        except errors.HttpError as ex:
+            raise AirflowException(
+                'Bucket creation failed. Error was: {}'.format(ex.content)
+            )
+
 
 def _parse_gcs_url(gsurl):
     """

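As a hedged illustration of calling the new hook method directly (the
connection id, project, bucket, and label values below are placeholders;
per the diff above, ``create_bucket`` returns the bucket ``id`` on success
and raises ``AirflowException`` on failure)::

    from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

    hook = GoogleCloudStorageHook(
        google_cloud_storage_conn_id='google_cloud_storage_default')

    # REGIONAL buckets must use a regional location such as europe-west1.
    bucket_id = hook.create_bucket(bucket_name='my-unique-bucket',
                                   storage_class='REGIONAL',
                                   location='europe-west1',
                                   project_id='my-gcp-project',
                                   labels={'env': 'dev'})
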
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3fe06e9f/airflow/contrib/operators/gcs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_operator.py b/airflow/contrib/operators/gcs_operator.py
new file mode 100644
index 0000000..d92d125
--- /dev/null
+++ b/airflow/contrib/operators/gcs_operator.py
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.version import version
+
+
+class GoogleCloudStorageCreateBucketOperator(BaseOperator):
+    """
+    Creates a new bucket. Google Cloud Storage uses a flat namespace,
+    so you can't create a bucket with a name that is already in use.
+
+        .. seealso::
+            For more information, see Bucket Naming Guidelines:
+            https://cloud.google.com/storage/docs/bucketnaming.html#requirements
+
+    :param bucket_name: The name of the bucket.
+    :type bucket_name: string
+    :param storage_class: This defines how objects in the bucket are stored
+            and determines the SLA and the cost of storage. Values include
+
+            - ``MULTI_REGIONAL``
+            - ``REGIONAL``
+            - ``STANDARD``
+            - ``NEARLINE``
+            - ``COLDLINE``.
+            If this value is not specified when the bucket is
+            created, it will default to STANDARD.
+    :type storage_class: string
+    :param location: The location of the bucket.
+        Object data for objects in the bucket resides in physical storage
+        within this region. Defaults to US.
+
+        .. seealso::
+            https://developers.google.com/storage/docs/bucket-locations
+
+    :type location: string
+    :param project_id: The ID of the GCP Project.
+    :type project_id: string
+    :param labels: User-provided labels, in key/value pairs.
+    :type labels: dict
+    :param google_cloud_storage_conn_id: The connection ID to use when
+        connecting to Google Cloud Storage.
+    :type google_cloud_storage_conn_id: string
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must
+        have domain-wide delegation enabled.
+    :type delegate_to: string
+
+    **Example**:
+        The following Operator would create a new bucket ``test-bucket``
+        with ``MULTI_REGIONAL`` storage class in the ``EU`` region::
+
+            CreateBucket = GoogleCloudStorageCreateBucketOperator(
+                task_id='CreateNewBucket',
+                bucket_name='test-bucket',
+                storage_class='MULTI_REGIONAL',
+                location='EU',
+                labels={'env': 'dev', 'team': 'airflow'},
+                google_cloud_storage_conn_id='airflow-service-account'
+            )
+    """
+
+    template_fields = ('bucket_name', 'storage_class',
+                       'location', 'project_id')
+    ui_color = '#f0eee4'
+
+    @apply_defaults
+    def __init__(self,
+                 bucket_name,
+                 storage_class='MULTI_REGIONAL',
+                 location='US',
+                 project_id=None,
+                 labels=None,
+                 google_cloud_storage_conn_id='google_cloud_storage_default',
+                 delegate_to=None,
+                 *args,
+                 **kwargs):
+        super(GoogleCloudStorageCreateBucketOperator, self).__init__(*args, **kwargs)
+        self.bucket_name = bucket_name
+        self.storage_class = storage_class
+        self.location = location
+        self.project_id = project_id
+        self.labels = labels
+
+        self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        if self.labels is not None:
+            self.labels.update(
+                {'airflow-version': 'v' + version.replace('.', '-').replace('+', '-')}
+            )
+
+        hook = GoogleCloudStorageHook(
+            google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
+            delegate_to=self.delegate_to
+        )
+
+        hook.create_bucket(bucket_name=self.bucket_name,
+                           storage_class=self.storage_class,
+                           location=self.location,
+                           project_id=self.project_id,
+                           labels=self.labels)

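One behavioural note worth calling out: when labels are supplied,
``execute()`` stamps an ``airflow-version`` label onto them before
delegating to the hook. A small sketch of that transformation (the version
string shown is illustrative)::

    from airflow.version import version  # e.g. '1.10.0dev0+incubating'

    labels = {'env': 'prod'}
    # Dots and pluses are not valid in GCS label values, so they are
    # replaced with dashes.
    labels.update(
        {'airflow-version': 'v' + version.replace('.', '-').replace('+', '-')}
    )
    # -> {'env': 'prod', 'airflow-version': 'v1-10-0dev0-incubating'}
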
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3fe06e9f/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index 3f3811a..597e35d 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -126,6 +126,7 @@ Community-contributed Operators
 .. autoclass:: airflow.contrib.operators.gcs_copy_operator.GoogleCloudStorageCopyOperator
 .. autoclass:: airflow.contrib.operators.gcs_download_operator.GoogleCloudStorageDownloadOperator
 .. autoclass:: airflow.contrib.operators.gcs_list_operator.GoogleCloudStorageListOperator
+.. autoclass:: airflow.contrib.operators.gcs_operator.GoogleCloudStorageCreateBucketOperator
 .. autoclass:: airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator
 .. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubTopicCreateOperator
 .. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubTopicDeleteOperator

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3fe06e9f/docs/integration.rst
----------------------------------------------------------------------
diff --git a/docs/integration.rst b/docs/integration.rst
index 9755136..92f59b6 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -687,6 +687,7 @@ Storage Operators
 
 - :ref:`FileToGoogleCloudStorageOperator` : Uploads a file to Google Cloud Storage.
 - :ref:`GoogleCloudStorageCopyOperator` : Copies objects (optionally from a directory) filtered by 'delimiter' (a file extension, e.g. .json) from one bucket to another, into a different directory if required.
+- :ref:`GoogleCloudStorageCreateBucketOperator` : Creates a new cloud storage bucket.
 - :ref:`GoogleCloudStorageListOperator` : Lists all objects from the bucket with the given string prefix and delimiter in name.
 - :ref:`GoogleCloudStorageDownloadOperator` : Downloads a file from Google Cloud Storage.
 - :ref:`GoogleCloudStorageToBigQueryOperator` : Loads files from Google Cloud Storage into BigQuery.
@@ -706,6 +707,13 @@ GoogleCloudStorageCopyOperator
 
 .. autoclass:: airflow.contrib.operators.gcs_copy_operator.GoogleCloudStorageCopyOperator
 
+.. _GoogleCloudStorageCreateBucketOperator:
+
+GoogleCloudStorageCreateBucketOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.gcs_operator.GoogleCloudStorageCreateBucketOperator
+
 .. _GoogleCloudStorageDownloadOperator:
 
 GoogleCloudStorageDownloadOperator

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3fe06e9f/tests/contrib/operators/test_gcs_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_gcs_operator.py b/tests/contrib/operators/test_gcs_operator.py
new file mode 100644
index 0000000..fed0ee0
--- /dev/null
+++ b/tests/contrib/operators/test_gcs_operator.py
@@ -0,0 +1,50 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from airflow.contrib.operators.gcs_operator import GoogleCloudStorageCreateBucketOperator
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+TASK_ID = 'test-gcs-operator'
+TEST_BUCKET = 'test-bucket'
+TEST_PROJECT = 'test-project'
+
+
+class GoogleCloudStorageCreateBucketTest(unittest.TestCase):
+
+    @mock.patch('airflow.contrib.operators.gcs_operator.GoogleCloudStorageHook')
+    def test_execute(self, mock_hook):
+        operator = GoogleCloudStorageCreateBucketOperator(
+            task_id=TASK_ID,
+            bucket_name=TEST_BUCKET,
+            storage_class='MULTI_REGIONAL',
+            location='EU',
+            labels={'env': 'prod'},
+            project_id=TEST_PROJECT
+        )
+
+        operator.execute(None)
+        mock_hook.return_value.create_bucket.assert_called_once_with(
+            bucket_name=TEST_BUCKET, storage_class='MULTI_REGIONAL',
+            location='EU', labels={'airflow-version': 'v1-10-0dev0-incubating',
+                                   'env': 'prod'}, project_id=TEST_PROJECT
+        )

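The new test is self-contained (the hook is mocked, so no GCP credentials
are needed) and can be run with the stock unittest runner; a hedged
invocation sketch, assuming the module path from the diff above::

    import unittest

    from tests.contrib.operators.test_gcs_operator import (
        GoogleCloudStorageCreateBucketTest
    )

    suite = unittest.TestLoader().loadTestsFromTestCase(
        GoogleCloudStorageCreateBucketTest)
    unittest.TextTestRunner(verbosity=2).run(suite)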