Repository: incubator-airflow Updated Branches: refs/heads/master fb7b98b63 -> 0e3ed447b
[AIRFLOW-509][AIRFLOW-1] Create operator to delete tables in BigQuery We have a use case to delete BigQuery tables and views. This patch adds a delete operator that allows us to do so. Closes #1798 from illop/BigQueryDeleteOperator Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0e3ed447 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0e3ed447 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0e3ed447 Branch: refs/heads/master Commit: 0e3ed447b5ec854d4af918eeee19b9efee886918 Parents: fb7b98b Author: Ilya Rakoshes <il...@wepay.com> Authored: Thu Sep 15 10:16:51 2016 -0700 Committer: Chris Riccomini <chr...@wepay.com> Committed: Thu Sep 15 10:17:04 2016 -0700 ---------------------------------------------------------------------- airflow/contrib/hooks/bigquery_hook.py | 39 +++++++++++- .../operators/bigquery_table_delete_operator.py | 65 ++++++++++++++++++++ 2 files changed, 103 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0e3ed447/airflow/contrib/hooks/bigquery_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py index fe72a52..833dc8a 100644 --- a/airflow/contrib/hooks/bigquery_hook.py +++ b/airflow/contrib/hooks/bigquery_hook.py @@ -26,7 +26,7 @@ import time from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook from airflow.hooks.dbapi_hook import DbApiHook -from apiclient.discovery import build +from apiclient.discovery import build, HttpError from pandas.io.gbq import GbqConnector, \ _parse_data as gbq_parse_data, \ _check_google_client_version as gbq_check_google_client_version, \ @@ -476,6 +476,43 @@ class BigQueryBaseCursor(object): .execute() ) + def run_table_delete(self, deletion_dataset_table, ignore_if_missing=False): + """ + Delete an existing table from the dataset; + If the table does not exist, return an error unless ignore_if_missing + is set to True. + :param deletion_dataset_table: A dotted + (<project>.|<project>:)<dataset>.<table> that indicates which table + will be deleted. + :type deletion_dataset_table: str + :param ignore_if_missing: if True, then return success even if the + requested table does not exist. + :type ignore_if_missing: boolean + :return: + """ + + assert '.' in deletion_dataset_table, ( + 'Expected deletion_dataset_table in the format of ' + '<dataset>.<table>. Got: {}').format(deletion_dataset_table) + deletion_project, deletion_dataset, deletion_table = \ + _split_tablename(deletion_dataset_table, self.project_id) + + try: + tables_resource = self.service.tables() \ + .delete(projectId=deletion_project, + datasetId=deletion_dataset, + tableId=deletion_table) \ + .execute() + logging.info('Deleted table %s:%s.%s.', + deletion_project, deletion_dataset, deletion_table) + except HttpError: + if not ignore_if_missing: + raise Exception( + 'Table deletion failed. Table does not exist.') + else: + logging.info('Table does not exist. Skipping.') + + def run_table_upsert(self, dataset_id, table_resource, project_id=None): """ creates a new, empty table in the dataset; http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0e3ed447/airflow/contrib/operators/bigquery_table_delete_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/bigquery_table_delete_operator.py b/airflow/contrib/operators/bigquery_table_delete_operator.py new file mode 100644 index 0000000..b879939 --- /dev/null +++ b/airflow/contrib/operators/bigquery_table_delete_operator.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from airflow.contrib.hooks.bigquery_hook import BigQueryHook +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults + + +class BigQueryTableDeleteOperator(BaseOperator): + """ + Deletes BigQuery tables + """ + ui_color = '#ffd1dc' + + @apply_defaults + def __init__(self, + deletion_dataset_table, + bigquery_conn_id='bigquery_default', + delegate_to=None, + ignore_if_missing=False, + *args, + **kwargs): + """ + Create a new BigQueryTableDeleteOperator. + + :param deletion_dataset_table: A dotted + (<project>.|<project>:)<dataset>.<table> that indicates which table + will be deleted. + :type deletion_dataset_table: string + :param bigquery_conn_id: reference to a specific BigQuery hook. + :type bigquery_conn_id: string + :param delegate_to: The account to impersonate, if any. + For this to work, the service account making the request must have domain-wide + delegation enabled. + :type delegate_to: string + :param ignore_if_missing: if True, then return success even if the + requested table does not exist. + :type ignore_if_missing: boolean + """ + super(BigQueryTableDeleteOperator, self).__init__(*args, **kwargs) + self.deletion_dataset_table = deletion_dataset_table + self.bigquery_conn_id = bigquery_conn_id + self.delegate_to = delegate_to + self.ignore_if_missing = ignore_if_missing + + def execute(self, context): + logging.info('Deleting: %s', self.deletion_dataset_table) + hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id, + delegate_to=self.delegate_to) + conn = hook.get_conn() + cursor = conn.cursor() + cursor.run_table_delete(self.deletion_dataset_table, self.ignore_if_missing)