Repository: incubator-airflow
Updated Branches:
  refs/heads/master fb7b98b63 -> 0e3ed447b


[AIRFLOW-509][AIRFLOW-1] Create operator to delete tables in BigQuery

We have a use case to delete BigQuery tables and views. This patch
adds a delete operator that allows us to do so.

Closes #1798 from illop/BigQueryDeleteOperator


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0e3ed447
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0e3ed447
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0e3ed447

Branch: refs/heads/master
Commit: 0e3ed447b5ec854d4af918eeee19b9efee886918
Parents: fb7b98b
Author: Ilya Rakoshes <il...@wepay.com>
Authored: Thu Sep 15 10:16:51 2016 -0700
Committer: Chris Riccomini <chr...@wepay.com>
Committed: Thu Sep 15 10:17:04 2016 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py          | 39 +++++++++++-
 .../operators/bigquery_table_delete_operator.py | 65 ++++++++++++++++++++
 2 files changed, 103 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0e3ed447/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py 
b/airflow/contrib/hooks/bigquery_hook.py
index fe72a52..833dc8a 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -26,7 +26,7 @@ import time
 
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
 from airflow.hooks.dbapi_hook import DbApiHook
-from apiclient.discovery import build
+from apiclient.discovery import build, HttpError
 from pandas.io.gbq import GbqConnector, \
     _parse_data as gbq_parse_data, \
     _check_google_client_version as gbq_check_google_client_version, \
@@ -476,6 +476,43 @@ class BigQueryBaseCursor(object):
             .execute()
         )
 
+    def run_table_delete(self, deletion_dataset_table, 
ignore_if_missing=False):
+        """
+        Delete an existing table from the dataset;
+        If the table does not exist, return an error unless ignore_if_missing
+        is set to True.
+        :param deletion_dataset_table: A dotted
+        (<project>.|<project>:)<dataset>.<table> that indicates which table
+        will be deleted.
+        :type deletion_dataset_table: str
+        :param ignore_if_missing: if True, then return success even if the
+        requested table does not exist.
+        :type ignore_if_missing: boolean
+        :return:
+        """
+
+        assert '.' in deletion_dataset_table, (
+            'Expected deletion_dataset_table in the format of '
+            '<dataset>.<table>. Got: {}').format(deletion_dataset_table)
+        deletion_project, deletion_dataset, deletion_table = \
+            _split_tablename(deletion_dataset_table, self.project_id)
+
+        try:
+            tables_resource = self.service.tables() \
+                .delete(projectId=deletion_project,
+                        datasetId=deletion_dataset,
+                        tableId=deletion_table) \
+                .execute()
+            logging.info('Deleted table %s:%s.%s.',
+                         deletion_project, deletion_dataset, deletion_table)
+        except HttpError:
+            if not ignore_if_missing:
+                raise Exception(
+                    'Table deletion failed. Table does not exist.')
+            else:
+                logging.info('Table does not exist. Skipping.')
+
+
     def run_table_upsert(self, dataset_id, table_resource, project_id=None):
         """
         creates a new, empty table in the dataset;

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0e3ed447/airflow/contrib/operators/bigquery_table_delete_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_table_delete_operator.py 
b/airflow/contrib/operators/bigquery_table_delete_operator.py
new file mode 100644
index 0000000..b879939
--- /dev/null
+++ b/airflow/contrib/operators/bigquery_table_delete_operator.py
@@ -0,0 +1,65 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class BigQueryTableDeleteOperator(BaseOperator):
+    """
+    Deletes BigQuery tables
+    """
+    ui_color = '#ffd1dc'
+
+    @apply_defaults
+    def __init__(self,
+                 deletion_dataset_table,
+                 bigquery_conn_id='bigquery_default',
+                 delegate_to=None,
+                 ignore_if_missing=False,
+                 *args,
+                 **kwargs):
+        """
+        Create a new BigQueryTableDeleteOperator.
+
+        :param deletion_dataset_table: A dotted
+            (<project>.|<project>:)<dataset>.<table> that indicates which table
+            will be deleted.
+        :type deletion_dataset_table: string
+        :param bigquery_conn_id: reference to a specific BigQuery hook.
+        :type bigquery_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have 
domain-wide
+            delegation enabled.
+        :type delegate_to: string
+        :param ignore_if_missing: if True, then return success even if the
+            requested table does not exist.
+        :type ignore_if_missing: boolean
+        """
+        super(BigQueryTableDeleteOperator, self).__init__(*args, **kwargs)
+        self.deletion_dataset_table = deletion_dataset_table
+        self.bigquery_conn_id = bigquery_conn_id
+        self.delegate_to = delegate_to
+        self.ignore_if_missing = ignore_if_missing
+
+    def execute(self, context):
+        logging.info('Deleting: %s', self.deletion_dataset_table)
+        hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
+                            delegate_to=self.delegate_to)
+        conn = hook.get_conn()
+        cursor = conn.cursor()
+        cursor.run_table_delete(self.deletion_dataset_table, 
self.ignore_if_missing)

Reply via email to