Repository: incubator-airflow Updated Branches: refs/heads/master f9ddb36df -> e1bf38942
[AIRFLOW-1943] Add External BigQuery Table feature Add ability to create a BigQuery External Table. - Add new method create_external_table() in BigQueryHook() - Add parameters to existing GoogleCloudStorageToBigQueryOperator() Closes #2948 from kaxil/external_table Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e1bf3894 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e1bf3894 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e1bf3894 Branch: refs/heads/master Commit: e1bf38942f3fa2fd41f6c4f6824a02b01e92ae38 Parents: f9ddb36 Author: Kaxil Naik <[email protected]> Authored: Fri Jan 26 11:57:12 2018 +0100 Committer: Fokko Driesprong <[email protected]> Committed: Fri Jan 26 11:57:12 2018 +0100 ---------------------------------------------------------------------- airflow/contrib/hooks/bigquery_hook.py | 188 +++++++++++++++++++++++++ airflow/contrib/operators/gcs_to_bq.py | 64 ++++++--- tests/contrib/hooks/test_bigquery_hook.py | 30 +++- 3 files changed, 262 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e1bf3894/airflow/contrib/hooks/bigquery_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py index 4ab4ac0..cd6bf32 100644 --- a/airflow/contrib/hooks/bigquery_hook.py +++ b/airflow/contrib/hooks/bigquery_hook.py @@ -207,6 +207,194 @@ class BigQueryBaseCursor(LoggingMixin): self.use_legacy_sql = use_legacy_sql self.running_job_id = None + def create_external_table(self, + external_project_dataset_table, + schema_fields, + source_uris, + source_format='CSV', + autodetect=False, + compression='NONE', + ignore_unknown_values=False, + max_bad_records=0, + skip_leading_rows=0, + field_delimiter=',', + 
quote_character=None, + allow_quoted_newlines=False, + allow_jagged_rows=False, + src_fmt_configs={} + ): + """ + Creates a new external table in the dataset with the data in Google + Cloud Storage. See here: + + https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource + + for more details about these parameters. + + :param external_project_dataset_table: + The dotted (<project>.|<project>:)<dataset>.<table>($<partition>) BigQuery + table name to create external table. + If <project> is not included, project will be the + project defined in the connection json. + :type external_project_dataset_table: string + :param schema_fields: The schema field list as defined here: + https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource + :type schema_fields: list + :param source_uris: The source Google Cloud + Storage URI (e.g. gs://some-bucket/some-file.txt). A single wildcard + per object name can be used. + :type source_uris: list + :param source_format: File format of the source data. + :type source_format: string + :param autodetect: Try to detect schema and format options automatically. + Any option specified explicitly will be honored. + :type autodetect: bool + :param compression: [Optional] The compression type of the data source. + Possible values include GZIP and NONE. + The default value is NONE. + This setting is ignored for Google Cloud Bigtable, + Google Cloud Datastore backups and Avro formats. + :type compression: string + :param ignore_unknown_values: [Optional] Indicates if BigQuery should allow + extra values that are not represented in the table schema. + If true, the extra values are ignored. If false, records with extra columns + are treated as bad records, and if there are too many bad records, an + invalid error is returned in the job result. + :type ignore_unknown_values: bool + :param max_bad_records: The maximum number of bad records that BigQuery can + ignore when running the job. 
+ :type max_bad_records: int + :param skip_leading_rows: Number of rows to skip when loading from a CSV. + :type skip_leading_rows: int + :param field_delimiter: The delimiter to use when loading from a CSV. + :type field_delimiter: string + :param quote_character: The value that is used to quote data sections in a CSV + file. + :type quote_character: string + :param allow_quoted_newlines: Whether to allow quoted newlines (true) or not + (false). + :type allow_quoted_newlines: boolean + :param allow_jagged_rows: Accept rows that are missing trailing optional columns. + The missing values are treated as nulls. If false, records with missing + trailing columns are treated as bad records, and if there are too many bad + records, an invalid error is returned in the job result. Only applicable when + source_format is CSV. + :type allow_jagged_rows: bool + :param src_fmt_configs: configure optional fields specific to the source format + :type src_fmt_configs: dict + """ + + project_id, dataset_id, external_table_id = \ + _split_tablename(table_input=external_project_dataset_table, + default_project_id=self.project_id, + var_name='external_project_dataset_table') + + # bigquery only allows certain source formats + # we check to make sure the passed source format is valid + # if it's not, we raise a ValueError + # Refer to this link for more details: + # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.sourceFormat + + source_format = source_format.upper() + allowed_formats = [ + "CSV", "NEWLINE_DELIMITED_JSON", "AVRO", "GOOGLE_SHEETS", + "DATASTORE_BACKUP" + ] + if source_format not in allowed_formats: + raise ValueError("{0} is not a valid source format. " + "Please use one of the following types: {1}" + .format(source_format, allowed_formats)) + + compression = compression.upper() + allowed_compressions = ['NONE', 'GZIP'] + if compression not in allowed_compressions: + raise ValueError("{0} is not a valid compression format. 
" + "Please use one of the following types: {1}" + .format(compression, allowed_compressions)) + + table_resource = { + 'externalDataConfiguration': { + 'autodetect': autodetect, + 'sourceFormat': source_format, + 'sourceUris': source_uris, + 'compression': compression, + 'ignoreUnknownValues': ignore_unknown_values + }, + 'tableReference': { + 'projectId': project_id, + 'datasetId': dataset_id, + 'tableId': external_table_id, + } + } + + if schema_fields: + table_resource['externalDataConfiguration'].update({ + 'schema': { + 'fields': schema_fields + } + }) + + self.log.info('Creating external table: %s', external_project_dataset_table) + + if max_bad_records: + table_resource['externalDataConfiguration']['maxBadRecords'] = max_bad_records + + # if following fields are not specified in src_fmt_configs, + # honor the top-level params for backward-compatibility + if 'skipLeadingRows' not in src_fmt_configs: + src_fmt_configs['skipLeadingRows'] = skip_leading_rows + if 'fieldDelimiter' not in src_fmt_configs: + src_fmt_configs['fieldDelimiter'] = field_delimiter + if 'quote_character' not in src_fmt_configs: + src_fmt_configs['quote'] = quote_character + if 'allowQuotedNewlines' not in src_fmt_configs: + src_fmt_configs['allowQuotedNewlines'] = allow_quoted_newlines + if 'allowJaggedRows' not in src_fmt_configs: + src_fmt_configs['allowJaggedRows'] = allow_jagged_rows + + src_fmt_to_param_mapping = { + 'CSV': 'csvOptions', + 'GOOGLE_SHEETS': 'googleSheetsOptions' + } + + src_fmt_to_configs_mapping = { + 'csvOptions': [ + 'allowJaggedRows', 'allowQuotedNewlines', + 'fieldDelimiter', 'skipLeadingRows', + 'quote' + ], + 'googleSheetsOptions': ['skipLeadingRows'] + } + + if source_format in src_fmt_to_param_mapping.keys(): + + valid_configs = src_fmt_to_configs_mapping[ + src_fmt_to_param_mapping[source_format] + ] + + src_fmt_configs = { + k: v + for k, v in src_fmt_configs.items() if k in valid_configs + } + + 
table_resource['externalDataConfiguration'][src_fmt_to_param_mapping[ + source_format]] = src_fmt_configs + + try: + self.service.tables().insert( + projectId=project_id, + datasetId=dataset_id, + body=table_resource + ).execute() + + self.log.info('External table created successfully: %s', + external_project_dataset_table) + + except HttpError as err: + raise Exception( + 'BigQuery job failed. Error was: {}'.format(err.content) + ) + def run_query(self, bql, destination_dataset_table=False, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e1bf3894/airflow/contrib/operators/gcs_to_bq.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py index 7625bbe..f68da7f 100644 --- a/airflow/contrib/operators/gcs_to_bq.py +++ b/airflow/contrib/operators/gcs_to_bq.py @@ -47,6 +47,12 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator): :param schema_object: string :param source_format: File format to export. :type source_format: string + :param compression: [Optional] The compression type of the data source. + Possible values include GZIP and NONE. + The default value is NONE. + This setting is ignored for Google Cloud Bigtable, + Google Cloud Datastore backups and Avro formats. + :type compression: string :param create_disposition: The create disposition if the table doesn't exist. :type create_disposition: string :param skip_leading_rows: Number of rows to skip when loading from a CSV. @@ -84,11 +90,14 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator): work, the service account making the request must have domain-wide delegation enabled. :type delegate_to: string - :param schema_update_options: Allows the schema of the desitination + :param schema_update_options: Allows the schema of the destination table to be updated as a side effect of the load job. 
:type schema_update_options: list :param src_fmt_configs: configure optional fields specific to the source format :type src_fmt_configs: dict + :param external_table: Flag to specify if the destination table should be + a BigQuery external table. Default Value is False. + :type external_table: bool :param time_partitioning: configure optional time partitioning fields i.e. partition by field, type and expiration as per API specifications. Note that 'field' is not available in concurrency with @@ -108,6 +117,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator): schema_fields=None, schema_object=None, source_format='CSV', + compression='NONE', create_disposition='CREATE_IF_NEEDED', skip_leading_rows=0, write_disposition='WRITE_EMPTY', @@ -122,6 +132,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator): delegate_to=None, schema_update_options=(), src_fmt_configs={}, + external_table=False, time_partitioning={}, *args, **kwargs): @@ -136,6 +147,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator): self.destination_project_dataset_table = destination_project_dataset_table self.schema_fields = schema_fields self.source_format = source_format + self.compression = compression self.create_disposition = create_disposition self.skip_leading_rows = skip_leading_rows self.write_disposition = write_disposition @@ -144,6 +156,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator): self.quote_character = quote_character self.allow_quoted_newlines = allow_quoted_newlines self.allow_jagged_rows = allow_jagged_rows + self.external_table = external_table self.max_id_key = max_id_key self.bigquery_conn_id = bigquery_conn_id @@ -173,22 +186,39 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator): for source_object in self.source_objects] conn = bq_hook.get_conn() cursor = conn.cursor() - cursor.run_load( - destination_project_dataset_table=self.destination_project_dataset_table, - schema_fields=schema_fields, - source_uris=source_uris, - 
source_format=self.source_format, - create_disposition=self.create_disposition, - skip_leading_rows=self.skip_leading_rows, - write_disposition=self.write_disposition, - field_delimiter=self.field_delimiter, - max_bad_records=self.max_bad_records, - quote_character=self.quote_character, - allow_quoted_newlines=self.allow_quoted_newlines, - allow_jagged_rows=self.allow_jagged_rows, - schema_update_options=self.schema_update_options, - src_fmt_configs=self.src_fmt_configs, - time_partitioning=self.time_partitioning) + + if self.external_table: + cursor.create_external_table( + external_project_dataset_table=self.destination_project_dataset_table, + schema_fields=schema_fields, + source_uris=source_uris, + source_format=self.source_format, + compression=self.compression, + skip_leading_rows=self.skip_leading_rows, + field_delimiter=self.field_delimiter, + max_bad_records=self.max_bad_records, + quote_character=self.quote_character, + allow_quoted_newlines=self.allow_quoted_newlines, + allow_jagged_rows=self.allow_jagged_rows, + src_fmt_configs=self.src_fmt_configs + ) + else: + cursor.run_load( + destination_project_dataset_table=self.destination_project_dataset_table, + schema_fields=schema_fields, + source_uris=source_uris, + source_format=self.source_format, + create_disposition=self.create_disposition, + skip_leading_rows=self.skip_leading_rows, + write_disposition=self.write_disposition, + field_delimiter=self.field_delimiter, + max_bad_records=self.max_bad_records, + quote_character=self.quote_character, + allow_quoted_newlines=self.allow_quoted_newlines, + allow_jagged_rows=self.allow_jagged_rows, + schema_update_options=self.schema_update_options, + src_fmt_configs=self.src_fmt_configs, + time_partitioning=self.time_partitioning) if self.max_id_key: cursor.execute('SELECT MAX({}) FROM {}'.format( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e1bf3894/tests/contrib/hooks/test_bigquery_hook.py 
---------------------------------------------------------------------- diff --git a/tests/contrib/hooks/test_bigquery_hook.py b/tests/contrib/hooks/test_bigquery_hook.py index 4c0eefa..a5dd595 100644 --- a/tests/contrib/hooks/test_bigquery_hook.py +++ b/tests/contrib/hooks/test_bigquery_hook.py @@ -157,19 +157,43 @@ class TestBigQueryTableSplitter(unittest.TestCase): self.assertIn('Format exception for var_x:', str(context.exception), "") + class TestBigQueryHookSourceFormat(unittest.TestCase): def test_invalid_source_format(self): with self.assertRaises(Exception) as context: - hook.BigQueryBaseCursor("test", "test").run_load("test.test", "test_schema.json", ["test_data.json"], source_format="json") + hook.BigQueryBaseCursor("test", "test").run_load( + "test.test", "test_schema.json", ["test_data.json"], source_format="json" + ) + + # since we passed 'json' in, and it's not valid, make sure it's present in the + # error string. + self.assertIn("JSON", str(context.exception)) + + +class TestBigQueryExternalTableSourceFormat(unittest.TestCase): + def test_invalid_source_format(self): + with self.assertRaises(Exception) as context: + hook.BigQueryBaseCursor("test", "test").create_external_table( + external_project_dataset_table='test.test', + schema_fields='test_schema.json', + source_uris=['test_data.json'], + source_format='json' + ) - # since we passed 'json' in, and it's not valid, make sure it's present in the error string. + # since we passed 'json' in, and it's not valid, make sure it's present in the + # error string. 
self.assertIn("JSON", str(context.exception)) -# Helpers to test_cancel_queries that have mock_poll_job_complete returning false, unless mock_job_cancel was called with the same job_id + +# Helpers to test_cancel_queries that have mock_poll_job_complete returning false, +# unless mock_job_cancel was called with the same job_id mock_canceled_jobs = [] + + def mock_poll_job_complete(job_id): return job_id in mock_canceled_jobs + def mock_job_cancel(projectId, jobId): mock_canceled_jobs.append(jobId) return mock.Mock()
