Repository: incubator-airflow Updated Branches: refs/heads/master cc9295fe3 -> 07c2a515e
[AIRFLOW-1946][AIRFLOW-1855] Create a BigQuery Get Data Operator Closes #2896 from kaxil/patch-4 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/07c2a515 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/07c2a515 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/07c2a515 Branch: refs/heads/master Commit: 07c2a515efd86169191642df17167dbef90d2d74 Parents: cc9295f Author: Kaxil Naik <[email protected]> Authored: Wed Jan 3 12:48:01 2018 -0800 Committer: Chris Riccomini <[email protected]> Committed: Wed Jan 3 12:48:01 2018 -0800 ---------------------------------------------------------------------- airflow/contrib/operators/bigquery_get_data.py | 112 ++++++++++++++++++++ docs/code.rst | 3 + 2 files changed, 115 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/07c2a515/airflow/contrib/operators/bigquery_get_data.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/bigquery_get_data.py b/airflow/contrib/operators/bigquery_get_data.py new file mode 100644 index 0000000..b3b25f2 --- /dev/null +++ b/airflow/contrib/operators/bigquery_get_data.py @@ -0,0 +1,112 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import logging

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class BigQueryGetDataOperator(BaseOperator):
    """
    Fetches the data from a BigQuery table (optionally only the selected
    columns) and returns it as a python list. The number of elements in the
    returned list equals the number of rows fetched. Each element is itself a
    list holding the column values of that row.

    Example Result: [['Tony', '10'], ['Mike', '20'], ['Steve', '15']]

    If no rows are returned (e.g. the table is empty), the result is an
    empty list.

    Note: If you pass fields to `selected_fields` in a different order than
    the order of the columns in the BQ table, the data will still be returned
    in the order of the BQ table. For example, if the BQ table has 3 columns
    [A,B,C] and you pass 'B,A' in `selected_fields`, the data will still be
    of the form 'A,B'.

    Example:

        get_data = BigQueryGetDataOperator(
            task_id='get_data_from_bq',
            dataset_id='test_dataset',
            table_id='Transaction_partitions',
            max_results='100',
            # selected_fields='DATE',
            bigquery_conn_id='airflow-service-account'
        )

    :param dataset_id: The dataset ID of the requested table. (templated)
    :type dataset_id: string
    :param table_id: The table ID of the requested table. (templated)
    :type table_id: string
    :param max_results: The maximum number of records (rows) to be fetched
        from the table. (templated)
    :type max_results: string
    :param selected_fields: List of fields to return (comma-separated). If
        unspecified, all fields are returned.
    :type selected_fields: string
    :param bigquery_conn_id: reference to a specific BigQuery hook.
    :type bigquery_conn_id: string
    :param delegate_to: The account to impersonate, if any.
        For this to work, the service account making the request must have
        domain-wide delegation enabled.
    :type delegate_to: string
    """
    template_fields = ('dataset_id', 'table_id', 'max_results')
    ui_color = '#e4f0e8'

    @apply_defaults
    def __init__(self,
                 dataset_id,
                 table_id,
                 max_results='100',
                 selected_fields=None,
                 bigquery_conn_id='bigquery_default',
                 delegate_to=None,
                 *args,
                 **kwargs):
        super(BigQueryGetDataOperator, self).__init__(*args, **kwargs)
        self.dataset_id = dataset_id
        self.table_id = table_id
        self.max_results = max_results
        self.selected_fields = selected_fields
        self.bigquery_conn_id = bigquery_conn_id
        self.delegate_to = delegate_to

    def execute(self, context):
        """
        Fetch rows from the configured table and return them as a list of
        lists of column values (an empty list when no rows are returned).
        """
        logging.info('Fetching Data from:')
        logging.info('Dataset: %s ; Table: %s ; Max Results: %s',
                     self.dataset_id, self.table_id, self.max_results)

        hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                            delegate_to=self.delegate_to)

        conn = hook.get_conn()
        cursor = conn.cursor()
        response = cursor.get_tabledata(dataset_id=self.dataset_id,
                                        table_id=self.table_id,
                                        max_results=self.max_results,
                                        selected_fields=self.selected_fields)

        # NOTE(review): per the tabledata.list API, totalRows is presumably
        # the table's total row count rather than the number fetched here.
        logging.info('Total Extracted rows: %s', response['totalRows'])

        # The tabledata.list response can omit the 'rows' key entirely when
        # there is nothing to return (e.g. an empty table); treat that as
        # "no rows" instead of failing with KeyError.
        rows = response.get('rows', [])

        # Each row has the shape {'f': [{'v': value}, ...]}; keep only the
        # column values, in table column order.
        return [[field['v'] for field in row['f']] for row in rows]

# NOTE(review): the lines below are the start of the docs/code.rst hunk from
# the same commit; in the archived patch they shared a physical line with
# this code and are kept here verbatim so that content is not lost.
# http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/07c2a515/docs/code.rst
# ----------------------------------------------------------------------
# diff --git a/docs/code.rst b/docs/code.rst
# index e7b7a51..045e5a4 100644
# --- a/docs/code.rst
# +++ b/docs/code.rst
# @@ -92,12 +92,15 @@ Community-contributed Operators
# Use :code:`from airflow.operators.bash_operator import BashOperator` instead.
# .. autoclass:: airflow.contrib.sensors.aws_redshift_cluster_sensor.AwsRedshiftClusterSensor
# +.. autoclass:: airflow.contrib.operators.bigquery_get_data.BigQueryGetDataOperator
# ..
autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryOperator .. autoclass:: airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator .. autoclass:: airflow.contrib.operators.databricks_operator.DatabricksSubmitRunOperator .. autoclass:: airflow.contrib.operators.ecs_operator.ECSOperator .. autoclass:: airflow.contrib.operators.file_to_wasb.FileToWasbOperator +.. autoclass:: airflow.contrib.operators.gcs_copy_operator.GoogleCloudStorageCopyOperator .. autoclass:: airflow.contrib.operators.gcs_download_operator.GoogleCloudStorageDownloadOperator +.. autoclass:: airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator .. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubTopicCreateOperator .. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubTopicDeleteOperator .. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubSubscriptionCreateOperator
