Repository: incubator-airflow Updated Branches: refs/heads/master a47b2776f -> b220fe60d
[AIRFLOW-2513] Change `bql` to `sql` for BigQuery Hooks & Ops - Change `bql` to `sql` for BigQuery Hooks & Operators for consistency Closes #3454 from kaxil/consistent-bq-lang Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b220fe60 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b220fe60 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b220fe60 Branch: refs/heads/master Commit: b220fe60d5302546d85ee15cf6c8eaa5859316d3 Parents: a47b277 Author: Kaxil Naik <[email protected]> Authored: Mon Jun 4 10:04:03 2018 +0100 Committer: Kaxil Naik <[email protected]> Committed: Mon Jun 4 10:04:03 2018 +0100 ---------------------------------------------------------------------- UPDATING.md | 4 ++ airflow/contrib/hooks/bigquery_hook.py | 39 +++++++++++++++----- airflow/contrib/operators/bigquery_operator.py | 33 ++++++++++++++--- tests/contrib/hooks/test_bigquery_hook.py | 39 +++++++++++++++----- .../contrib/operators/test_bigquery_operator.py | 24 +++++++++--- 5 files changed, 110 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b220fe60/UPDATING.md ---------------------------------------------------------------------- diff --git a/UPDATING.md b/UPDATING.md index c9e1395..b32b2ac 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -63,6 +63,10 @@ Dataflow job labeling is now supported in Dataflow{Java,Python}Operator with a d "airflow-version" label, please upgrade your google-cloud-dataflow or apache-beam version to 2.2.0 or greater. +### BigQuery Hooks and Operator +The `bql` parameter passed to `BigQueryOperator` and `BigQueryBaseCursor.run_query` has been deprecated and renamed to `sql` for consistency purposes. 
Using `bql` will still work (and raise a `DeprecationWarning`), but is no longer +supported and will be removed entirely in Airflow 2.0 + ### Redshift to S3 Operator With Airflow 1.9 or lower, Unload operation always included header row. In order to include header row, we need to turn off parallel unload. It is preferred to perform unload operation using all nodes so that it is http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b220fe60/airflow/contrib/hooks/bigquery_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py index f4a0a58..bddc2ef 100644 --- a/airflow/contrib/hooks/bigquery_hook.py +++ b/airflow/contrib/hooks/bigquery_hook.py @@ -82,7 +82,7 @@ class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin): """ raise NotImplementedError() - def get_pandas_df(self, bql, parameters=None, dialect=None): + def get_pandas_df(self, sql, parameters=None, dialect=None): """ Returns a Pandas DataFrame for the results produced by a BigQuery query. The DbApiHook method must be overridden because Pandas @@ -91,8 +91,8 @@ class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin): https://github.com/pydata/pandas/blob/master/pandas/io/sql.py#L447 https://github.com/pydata/pandas/issues/6900 - :param bql: The BigQuery SQL to execute. - :type bql: string + :param sql: The BigQuery SQL to execute. 
+ :type sql: string :param parameters: The parameters to render the SQL query with (not used, leave to override superclass method) :type parameters: mapping or iterable @@ -103,7 +103,7 @@ class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin): if dialect is None: dialect = 'legacy' if self.use_legacy_sql else 'standard' - return read_gbq(bql, + return read_gbq(sql, project_id=self._get_field('project'), dialect=dialect, verbose=False) @@ -454,7 +454,8 @@ class BigQueryBaseCursor(LoggingMixin): ) def run_query(self, - bql, + bql=None, + sql=None, destination_dataset_table=False, write_disposition='WRITE_EMPTY', allow_large_results=False, @@ -476,8 +477,11 @@ class BigQueryBaseCursor(LoggingMixin): For more details about these parameters. - :param bql: The BigQuery SQL to execute. + :param bql: (Deprecated. Use `sql` parameter instead) The BigQuery SQL + to execute. :type bql: string + :param sql: The BigQuery SQL to execute. + :type sql: string :param destination_dataset_table: The dotted <dataset>.<table> BigQuery table to save the query results. :type destination_dataset_table: string @@ -526,6 +530,23 @@ class BigQueryBaseCursor(LoggingMixin): """ + # TODO remove `bql` in Airflow 2.0 - Jira: [AIRFLOW-2513] + sql = bql if sql is None else sql + + if bql: + import warnings + warnings.warn('Deprecated parameter `bql` used in ' + '`BigQueryBaseCursor.run_query` ' + 'Use `sql` parameter instead to pass the sql to be ' + 'executed. 
`bql` parameter is deprecated and ' + 'will be removed in a future version of ' + 'Airflow.', + category=DeprecationWarning) + + if sql is None: + raise TypeError('`BigQueryBaseCursor.run_query` missing 1 required ' + 'positional argument: `sql`') + # BigQuery also allows you to define how you want a table's schema to change # as a side effect of a query job # for more details: @@ -545,7 +566,7 @@ class BigQueryBaseCursor(LoggingMixin): configuration = { 'query': { - 'query': bql, + 'query': sql, 'useLegacySql': use_legacy_sql, 'maximumBillingTier': maximum_billing_tier, 'maximumBytesBilled': maximum_bytes_billed, @@ -1277,9 +1298,9 @@ class BigQueryCursor(BigQueryBaseCursor): :param parameters: Parameters to substitute into the query. :type parameters: dict """ - bql = _bind_parameters(operation, + sql = _bind_parameters(operation, parameters) if parameters else operation - self.job_id = self.run_query(bql) + self.job_id = self.run_query(sql) def executemany(self, operation, seq_of_parameters): """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b220fe60/airflow/contrib/operators/bigquery_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py index fb35b03..b36efbd 100644 --- a/airflow/contrib/operators/bigquery_operator.py +++ b/airflow/contrib/operators/bigquery_operator.py @@ -29,10 +29,15 @@ class BigQueryOperator(BaseOperator): """ Executes BigQuery SQL queries in a specific BigQuery database - :param bql: the sql code to be executed + :param bql: (Deprecated. Use `sql` parameter instead) the sql code to be + executed (templated) :type bql: Can receive a str representing a sql statement, a list of str (sql statements), or reference to a template file. - Template reference are recognized by str ending in '.sql'. (templated) + Template reference are recognized by str ending in '.sql'. 
+ :param sql: the sql code to be executed (templated) + :type sql: Can receive a str representing a sql statement, + a list of str (sql statements), or reference to a template file. + Template reference are recognized by str ending in '.sql'. :param destination_dataset_table: A dotted (<project>.|<project>:)<dataset>.<table> that, if set, will store the results of the query. (templated) @@ -87,13 +92,14 @@ class BigQueryOperator(BaseOperator): :type time_partitioning: dict """ - template_fields = ('bql', 'destination_dataset_table') + template_fields = ('bql', 'sql', 'destination_dataset_table') template_ext = ('.sql', ) ui_color = '#e4f0e8' @apply_defaults def __init__(self, - bql, + bql=None, + sql=None, destination_dataset_table=False, write_disposition='WRITE_EMPTY', allow_large_results=False, @@ -113,6 +119,7 @@ class BigQueryOperator(BaseOperator): **kwargs): super(BigQueryOperator, self).__init__(*args, **kwargs) self.bql = bql + self.sql = sql if sql else bql self.destination_dataset_table = destination_dataset_table self.write_disposition = write_disposition self.create_disposition = create_disposition @@ -130,9 +137,23 @@ class BigQueryOperator(BaseOperator): self.priority = priority self.time_partitioning = time_partitioning + # TODO remove `bql` in Airflow 2.0 + if self.bql: + import warnings + warnings.warn('Deprecated parameter `bql` used in Task id: {}. ' + 'Use `sql` parameter instead to pass the sql to be ' + 'executed. 
`bql` parameter is deprecated and ' + 'will be removed in a future version of ' + 'Airflow.'.format(self.task_id), + category=DeprecationWarning) + + if self.sql is None: + raise TypeError('{} missing 1 required positional ' + 'argument: `sql`'.format(self.task_id)) + def execute(self, context): if self.bq_cursor is None: - self.log.info('Executing: %s', self.bql) + self.log.info('Executing: %s', self.sql) hook = BigQueryHook( bigquery_conn_id=self.bigquery_conn_id, use_legacy_sql=self.use_legacy_sql, @@ -140,7 +161,7 @@ class BigQueryOperator(BaseOperator): conn = hook.get_conn() self.bq_cursor = conn.cursor() self.bq_cursor.run_query( - self.bql, + self.sql, destination_dataset_table=self.destination_dataset_table, write_disposition=self.write_disposition, allow_large_results=self.allow_large_results, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b220fe60/tests/contrib/hooks/test_bigquery_hook.py ---------------------------------------------------------------------- diff --git a/tests/contrib/hooks/test_bigquery_hook.py b/tests/contrib/hooks/test_bigquery_hook.py index 8841598..31da503 100644 --- a/tests/contrib/hooks/test_bigquery_hook.py +++ b/tests/contrib/hooks/test_bigquery_hook.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. 
You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,6 +19,8 @@ # import unittest +import warnings + import mock from airflow.contrib.hooks import bigquery_hook as hook @@ -33,6 +35,7 @@ try: except HttpAccessTokenRefreshError: bq_available = False + class TestBigQueryDataframeResults(unittest.TestCase): def setUp(self): self.instance = hook.BigQueryHook() @@ -67,6 +70,7 @@ class TestBigQueryDataframeResults(unittest.TestCase): self.assertIn('pandas_gbq.gbq.GenericGBQException: Reason: invalidQuery', str(context.exception), "") + class TestBigQueryTableSplitter(unittest.TestCase): def test_internal_need_default_project(self): with self.assertRaises(Exception) as context: @@ -104,16 +108,14 @@ class TestBigQueryTableSplitter(unittest.TestCase): self.assertEqual("dataset", dataset) self.assertEqual("table", table) - def test_valid_double_column(self): project, dataset, table = hook._split_tablename('alt1:alt:dataset.table', - 'project') + 'project') self.assertEqual('alt1:alt', project) self.assertEqual("dataset", dataset) self.assertEqual("table", table) - def test_invalid_syntax_triple_colon(self): with self.assertRaises(Exception) as context: hook._split_tablename('alt1:alt2:alt3:dataset.table', @@ -123,7 +125,6 @@ class TestBigQueryTableSplitter(unittest.TestCase): str(context.exception), "") self.assertFalse('Format exception for' in str(context.exception)) - def test_invalid_syntax_triple_dot(self): with self.assertRaises(Exception) as context: hook._split_tablename('alt1.alt.dataset.table', @@ -213,9 +214,29 @@ class TestBigQueryBaseCursor(unittest.TestCase): "test_schema.json", ["test_data.json"], schema_update_options=["THIS IS NOT VALID"] - ) + ) self.assertIn("THIS IS NOT VALID", str(context.exception)) + 
@mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration') + def test_bql_deprecation_warning(self, mock_rwc): + with warnings.catch_warnings(record=True) as w: + hook.BigQueryBaseCursor("test", "test").run_query( + bql='select * from test_table' + ) + self.assertIn( + 'Deprecated parameter `bql`', + w[0].message.args[0]) + + def test_nobql_nosql_param_error(self): + with self.assertRaises(TypeError) as context: + hook.BigQueryBaseCursor("test", "test").run_query( + sql=None, + bql=None + ) + self.assertIn( + 'missing 1 required positional', + str(context.exception)) + def test_invalid_schema_update_and_write_disposition(self): with self.assertRaises(Exception) as context: hook.BigQueryBaseCursor("test", "test").run_load( @@ -321,7 +342,7 @@ class TestTimePartitioningInRunJob(unittest.TestCase): mocked_rwc.side_effect = run_with_config bq_hook = hook.BigQueryBaseCursor(mock.Mock(), project_id) - bq_hook.run_query(bql='select 1') + bq_hook.run_query(sql='select 1') mocked_rwc.assert_called_once() @@ -344,7 +365,7 @@ class TestTimePartitioningInRunJob(unittest.TestCase): bq_hook = hook.BigQueryBaseCursor(mock.Mock(), project_id) bq_hook.run_query( - bql='select 1', + sql='select 1', destination_dataset_table='my_dataset.my_table', time_partitioning={'type': 'DAY', 'field': 'test_field', 'expirationMs': 1000} ) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b220fe60/tests/contrib/operators/test_bigquery_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_bigquery_operator.py b/tests/contrib/operators/test_bigquery_operator.py index 65bd5a5..6a51d0c 100644 --- a/tests/contrib/operators/test_bigquery_operator.py +++ b/tests/contrib/operators/test_bigquery_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. 
You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,10 +18,12 @@ # under the License. import unittest +import warnings -from airflow.contrib.operators.bigquery_operator import BigQueryCreateEmptyTableOperator -from airflow.contrib.operators.bigquery_operator \ - import BigQueryCreateExternalTableOperator +from airflow.contrib.operators.bigquery_operator import \ + BigQueryCreateExternalTableOperator, \ + BigQueryOperator, \ + BigQueryCreateEmptyTableOperator try: from unittest import mock @@ -40,6 +42,18 @@ TEST_GCS_DATA = ['dir1/*.csv'] TEST_SOURCE_FORMAT = 'CSV' +class BigQueryOperatorTest(unittest.TestCase): + def test_bql_deprecation_warning(self): + with warnings.catch_warnings(record=True) as w: + BigQueryOperator( + task_id='test_deprecation_warning_for_bql', + bql='select * from test_table' + ) + self.assertIn( + 'Deprecated parameter `bql`', + w[0].message.args[0]) + + class BigQueryCreateEmptyTableOperatorTest(unittest.TestCase): @mock.patch('airflow.contrib.operators.bigquery_operator.BigQueryHook')