kaxil closed pull request #4286: [AIRFLOW-3310] Google Cloud Spanner deploy / delete operators
URL: https://github.com/apache/incubator-airflow/pull/4286
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of a fork's pull request once it
is merged, the diff is reproduced below for the sake of provenance:

diff --git a/airflow/contrib/example_dags/example_gcp_spanner.py b/airflow/contrib/example_dags/example_gcp_spanner.py
new file mode 100644
index 0000000000..dd8b8c52b9
--- /dev/null
+++ b/airflow/contrib/example_dags/example_gcp_spanner.py
@@ -0,0 +1,92 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that creates, updates and deletes a Cloud Spanner instance.
+
+This DAG relies on the following environment variables
+* PROJECT_ID - Google Cloud Platform project for the Cloud Spanner instance.
+* INSTANCE_ID - Cloud Spanner instance ID.
+* CONFIG_NAME - The name of the instance's configuration. Values are of the form
+    projects/<project>/instanceConfigs/<configuration>.
+    See also:
+        https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instanceConfigs#InstanceConfig
+        https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instanceConfigs/list#google.spanner.admin.instance.v1.InstanceAdmin.ListInstanceConfigs
+* NODE_COUNT - Number of nodes allocated to the instance.
+* DISPLAY_NAME - The descriptive name for this instance as it appears in UIs.
+    Must be unique per project and between 4 and 30 characters in length.
+"""
+
+import os
+
+import airflow
+from airflow import models
+from airflow.contrib.operators.gcp_spanner_operator import \
+    CloudSpannerInstanceDeployOperator, CloudSpannerInstanceDeleteOperator
+
+# [START howto_operator_spanner_arguments]
+PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
+INSTANCE_ID = os.environ.get('INSTANCE_ID', 'testinstance')
+CONFIG_NAME = os.environ.get('CONFIG_NAME',
+                             'projects/example-project/instanceConfigs/eur3')
+NODE_COUNT = os.environ.get('NODE_COUNT', '1')
+DISPLAY_NAME = os.environ.get('DISPLAY_NAME', 'Test Instance')
+# [END howto_operator_spanner_arguments]
+
+default_args = {
+    'start_date': airflow.utils.dates.days_ago(1)
+}
+
+with models.DAG(
+    'example_gcp_spanner',
+    default_args=default_args,
+    schedule_interval=None  # Override to match your needs
+) as dag:
+    # Create
+    # [START howto_operator_spanner_deploy]
+    spanner_instance_create_task = CloudSpannerInstanceDeployOperator(
+        project_id=PROJECT_ID,
+        instance_id=INSTANCE_ID,
+        configuration_name=CONFIG_NAME,
+        node_count=int(NODE_COUNT),
+        display_name=DISPLAY_NAME,
+        task_id='spanner_instance_create_task'
+    )
+    # [END howto_operator_spanner_deploy]
+
+    # Update
+    spanner_instance_update_task = CloudSpannerInstanceDeployOperator(
+        project_id=PROJECT_ID,
+        instance_id=INSTANCE_ID,
+        configuration_name=CONFIG_NAME,
+        node_count=int(NODE_COUNT) + 1,
+        display_name=DISPLAY_NAME + '_updated',
+        task_id='spanner_instance_update_task'
+    )
+
+    # [START howto_operator_spanner_delete]
+    spanner_instance_delete_task = CloudSpannerInstanceDeleteOperator(
+        project_id=PROJECT_ID,
+        instance_id=INSTANCE_ID,
+        task_id='spanner_instance_delete_task'
+    )
+    # [END howto_operator_spanner_delete]
+
+    spanner_instance_create_task >> spanner_instance_update_task \
+        >> spanner_instance_delete_task
diff --git a/airflow/contrib/hooks/gcp_spanner_hook.py b/airflow/contrib/hooks/gcp_spanner_hook.py
new file mode 100644
index 0000000000..fc73562e8b
--- /dev/null
+++ b/airflow/contrib/hooks/gcp_spanner_hook.py
@@ -0,0 +1,183 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from google.longrunning.operations_grpc_pb2 import Operation  # noqa: F401
+from typing import Optional, Callable  # noqa: F401
+
+from google.api_core.exceptions import GoogleAPICallError
+from google.cloud.spanner_v1.client import Client
+from google.cloud.spanner_v1.instance import Instance  # noqa: F401
+
+from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+
+
+# noinspection PyAbstractClass
+class CloudSpannerHook(GoogleCloudBaseHook):
+    """
+    Hook for Google Cloud Spanner APIs.
+    """
+    _client = None
+
+    def __init__(self,
+                 gcp_conn_id='google_cloud_default',
+                 delegate_to=None):
+        super(CloudSpannerHook, self).__init__(gcp_conn_id, delegate_to)
+
+    def get_client(self, project_id):
+        # type: (str) -> Client
+        """
+        Provides a client for interacting with Cloud Spanner API.
+
+        :param project_id: The ID of the project which owns the instances, 
tables and data.
+        :type project_id: str
+        :return: Client for interacting with Cloud Spanner API. See:
+            
https://googleapis.github.io/google-cloud-python/latest/spanner/client-api.html#google.cloud.spanner_v1.client.Client
+        :rtype: object
+        """
+        if not self._client:
+            self._client = Client(project=project_id, 
credentials=self._get_credentials())
+        return self._client
+
+    def get_instance(self, project_id, instance_id):
+        # type: (str, str) -> Optional[Instance]
+        """
+        Gets information about a particular instance.
+
+        :param project_id: The ID of the project which owns the instances, 
tables and data.
+        :type project_id: str
+        :param instance_id: The ID of the instance.
+        :type instance_id: str
+        :return: Representation of a Cloud Spanner Instance. See:
+            
https://googleapis.github.io/google-cloud-python/latest/spanner/instance-api.html#google.cloud.spanner_v1.instance.Instance
+        :rtype: object
+        """
+        client = self.get_client(project_id)
+        instance = client.instance(instance_id)
+        if not instance.exists():
+            return None
+        return instance
+
+    def create_instance(self, project_id, instance_id, configuration_name, 
node_count,
+                        display_name):
+        # type: (str, str, str, int, str) -> bool
+        """
+        Creates a new Cloud Spanner instance.
+
+        :param project_id: The ID of the project which owns the instances, 
tables and
+            data.
+        :type project_id: str
+        :param instance_id: The ID of the instance.
+        :type instance_id: str
+        :param configuration_name: Name of the instance configuration defining 
how the
+            instance will be created. Required for instances which do not yet 
exist.
+        :type configuration_name: str
+        :param node_count: (Optional) Number of nodes allocated to the 
instance.
+        :type node_count: int
+        :param display_name: (Optional) The display name for the instance in 
the Cloud
+            Console UI. (Must be between 4 and 30 characters.) If this value 
is not set
+            in the constructor, will fall back to the instance ID.
+        :type display_name: str
+        :return: True if the operation succeeded, raises an exception 
otherwise.
+        :rtype: bool
+        """
+        return self._apply_to_instance(project_id, instance_id, 
configuration_name,
+                                       node_count, display_name, lambda x: 
x.create())
+
+    def update_instance(self, project_id, instance_id, configuration_name, 
node_count,
+                        display_name):
+        # type: (str, str, str, int, str) -> bool
+        """
+        Updates an existing Cloud Spanner instance.
+
+        :param project_id: The ID of the project which owns the instances, 
tables and
+            data.
+        :type project_id: str
+        :param instance_id: The ID of the instance.
+        :type instance_id: str
+        :param configuration_name: Name of the instance configuration defining 
how the
+            instance will be created. Required for instances which do not yet 
exist.
+        :type configuration_name: str
+        :param node_count: (Optional) Number of nodes allocated to the 
instance.
+        :type node_count: int
+        :param display_name: (Optional) The display name for the instance in 
the Cloud
+            Console UI. (Must be between 4 and 30 characters.) If this value 
is not set
+            in the constructor, will fall back to the instance ID.
+        :type display_name: str
+        :return: True if the operation succeeded, raises an exception 
otherwise.
+        :rtype: bool
+        """
+        return self._apply_to_instance(project_id, instance_id, 
configuration_name,
+                                       node_count, display_name, lambda x: 
x.update())
+
+    def _apply_to_instance(self, project_id, instance_id, configuration_name, 
node_count,
+                           display_name, func):
+        # type: (str, str, str, int, str, Callable[[Instance], Operation]) -> 
bool
+        """
+        Invokes a method on a given instance by applying a specified Callable.
+
+        :param project_id: The ID of the project which owns the instances, 
tables and
+            data.
+        :type project_id: str
+        :param instance_id: The ID of the instance.
+        :type instance_id: str
+        :param configuration_name: Name of the instance configuration defining 
how the
+            instance will be created. Required for instances which do not yet 
exist.
+        :type configuration_name: str
+        :param node_count: (Optional) Number of nodes allocated to the 
instance.
+        :type node_count: int
+        :param display_name: (Optional) The display name for the instance in 
the Cloud
+            Console UI. (Must be between 4 and 30 characters.) If this value 
is not set
+            in the constructor, will fall back to the instance ID.
+        :type display_name: str
+        :param func: Method of the instance to be called.
+        :type func: Callable
+        """
+        client = self.get_client(project_id)
+        instance = client.instance(instance_id,
+                                   configuration_name=configuration_name,
+                                   node_count=node_count,
+                                   display_name=display_name)
+        try:
+            operation = func(instance)  # type: Operation
+        except GoogleAPICallError as e:
+            self.log.error('An error occurred: %s. Aborting.', e.message)
+            raise e
+
+        if operation:
+            result = operation.result()
+            self.log.info(result)
+        return True
+
+    def delete_instance(self, project_id, instance_id):
+        # type: (str, str) -> bool
+        """
+        Deletes an existing Cloud Spanner instance.
+
+        :param project_id: The ID of the project which owns the instances, 
tables and data.
+        :type project_id: str
+        :param instance_id: The ID of the instance.
+        :type instance_id: str
+        """
+        client = self.get_client(project_id)
+        instance = client.instance(instance_id)
+        try:
+            instance.delete()
+            return True
+        except GoogleAPICallError as e:
+            self.log.error('An error occurred: %s. Aborting.', e.message)
+            raise e
diff --git a/airflow/contrib/operators/gcp_spanner_operator.py b/airflow/contrib/operators/gcp_spanner_operator.py
new file mode 100644
index 0000000000..7b329a3849
--- /dev/null
+++ b/airflow/contrib/operators/gcp_spanner_operator.py
@@ -0,0 +1,132 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from airflow import AirflowException
+from airflow.contrib.hooks.gcp_spanner_hook import CloudSpannerHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class CloudSpannerInstanceDeployOperator(BaseOperator):
+    """
+    Creates a new Cloud Spanner instance or, if an instance with the same 
instance_id
+    exists in the specified project, updates it.
+
+    :param project_id: The ID of the project which owns the instances, tables 
and data.
+    :type project_id: str
+    :param instance_id: Cloud Spanner instance ID.
+    :type instance_id: str
+    :param configuration_name: Name of the instance configuration defining
+        how the instance will be created. Required for instances which do not 
yet exist.
+    :type configuration_name: str
+    :param node_count: (Optional) Number of nodes allocated to the instance.
+    :type node_count: int
+    :param display_name: (Optional) The display name for the instance in the
+        Cloud Console UI. (Must be between 4 and 30 characters.) If this value 
is not
+        set in the constructor, will fall back to the instance ID.
+    :type display_name: str
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud 
Platform.
+    :type gcp_conn_id: str
+    """
+    # [START gcp_spanner_deploy_template_fields]
+    template_fields = ('project_id', 'instance_id', 'configuration_name', 
'display_name',
+                       'gcp_conn_id')
+    # [END gcp_spanner_deploy_template_fields]
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 instance_id,
+                 configuration_name,
+                 node_count,
+                 display_name,
+                 gcp_conn_id='google_cloud_default',
+                 *args, **kwargs):
+        self.instance_id = instance_id
+        self.project_id = project_id
+        self.configuration_name = configuration_name
+        self.node_count = node_count
+        self.display_name = display_name
+        self.gcp_conn_id = gcp_conn_id
+        self._validate_inputs()
+        self._hook = CloudSpannerHook(gcp_conn_id=gcp_conn_id)
+        super(CloudSpannerInstanceDeployOperator, self).__init__(*args, 
**kwargs)
+
+    def _validate_inputs(self):
+        if not self.project_id:
+            raise AirflowException("The required parameter 'project_id' is 
empty")
+        if not self.instance_id:
+            raise AirflowException("The required parameter 'instance_id' is 
empty")
+
+    def execute(self, context):
+        if not self._hook.get_instance(self.project_id, self.instance_id):
+            self.log.info("Creating Cloud Spanner instance '%s'", 
self.instance_id)
+            func = self._hook.create_instance
+        else:
+            self.log.info("Updating Cloud Spanner instance '%s'", 
self.instance_id)
+            func = self._hook.update_instance
+        return func(self.project_id,
+                    self.instance_id,
+                    self.configuration_name,
+                    self.node_count,
+                    self.display_name)
+
+
+class CloudSpannerInstanceDeleteOperator(BaseOperator):
+    """
+    Deletes a Cloud Spanner instance.
+    If an instance does not exist, no action will be taken and the operator 
will succeed.
+
+    :param project_id: The ID of the project which owns the instances, tables 
and data.
+    :type project_id: str
+    :param instance_id: Cloud Spanner instance ID.
+    :type instance_id: str
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud 
Platform.
+    :type gcp_conn_id: str
+    """
+    # [START gcp_spanner_delete_template_fields]
+    template_fields = ('project_id', 'instance_id', 'gcp_conn_id')
+    # [END gcp_spanner_delete_template_fields]
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 instance_id,
+                 gcp_conn_id='google_cloud_default',
+                 *args, **kwargs):
+        self.instance_id = instance_id
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self._validate_inputs()
+        self._hook = CloudSpannerHook(gcp_conn_id=gcp_conn_id)
+        super(CloudSpannerInstanceDeleteOperator, self).__init__(*args, 
**kwargs)
+
+    def _validate_inputs(self):
+        if not self.project_id:
+            raise AirflowException("The required parameter 'project_id' is 
empty")
+        if not self.instance_id:
+            raise AirflowException("The required parameter 'instance_id' is 
empty")
+
+    def execute(self, context):
+        if self._hook.get_instance(self.project_id, self.instance_id):
+            return self._hook.delete_instance(self.project_id,
+                                              self.instance_id)
+        else:
+            self.log.info("Instance '%s' does not exist in project '%s'. "
+                          "Aborting delete.", self.instance_id, 
self.project_id)
+            return True
diff --git a/docs/howto/operator.rst b/docs/howto/operator.rst
index cd6e73b1f2..095553b3ac 100644
--- a/docs/howto/operator.rst
+++ b/docs/howto/operator.rst
@@ -545,6 +545,94 @@ See `Google Cloud Functions API documentation
 Google Cloud Sql Operators
 --------------------------
 
+CloudSpannerInstanceDeployOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Creates a new Cloud Spanner instance or, if an instance with the same name exists,
+updates it.
+
+For parameter definition take a look at
+:class:`~airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDeployOperator`.
+
+Arguments
+"""""""""
+
+Some arguments in the example DAG are taken from environment variables:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_spanner.py
+    :language: python
+    :start-after: [START howto_operator_spanner_arguments]
+    :end-before: [END howto_operator_spanner_arguments]
+
+Using the operator
+""""""""""""""""""
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_spanner.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_spanner_deploy]
+    :end-before: [END howto_operator_spanner_deploy]
+
+Templating
+""""""""""
+
+.. literalinclude:: ../../airflow/contrib/operators/gcp_spanner_operator.py
+  :language: python
+  :dedent: 4
+  :start-after: [START gcp_spanner_deploy_template_fields]
+  :end-before: [END gcp_spanner_deploy_template_fields]
+
+More information
+""""""""""""""""
+
+See Google Cloud Spanner API documentation for instance `create
+<https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.admin.instance.v1#google.spanner.admin.instance.v1.InstanceAdmin.CreateInstance>`_
+and `update
+<https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.admin.instance.v1#google.spanner.admin.instance.v1.InstanceAdmin.UpdateInstance>`_.
+
+CloudSpannerInstanceDeleteOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Deletes a Cloud Spanner instance.
+If an instance does not exist, no action will be taken and the operator will succeed.
+
+For parameter definition take a look at
+:class:`~airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDeleteOperator`.
+
+Arguments
+"""""""""
+
+Some arguments in the example DAG are taken from environment variables:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_spanner.py
+    :language: python
+    :start-after: [START howto_operator_spanner_arguments]
+    :end-before: [END howto_operator_spanner_arguments]
+
+Using the operator
+""""""""""""""""""
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_spanner.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_spanner_delete]
+    :end-before: [END howto_operator_spanner_delete]
+
+Templating
+""""""""""
+
+.. literalinclude:: ../../airflow/contrib/operators/gcp_spanner_operator.py
+  :language: python
+  :dedent: 4
+  :start-after: [START gcp_spanner_delete_template_fields]
+  :end-before: [END gcp_spanner_delete_template_fields]
+
+More information
+""""""""""""""""
+
+See `Google Cloud Spanner API documentation for instance delete
+<https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instances/delete>`_.
+
 CloudSqlInstanceDatabaseCreateOperator
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
diff --git a/docs/integration.rst b/docs/integration.rst
index 8fbc4be764..d3fa1e9e7b 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -543,6 +543,30 @@ BigQueryHook
 .. autoclass:: airflow.contrib.hooks.bigquery_hook.BigQueryHook
     :members:
 
+Cloud Spanner
+'''''''''''''
+
+Cloud Spanner Operators
+"""""""""""""""""""""""
+
+- :ref:`CloudSpannerInstanceDeployOperator` : creates a new Cloud Spanner instance or,
+  if an instance with the same name exists, updates it.
+- :ref:`CloudSpannerInstanceDeleteOperator` : deletes a Cloud Spanner instance.
+
+.. _CloudSpannerInstanceDeployOperator:
+
+CloudSpannerInstanceDeployOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDeployOperator
+
+.. _CloudSpannerInstanceDeleteOperator:
+
+CloudSpannerInstanceDeleteOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDeleteOperator
+
 Cloud SQL
 '''''''''
 
diff --git a/setup.py b/setup.py
index 9170ce7f6c..be755d5307 100644
--- a/setup.py
+++ b/setup.py
@@ -188,6 +188,8 @@ def write_version(filename=os.path.join(*['airflow',
     'google-auth>=1.0.0, <2.0.0dev',
     'google-auth-httplib2>=0.0.1',
     'google-cloud-container>=0.1.1',
+    'google-cloud-spanner>=1.6.0',
+    'grpcio-gcp>=0.2.2',
     'PyOpenSSL',
     'pandas-gbq'
 ]
diff --git a/tests/contrib/operators/test_gcp_spanner_operator.py b/tests/contrib/operators/test_gcp_spanner_operator.py
new file mode 100644
index 0000000000..ff2b82fd16
--- /dev/null
+++ b/tests/contrib/operators/test_gcp_spanner_operator.py
@@ -0,0 +1,178 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+
+from parameterized import parameterized
+
+from airflow import AirflowException
+from airflow.contrib.operators.gcp_spanner_operator import \
+    CloudSpannerInstanceDeployOperator, CloudSpannerInstanceDeleteOperator
+from tests.contrib.operators.test_gcp_base import BaseGcpIntegrationTestCase, \
+    SKIP_TEST_WARNING, GCP_SPANNER_KEY
+
+try:
+    # noinspection PyProtectedMember
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+PROJECT_ID = 'project-id'
+INSTANCE_ID = 'instance-id'
+DB_NAME = 'db1'
+CONFIG_NAME = 'projects/project-id/instanceConfigs/eur3'
+NODE_COUNT = '1'
+DISPLAY_NAME = 'Test Instance'
+
+
+class CloudSpannerTest(unittest.TestCase):
+    
@mock.patch("airflow.contrib.operators.gcp_spanner_operator.CloudSpannerHook")
+    def test_instance_create(self, mock_hook):
+        mock_hook.return_value.get_instance.return_value = None
+        op = CloudSpannerInstanceDeployOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            configuration_name=CONFIG_NAME,
+            node_count=int(NODE_COUNT),
+            display_name=DISPLAY_NAME,
+            task_id="id"
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(gcp_conn_id="google_cloud_default")
+        mock_hook.return_value.create_instance.assert_called_once_with(
+            PROJECT_ID, INSTANCE_ID, CONFIG_NAME, int(NODE_COUNT), DISPLAY_NAME
+        )
+        mock_hook.return_value.update_instance.assert_not_called()
+        self.assertTrue(result)
+
+    
@mock.patch("airflow.contrib.operators.gcp_spanner_operator.CloudSpannerHook")
+    def test_instance_update(self, mock_hook):
+        mock_hook.return_value.get_instance.return_value = {"name": 
INSTANCE_ID}
+        op = CloudSpannerInstanceDeployOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            configuration_name=CONFIG_NAME,
+            node_count=int(NODE_COUNT),
+            display_name=DISPLAY_NAME,
+            task_id="id"
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(gcp_conn_id="google_cloud_default")
+        mock_hook.return_value.update_instance.assert_called_once_with(
+            PROJECT_ID, INSTANCE_ID, CONFIG_NAME, int(NODE_COUNT), DISPLAY_NAME
+        )
+        mock_hook.return_value.create_instance.assert_not_called()
+        self.assertTrue(result)
+
+    
@mock.patch("airflow.contrib.operators.gcp_spanner_operator.CloudSpannerHook")
+    def test_instance_create_aborts_and_succeeds_if_instance_exists(self, 
mock_hook):
+        mock_hook.return_value.get_instance.return_value = {"name": 
INSTANCE_ID}
+        op = CloudSpannerInstanceDeployOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            configuration_name=CONFIG_NAME,
+            node_count=int(NODE_COUNT),
+            display_name=DISPLAY_NAME,
+            task_id="id"
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(gcp_conn_id="google_cloud_default")
+        mock_hook.return_value.create_instance.assert_not_called()
+        self.assertTrue(result)
+
+    @parameterized.expand([
+        ("", INSTANCE_ID, "project_id"),
+        (PROJECT_ID, "", "instance_id"),
+    ])
+    
@mock.patch("airflow.contrib.operators.gcp_spanner_operator.CloudSpannerHook")
+    def test_instance_create_ex_if_param_missing(self, project_id, instance_id,
+                                                 exp_msg, mock_hook):
+        with self.assertRaises(AirflowException) as cm:
+            CloudSpannerInstanceDeployOperator(
+                project_id=project_id,
+                instance_id=instance_id,
+                configuration_name=CONFIG_NAME,
+                node_count=int(NODE_COUNT),
+                display_name=DISPLAY_NAME,
+                task_id="id"
+            )
+        err = cm.exception
+        self.assertIn("The required parameter '{}' is empty".format(exp_msg), 
str(err))
+        mock_hook.assert_not_called()
+
+    
@mock.patch("airflow.contrib.operators.gcp_spanner_operator.CloudSpannerHook")
+    def test_instance_delete(self, mock_hook):
+        mock_hook.return_value.get_instance.return_value = {"name": 
INSTANCE_ID}
+        op = CloudSpannerInstanceDeleteOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            task_id="id"
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(gcp_conn_id="google_cloud_default")
+        mock_hook.return_value.delete_instance.assert_called_once_with(
+            PROJECT_ID, INSTANCE_ID
+        )
+        self.assertTrue(result)
+
+    
@mock.patch("airflow.contrib.operators.gcp_spanner_operator.CloudSpannerHook")
+    def 
test_instance_delete_aborts_and_succeeds_if_instance_does_not_exist(self,
+                                                                            
mock_hook):
+        mock_hook.return_value.get_instance.return_value = None
+        op = CloudSpannerInstanceDeleteOperator(
+            project_id=PROJECT_ID,
+            instance_id=INSTANCE_ID,
+            task_id="id"
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(gcp_conn_id="google_cloud_default")
+        mock_hook.return_value.delete_instance.assert_not_called()
+        self.assertTrue(result)
+
+    @parameterized.expand([
+        ("", INSTANCE_ID, "project_id"),
+        (PROJECT_ID, "", "instance_id"),
+    ])
+    
@mock.patch("airflow.contrib.operators.gcp_spanner_operator.CloudSpannerHook")
+    def test_instance_delete_ex_if_param_missing(self, project_id, 
instance_id, exp_msg,
+                                                 mock_hook):
+        with self.assertRaises(AirflowException) as cm:
+            CloudSpannerInstanceDeleteOperator(
+                project_id=project_id,
+                instance_id=instance_id,
+                task_id="id"
+            )
+        err = cm.exception
+        self.assertIn("The required parameter '{}' is empty".format(exp_msg), 
str(err))
+        mock_hook.assert_not_called()
+
+
+@unittest.skipIf(
+    BaseGcpIntegrationTestCase.skip_check(GCP_SPANNER_KEY), SKIP_TEST_WARNING)
+class CloudSpannerExampleDagsTest(BaseGcpIntegrationTestCase):
+    def __init__(self, method_name='runTest'):
+        super(CloudSpannerExampleDagsTest, self).__init__(
+            method_name,
+            dag_id='example_gcp_spanner',
+            gcp_key=GCP_SPANNER_KEY)
+
+    def test_run_example_dag_cloudsql_query(self):
+        self._run_dag()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to