Repository: incubator-airflow
Updated Branches:
  refs/heads/master e010cb29b -> 617ba7412
[AIRFLOW-728] Add Google BigQuery table sensor

Add a sensor that checks whether a given table is present in Google
BigQuery. The sensor accepts the Google Cloud project id, BigQuery
dataset id and BigQuery table id to check as parameters. Internally,
it uses the BigQuery hook to check for the existence of the table; to
support this, a 'table_exists' method is added to the existing
BigQuery hook.

Closes #1970 from bodschut/feature/bq_sensor

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/617ba741
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/617ba741
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/617ba741

Branch: refs/heads/master
Commit: 617ba741205ddea8461fc287267fc9c371ace2de
Parents: e010cb2
Author: Bob De Schutter <[email protected]>
Authored: Mon Jan 9 21:46:16 2017 +0100
Committer: Alex Van Boxel <[email protected]>
Committed: Mon Jan 9 21:46:16 2017 +0100

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py     | 27 ++++
 airflow/contrib/sensors/bigquery_sensor.py | 69 +++++++++++++++++++++++++
 2 files changed, 96 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/617ba741/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index d796565..53ca123 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -22,6 +22,7 @@ import logging
 import time
 
 from apiclient.discovery import build, HttpError
+from googleapiclient import errors
 from builtins import range
 from pandas.io.gbq import GbqConnector, \
     _parse_data as gbq_parse_data, \
@@ -100,6 +101,32 @@ class BigQueryHook(GoogleCloudBaseHook, DbApiHook):
         else:
             return gbq_parse_data(schema, [])
 
+    def table_exists(self, project_id, dataset_id, table_id):
+        """
+        Checks for the existence of a table in Google BigQuery.
+
+        :param project_id: The Google cloud project in which to look for the table. The connection supplied to the hook
+            must provide access to the specified project.
+        :type project_id: string
+        :param dataset_id: The name of the dataset in which to look for the
+            table.
+        :type dataset_id: string
+        :param table_id: The name of the table to check the existence of.
+        :type table_id: string
+        """
+        service = self.get_service()
+        try:
+            service.tables().get(
+                projectId=project_id,
+                datasetId=dataset_id,
+                tableId=table_id
+            ).execute()
+            return True
+        except errors.HttpError as e:
+            if e.resp['status'] == '404':
+                return False
+            raise
+
 
 class BigQueryPandasConnector(GbqConnector):
     """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/617ba741/airflow/contrib/sensors/bigquery_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/bigquery_sensor.py b/airflow/contrib/sensors/bigquery_sensor.py
new file mode 100644
index 0000000..8a8ca62
--- /dev/null
+++ b/airflow/contrib/sensors/bigquery_sensor.py
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from airflow.operators.sensors import BaseSensorOperator
+from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.utils.decorators import apply_defaults
+
+
+class BigQueryTableSensor(BaseSensorOperator):
+    """
+    Checks for the existence of a table in Google BigQuery.
+    """
+    template_fields = ('project_id', 'dataset_id', 'table_id',)
+    ui_color = '#f0eee4'
+
+    @apply_defaults
+    def __init__(
+            self,
+            project_id,
+            dataset_id,
+            table_id,
+            bigquery_conn_id='bigquery_default_conn',
+            delegate_to=None,
+            *args,
+            **kwargs):
+        """
+        Create a new BigQueryTableSensor.
+
+        :param project_id: The Google cloud project in which to look for the table. The connection supplied to the hook
+            must provide access to the specified project.
+        :type project_id: string
+        :param dataset_id: The name of the dataset in which to look for the
+            table.
+        :type dataset_id: string
+        :param table_id: The name of the table to check the existence of.
+        :type table_id: string
+        :param bigquery_conn_id: The connection ID to use when connecting to Google BigQuery.
+        :type bigquery_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have domain-wide delegation enabled.
+        :type delegate_to: string
+        """
+        super(BigQueryTableSensor, self).__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.dataset_id = dataset_id
+        self.table_id = table_id
+        self.bigquery_conn_id = bigquery_conn_id
+        self.delegate_to = delegate_to
+
+    def poke(self, context):
+        table_uri = '{0}:{1}.{2}'.format(self.project_id, self.dataset_id, self.table_id)
+        logging.info('Sensor checks existence of table: %s', table_uri)
+        hook = BigQueryHook(
+            bigquery_conn_id=self.bigquery_conn_id,
+            delegate_to=self.delegate_to)
+        return hook.table_exists(self.project_id, self.dataset_id, self.table_id)
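
For reference, here is a minimal usage sketch (not part of this commit) showing how the new sensor could be wired into a DAG. The DAG id, project, dataset, table and schedule below are placeholders; bigquery_conn_id uses the sensor's default from the diff above, and poke_interval/timeout are standard BaseSensorOperator parameters.

from datetime import datetime

from airflow import DAG
from airflow.contrib.sensors.bigquery_sensor import BigQueryTableSensor

# Hypothetical DAG that waits for a BigQuery table to appear before
# any downstream task runs.
dag = DAG(
    dag_id='example_bigquery_table_sensor',    # placeholder DAG id
    start_date=datetime(2017, 1, 1),
    schedule_interval='@daily')

wait_for_table = BigQueryTableSensor(
    task_id='wait_for_table',
    project_id='my-gcp-project',               # placeholder project
    dataset_id='my_dataset',                   # placeholder dataset
    table_id='my_table',                       # placeholder table
    bigquery_conn_id='bigquery_default_conn',  # default from the sensor above
    poke_interval=60,                          # seconds between pokes (BaseSensorOperator)
    timeout=60 * 60,                           # give up after one hour (BaseSensorOperator)
    dag=dag)

Any task set downstream of wait_for_table only starts once BigQueryHook.table_exists() returns True for my-gcp-project:my_dataset.my_table.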
