kaxil commented on a change in pull request #4353: [AIRFLOW-3480] Add Database 
Deploy/Update/Delete operators
URL: https://github.com/apache/incubator-airflow/pull/4353#discussion_r245112656
 
 

 ##########
 File path: airflow/contrib/hooks/gcp_spanner_hook.py
 ##########
 @@ -168,39 +168,175 @@ def delete_instance(self, project_id, instance_id):
         """
         Deletes an existing Cloud Spanner instance.
 
-        :param project_id: The ID of the project which owns the instances, 
tables and data.
+        :param project_id: The ID of the GCP project that owns the Cloud 
Spanner database.
         :type project_id: str
-        :param instance_id: The ID of the instance.
+        :param instance_id:  The ID of the Cloud Spanner instance.
         :type instance_id: str
         """
-        client = self.get_client(project_id)
-        instance = client.instance(instance_id)
+        instance = self.get_client(project_id).instance(instance_id)
         try:
             instance.delete()
             return True
         except GoogleAPICallError as e:
-            self.log.error('An error occurred: %s. Aborting.', e.message)
+            self.log.error('An error occurred: %s. Exiting.', e.message)
+            raise e
+
+    def get_database(self, project_id, instance_id, database_id):
+        # type: (str, str, str) -> Optional[Database]
+        """
+        Retrieves a database in Cloud Spanner. If the database does not exist
+        in the specified instance, it returns None.
+
+        :param project_id: The ID of the GCP project that owns the Cloud 
Spanner database.
+        :type project_id: str
+        :param instance_id: The ID of the Cloud Spanner instance.
+        :type instance_id: str
+        :param database_id: The ID of the database in Cloud Spanner.
+        :type database_id: str
+        :return: Database object or None if database does not exist
+        :rtype: Union[Database, None]
+        """
+
+        instance = self.get_client(project_id=project_id).instance(
+            instance_id=instance_id)
+        if not instance.exists():
+            raise AirflowException("The instance {} does not exist in project {} !".
+                                   format(instance_id, project_id))
+        database = instance.database(database_id=database_id)
+        if not database.exists():
+            return None
+        else:
+            return database
+
+    def create_database(self, project_id, instance_id, database_id, 
ddl_statements):
+        # type: (str, str, str, [str]) -> bool
+        """
+        Creates a new database in Cloud Spanner.
+
+        :param project_id: The ID of the GCP project that owns the Cloud 
Spanner database.
+        :type project_id: str
+        :param instance_id: The ID of the Cloud Spanner instance.
+        :type instance_id: str
+        :param database_id: The ID of the database to create in Cloud Spanner.
+        :type database_id: str
+        :param ddl_statements: The string list containing DDL for the new 
database.
+        :type ddl_statements: list[str]
+        :return: True if everything succeeded
+        :rtype: bool
+        """
+
+        instance = self.get_client(project_id=project_id).instance(
+            instance_id=instance_id)
+        if not instance.exists():
+            raise AirflowException("The instance {} does not exist in project {} !".
+                                   format(instance_id, project_id))
+        database = instance.database(database_id=database_id,
+                                     ddl_statements=ddl_statements)
+        try:
+            operation = database.create()  # type: Operation
+        except GoogleAPICallError as e:
+            self.log.error('An error occurred: %s. Exiting.', e.message)
             raise e
 
+        if operation:
+            result = operation.result()
+            self.log.info(result)
+        return True
+
+    def update_database(self, project_id, instance_id, database_id, 
ddl_statements,
+                        operation_id=None):
+        # type: (str, str, str, [str], str) -> bool
+        """
+        Updates DDL of a database in Cloud Spanner.
+
+        :param project_id: The ID of the GCP project that owns the Cloud 
Spanner database.
+        :type project_id: str
+        :param instance_id: The ID of the Cloud Spanner instance.
+        :type instance_id: str
+        :param database_id: The ID of the database in Cloud Spanner.
+        :type database_id: str
+        :param ddl_statements: The string list containing DDL for the new 
database.
+        :type ddl_statements: list[str]
+        :param operation_id: (Optional) The unique per database operation ID 
that can be
+            specified to implement idempotency check.
+        :type operation_id: str
+        :return: True if everything succeeded
+        :rtype: bool
+        """
+
+        instance = self.get_client(project_id=project_id).instance(
+            instance_id=instance_id)
+        if not instance.exists():
+            raise AirflowException("The instance {} does not exist in project {} !".
+                                   format(instance_id, project_id))
+        database = instance.database(database_id=database_id)
+        try:
+            operation = database.update_ddl(
+                ddl_statements, operation_id=operation_id)
+            if operation:
+                result = operation.result()
+                self.log.info(result)
+            return True
+        except AlreadyExists as e:
+            if e.code == 409 and operation_id in e.message:
+                self.log.info("Replayed update_ddl message - the operation id {} "
+                              "was already done before.".format(operation_id))
+                return True
+        except GoogleAPICallError as e:
+            self.log.error('An error occurred: {}. Exiting.', e.message)
 
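 Review comment:
   The logger fills in positional arguments with %-style formatting, so the
   `{}` placeholder is never substituted with `e.message`; use `%s`, as the
   other error handlers in this hook do: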
   ```suggestion
               self.log.error('An error occurred: %s. Exiting.', e.message)
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to