Repository: incubator-airflow
Updated Branches:
  refs/heads/master 7220e72e3 -> 410736dbc
[AIRFLOW-716] Allow AVRO BigQuery load-job without schema

Now allow a load job without specifying the schema fields or object.
This allows loading files with an embedded schema, such as AVRO files.
Also made optional values None instead of False to make them a bit more
Pythonic without breaking compatibility.

Closes #1958 from alexvanboxel/feature/bq_load_avro

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/410736db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/410736db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/410736db

Branch: refs/heads/master
Commit: 410736dbc440d643fb3ea5f2094b64b8d4e3ba3b
Parents: 7220e72
Author: Alex Van Boxel <[email protected]>
Authored: Thu Dec 29 22:57:11 2016 +0100
Committer: Bolke de Bruin <[email protected]>
Committed: Thu Dec 29 22:57:11 2016 +0100

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py | 17 ++++++------
 airflow/contrib/operators/gcs_to_bq.py | 41 ++++++++++++++++++++---------
 2 files changed, 37 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/410736db/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 450ee7a..d796565 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -18,20 +18,20 @@
 This module contains a BigQuery Hook, as well as a very basic PEP 249
 implementation for BigQuery.
 """
-from builtins import range
-from past.builtins import basestring
-
 import logging
 import time
 
-from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
-from airflow.hooks.dbapi_hook import DbApiHook
 from apiclient.discovery import build, HttpError
+from builtins import range
 from pandas.io.gbq import GbqConnector, \
     _parse_data as gbq_parse_data, \
     _check_google_client_version as gbq_check_google_client_version, \
     _test_google_api_imports as gbq_test_google_api_imports
 from pandas.tools.merge import concat
+from past.builtins import basestring
+
+from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+from airflow.hooks.dbapi_hook import DbApiHook
 
 logging.getLogger("bigquery").setLevel(logging.INFO)
 
@@ -418,14 +418,15 @@ class BigQueryBaseCursor(object):
                     'datasetId': destination_dataset,
                     'tableId': destination_table,
                 },
-                'schema': {
-                    'fields': schema_fields
-                },
                 'sourceFormat': source_format,
                 'sourceUris': source_uris,
                 'writeDisposition': write_disposition,
             }
         }
+        if schema_fields:
+            configuration['load']['schema'] = {
+                'fields': schema_fields
+            }
 
         if schema_update_options:
             if write_disposition not in ["WRITE_APPEND", "WRITE_TRUNCATE"]:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/410736db/airflow/contrib/operators/gcs_to_bq.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py
index c2f0c79..aff539a 100644
--- a/airflow/contrib/operators/gcs_to_bq.py
+++ b/airflow/contrib/operators/gcs_to_bq.py
@@ -25,7 +25,8 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
     """
     Loads files from Google cloud storage into BigQuery.
     """
-    template_fields = ('bucket','source_objects','schema_object','destination_project_dataset_table')
+    template_fields = ('bucket', 'source_objects',
+                       'schema_object', 'destination_project_dataset_table')
     template_ext = ('.sql',)
     ui_color = '#f0eee4'
 
@@ -35,14 +36,14 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
         bucket,
         source_objects,
         destination_project_dataset_table,
-        schema_fields=False,
-        schema_object=False,
+        schema_fields=None,
+        schema_object=None,
         source_format='CSV',
         create_disposition='CREATE_IF_NEEDED',
         skip_leading_rows=0,
         write_disposition='WRITE_EMPTY',
         field_delimiter=',',
-        max_id_key=False,
+        max_id_key=None,
         bigquery_conn_id='bigquery_default',
         google_cloud_storage_conn_id='google_cloud_storage_default',
         delegate_to=None,
@@ -59,13 +60,15 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
         :type bucket: string
         :param source_objects: List of Google cloud storage URIs to load from.
         :type object: list
-        :param destination_project_dataset_table: The dotted (<project>.)<dataset>.<table> BigQuery table to load data
-            into. If <project> is not included, project will be the project defined in the connection json.
+        :param destination_project_dataset_table: The dotted (<project>.)<dataset>.<table>
+            BigQuery table to load data into. If <project> is not included, project will
+            be the project defined in the connection json.
         :type destination_project_dataset_table: string
         :param schema_fields: If set, the schema field list as defined here:
             https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load
         :type schema_fields: list
-        :param schema_object: If set, a GCS object path pointing to a .json file that contains the schema for the table.
+        :param schema_object: If set, a GCS object path pointing to a .json file that
+            contains the schema for the table.
         :param schema_object: string
         :param source_format: File format to export.
         :type source_format: string
@@ -121,13 +124,21 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
         self.schema_update_options = schema_update_options
 
     def execute(self, context):
-        gcs_hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
-                                          delegate_to=self.delegate_to)
         bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                                delegate_to=self.delegate_to)
 
-        schema_fields = self.schema_fields if self.schema_fields else json.loads(gcs_hook.download(self.bucket, self.schema_object).decode("utf-8"))
-        source_uris = ['gs://{}/{}'.format(self.bucket, schema_object) for schema_object in self.source_objects]
+        if not self.schema_fields and self.schema_object:
+            gcs_hook = GoogleCloudStorageHook(
+                google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
+                delegate_to=self.delegate_to)
+            schema_fields = json.loads(gcs_hook.download(
+                self.bucket,
+                self.schema_object).decode("utf-8"))
+        else:
+            schema_fields = self.schema_fields
+
+        source_uris = ['gs://{}/{}'.format(self.bucket, source_object)
+                       for source_object in self.source_objects]
         conn = bq_hook.get_conn()
         cursor = conn.cursor()
         cursor.run_load(
@@ -142,8 +153,12 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
             schema_update_options=self.schema_update_options)
 
         if self.max_id_key:
-            cursor.execute('SELECT MAX({}) FROM {}'.format(self.max_id_key, self.destination_project_dataset_table))
+            cursor.execute('SELECT MAX({}) FROM {}'.format(
+                self.max_id_key,
+                self.destination_project_dataset_table))
             row = cursor.fetchone()
             max_id = row[0] if row[0] else 0
-            logging.info('Loaded BQ data with max {}.{}={}'.format(self.destination_project_dataset_table, self.max_id_key, max_id))
+            logging.info('Loaded BQ data with max {}.{}={}'.format(
+                self.destination_project_dataset_table,
+                self.max_id_key, max_id))
             return max_id
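
A minimal usage sketch of the operator after this change; the DAG id, bucket,
object path and destination table below are hypothetical, and only
source_format='AVRO' with schema_fields/schema_object omitted reflects the new
behaviour (BigQuery reads the schema embedded in the AVRO files):

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

    dag = DAG('example_gcs_to_bq_avro',
              start_date=datetime(2016, 12, 1),
              schedule_interval=None)

    # No schema_fields or schema_object: the load job is submitted without a
    # 'schema' block and BigQuery uses the schema embedded in the AVRO files.
    load_avro = GoogleCloudStorageToBigQueryOperator(
        task_id='gcs_to_bq_avro',
        bucket='my-bucket',                     # hypothetical bucket
        source_objects=['data/events-*.avro'],  # hypothetical object path
        destination_project_dataset_table='my_dataset.events',  # hypothetical table
        source_format='AVRO',
        write_disposition='WRITE_TRUNCATE',
        dag=dag)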

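For formats without an embedded schema (e.g. CSV) a schema is still required,
either inline or via a GCS .json object. A sketch of the inline form,
continuing the hypothetical DAG above; the field names are made up and each
entry follows the load-job field format referenced in the docstring
(https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load):

    # Hypothetical schema for a CSV load; name/type/mode per the BigQuery
    # load-job schema field format.
    schema_fields = [
        {'name': 'id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
        {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'created_at', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'},
    ]

    load_csv = GoogleCloudStorageToBigQueryOperator(
        task_id='gcs_to_bq_csv',
        bucket='my-bucket',                 # hypothetical bucket
        source_objects=['data/users.csv'],  # hypothetical object path
        destination_project_dataset_table='my_dataset.users',  # hypothetical table
        schema_fields=schema_fields,
        source_format='CSV',
        skip_leading_rows=1,
        dag=dag)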