Repository: incubator-airflow
Updated Branches:
  refs/heads/master dedc54eea -> beb285205
[AIRFLOW-638] Add schema_update_options to GCP ops

Closes #1891 from Jalepeno112/feature/gcs_to_bq_schemaUpdateOptions

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/beb28520
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/beb28520
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/beb28520

Branch: refs/heads/master
Commit: beb285205780d230f6862fad56e609bb81996c01
Parents: dedc54e
Author: Giovanni Briggs <[email protected]>
Authored: Mon Nov 21 13:31:57 2016 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Mon Nov 21 13:32:08 2016 -0800

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py | 39 ++++++++++++++++++++++++++---
 airflow/contrib/operators/gcs_to_bq.py |  9 ++++++-
 tests/contrib/hooks/bigquery_hook.py   | 25 +++++++++++++++++-
 3 files changed, 68 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/beb28520/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 700a39e..b76126a 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -341,7 +341,8 @@ class BigQueryBaseCursor(object):
                  create_disposition='CREATE_IF_NEEDED',
                  skip_leading_rows=0,
                  write_disposition='WRITE_EMPTY',
-                 field_delimiter=','):
+                 field_delimiter=',',
+                 schema_update_options=[]):
         """
         Executes a BigQuery load command to load data from Google Cloud
         Storage to BigQuery. See here:
@@ -372,20 +373,38 @@ class BigQueryBaseCursor(object):
         :type write_disposition: string
         :param field_delimiter: The delimiter to use when loading from a CSV.
         :type field_delimiter: string
+        :param schema_update_options: Allows the schema of the destination
+            table to be updated as a side effect of the load job.
+        :type schema_update_options: list
         """
 
         # bigquery only allows certain source formats
         # we check to make sure the passed source format is valid
         # if it's not, we raise a ValueError
-        # Refer to this link for more details:
+        # Refer to this link for more details:
         # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).sourceFormat
         source_format = source_format.upper()
         allowed_formats = ["CSV", "NEWLINE_DELIMITED_JSON", "AVRO", "GOOGLE_SHEETS"]
         if source_format not in allowed_formats:
-            raise ValueError("{0} is not a valid source format. "
+            raise ValueError("{0} is not a valid source format. "
                              "Please use one of the following types: {1}"
                              .format(source_format, allowed_formats))
 
+        # bigquery also allows you to define how you want a table's schema to change
+        # as a side effect of a load
+        # for more details:
+        # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schemaUpdateOptions
+        allowed_schema_update_options = [
+            'ALLOW_FIELD_ADDITION',
+            "ALLOW_FIELD_RELAXATION"
+        ]
+        if not set(allowed_schema_update_options).issuperset(set(schema_update_options)):
+            raise ValueError(
+                "{0} contains invalid schema update options. "
" + "Please only use one or more of the following options: {1}" + .format(schema_update_options, allowed_schema_update_options) + ) + destination_project, destination_dataset, destination_table = \ _split_tablename(table_input=destination_project_dataset_table, default_project_id=self.project_id, @@ -408,6 +427,20 @@ class BigQueryBaseCursor(object): } } + if schema_update_options: + if write_disposition not in ["WRITE_APPEND", "WRITE_TRUNCATE"]: + raise ValueError( + "schema_update_options is only " + "allowed if write_disposition is " + "'WRITE_APPEND' or 'WRITE_TRUNCATE'." + ) + else: + logging.info( + "Adding experimental " + "'schemaUpdateOptions': {0}".format(schema_update_options) + ) + configuration['load']['schemaUpdateOptions'] = schema_update_options + if source_format == 'CSV': configuration['load']['skipLeadingRows'] = skip_leading_rows configuration['load']['fieldDelimiter'] = field_delimiter http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/beb28520/airflow/contrib/operators/gcs_to_bq.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py index 25b338d..26d996a 100644 --- a/airflow/contrib/operators/gcs_to_bq.py +++ b/airflow/contrib/operators/gcs_to_bq.py @@ -46,6 +46,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator): bigquery_conn_id='bigquery_default', google_cloud_storage_conn_id='google_cloud_storage_default', delegate_to=None, + schema_update_options=[], *args, **kwargs): """ @@ -92,6 +93,9 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator): work, the service account making the request must have domain-wide delegation enabled. :type delegate_to: string + :param schema_update_options: Allows the schema of the desitination + table to be updated as a side effect of the load job. 
+        :type schema_update_options: list
         """
         super(GoogleCloudStorageToBigQueryOperator, self).__init__(*args, **kwargs)
@@ -114,6 +118,8 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
         self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
         self.delegate_to = delegate_to
 
+        self.schema_update_options = schema_update_options
+
     def execute(self, context):
         gcs_hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
                                           delegate_to=self.delegate_to)
@@ -132,7 +138,8 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
             create_disposition=self.create_disposition,
             skip_leading_rows=self.skip_leading_rows,
             write_disposition=self.write_disposition,
-            field_delimiter=self.field_delimiter)
+            field_delimiter=self.field_delimiter,
+            schema_update_options=self.schema_update_options)
 
         if self.max_id_key:
             cursor.execute('SELECT MAX({}) FROM {}'.format(self.max_id_key, self.destination_project_dataset_table))


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/beb28520/tests/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/bigquery_hook.py b/tests/contrib/hooks/bigquery_hook.py
index 3a58766..68856f8 100644
--- a/tests/contrib/hooks/bigquery_hook.py
+++ b/tests/contrib/hooks/bigquery_hook.py
@@ -108,9 +108,32 @@ class TestBigQueryHookSourceFormat(unittest.TestCase):
     def test_invalid_source_format(self):
         with self.assertRaises(Exception) as context:
             hook.BigQueryBaseCursor("test", "test").run_load("test.test", "test_schema.json", ["test_data.json"], source_format="json")
-
+
         # since we passed 'json' in, and it's not valid, make sure it's present in the error string.
         self.assertIn("json", str(context.exception))
 
+
+class TestBigQueryBaseCursor(unittest.TestCase):
+    def test_invalid_schema_update_options(self):
+        with self.assertRaises(Exception) as context:
+            hook.BigQueryBaseCursor("test", "test").run_load(
+                "test.test",
+                "test_schema.json",
+                ["test_data.json"],
+                schema_update_options=["THIS IS NOT VALID"]
+            )
+        self.assertIn("THIS IS NOT VALID", str(context.exception))
+
+    def test_invalid_schema_update_and_write_disposition(self):
+        with self.assertRaises(Exception) as context:
+            hook.BigQueryBaseCursor("test", "test").run_load(
+                "test.test",
+                "test_schema.json",
+                ["test_data.json"],
+                schema_update_options=['ALLOW_FIELD_ADDITION'],
+                write_disposition='WRITE_EMPTY'
+            )
+        self.assertIn("schema_update_options is only", str(context.exception))
+
 if __name__ == '__main__':
     unittest.main()
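
----------------------------------------------------------------------

For context, a minimal sketch of how the new parameter might be used from a
DAG once this commit lands. The DAG id, bucket, object paths, schema object,
and table name below are placeholders, not part of this commit:

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.gcs_to_bq import \
        GoogleCloudStorageToBigQueryOperator

    dag = DAG('example_gcs_to_bq_schema_update',  # hypothetical DAG id
              start_date=datetime(2016, 11, 21),
              schedule_interval=None)

    load_events = GoogleCloudStorageToBigQueryOperator(
        task_id='gcs_to_bq_with_schema_update',
        bucket='my-bucket',                       # placeholder bucket
        source_objects=['events/part-*.json'],    # placeholder objects
        destination_project_dataset_table='my_project.my_dataset.events',
        schema_object='schemas/events.json',      # placeholder schema file
        source_format='NEWLINE_DELIMITED_JSON',
        # schema_update_options is only allowed with WRITE_APPEND or
        # WRITE_TRUNCATE; run_load raises ValueError otherwise.
        write_disposition='WRITE_APPEND',
        schema_update_options=['ALLOW_FIELD_ADDITION',
                               'ALLOW_FIELD_RELAXATION'],
        dag=dag)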

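As a further illustration of the validation added to run_load, a hedged
sketch mirroring the tests above; the cursor arguments and table names are
placeholders, and no BigQuery call is made because the check raises first:

    from airflow.contrib.hooks import bigquery_hook as hook

    cursor = hook.BigQueryBaseCursor("service", "project-id")
    try:
        cursor.run_load(
            "my_dataset.my_table",      # placeholder destination table
            "schema.json",              # placeholder schema fields
            ["gs://bucket/data.json"],  # placeholder source URI
            schema_update_options=['ALLOW_FIELD_ADDITION'],
            write_disposition='WRITE_EMPTY')
    except ValueError as e:
        # "schema_update_options is only allowed if write_disposition is
        # 'WRITE_APPEND' or 'WRITE_TRUNCATE'."
        print(e)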