Repository: incubator-airflow
Updated Branches:
  refs/heads/master a47b2776f -> b220fe60d


[AIRFLOW-2513] Change `bql` to `sql` for BigQuery Hooks & Ops

- Change `bql` to `sql` for BigQuery Hooks &
Operators for consistency

Closes #3454 from kaxil/consistent-bq-lang


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b220fe60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b220fe60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b220fe60

Branch: refs/heads/master
Commit: b220fe60d5302546d85ee15cf6c8eaa5859316d3
Parents: a47b277
Author: Kaxil Naik <[email protected]>
Authored: Mon Jun 4 10:04:03 2018 +0100
Committer: Kaxil Naik <[email protected]>
Committed: Mon Jun 4 10:04:03 2018 +0100

----------------------------------------------------------------------
 UPDATING.md                                     |  4 ++
 airflow/contrib/hooks/bigquery_hook.py          | 39 +++++++++++++++-----
 airflow/contrib/operators/bigquery_operator.py  | 33 ++++++++++++++---
 tests/contrib/hooks/test_bigquery_hook.py       | 39 +++++++++++++++-----
 .../contrib/operators/test_bigquery_operator.py | 24 +++++++++---
 5 files changed, 110 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b220fe60/UPDATING.md
----------------------------------------------------------------------
diff --git a/UPDATING.md b/UPDATING.md
index c9e1395..b32b2ac 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -63,6 +63,10 @@ Dataflow job labeling is now supported in 
Dataflow{Java,Python}Operator with a d
 "airflow-version" label, please upgrade your google-cloud-dataflow or 
apache-beam version
 to 2.2.0 or greater.
 
+### BigQuery Hooks and Operator
+The `bql` parameter passed to `BigQueryOperator` and 
`BigQueryBaseCursor.run_query` has been deprecated and renamed to `sql` for 
consistency. Using `bql` will still work (and raise a `DeprecationWarning`), 
but it is deprecated
+and will be removed entirely in Airflow 2.0
+
 ### Redshift to S3 Operator
 With Airflow 1.9 or lower, Unload operation always included header row. In 
order to include header row,
 we need to turn off parallel unload. It is preferred to perform unload 
operation using all nodes so that it is

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b220fe60/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py 
b/airflow/contrib/hooks/bigquery_hook.py
index f4a0a58..bddc2ef 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -82,7 +82,7 @@ class BigQueryHook(GoogleCloudBaseHook, DbApiHook, 
LoggingMixin):
         """
         raise NotImplementedError()
 
-    def get_pandas_df(self, bql, parameters=None, dialect=None):
+    def get_pandas_df(self, sql, parameters=None, dialect=None):
         """
         Returns a Pandas DataFrame for the results produced by a BigQuery
         query. The DbApiHook method must be overridden because Pandas
@@ -91,8 +91,8 @@ class BigQueryHook(GoogleCloudBaseHook, DbApiHook, 
LoggingMixin):
         https://github.com/pydata/pandas/blob/master/pandas/io/sql.py#L447
         https://github.com/pydata/pandas/issues/6900
 
-        :param bql: The BigQuery SQL to execute.
-        :type bql: string
+        :param sql: The BigQuery SQL to execute.
+        :type sql: string
         :param parameters: The parameters to render the SQL query with (not
             used, leave to override superclass method)
         :type parameters: mapping or iterable
@@ -103,7 +103,7 @@ class BigQueryHook(GoogleCloudBaseHook, DbApiHook, 
LoggingMixin):
         if dialect is None:
             dialect = 'legacy' if self.use_legacy_sql else 'standard'
 
-        return read_gbq(bql,
+        return read_gbq(sql,
                         project_id=self._get_field('project'),
                         dialect=dialect,
                         verbose=False)
@@ -454,7 +454,8 @@ class BigQueryBaseCursor(LoggingMixin):
             )
 
     def run_query(self,
-                  bql,
+                  bql=None,
+                  sql=None,
                   destination_dataset_table=False,
                   write_disposition='WRITE_EMPTY',
                   allow_large_results=False,
@@ -476,8 +477,11 @@ class BigQueryBaseCursor(LoggingMixin):
 
         For more details about these parameters.
 
-        :param bql: The BigQuery SQL to execute.
+        :param bql: (Deprecated. Use `sql` parameter instead) The BigQuery SQL
+            to execute.
         :type bql: string
+        :param sql: The BigQuery SQL to execute.
+        :type sql: string
         :param destination_dataset_table: The dotted <dataset>.<table>
             BigQuery table to save the query results.
         :type destination_dataset_table: string
@@ -526,6 +530,23 @@ class BigQueryBaseCursor(LoggingMixin):
 
         """
 
+        # TODO remove `bql` in Airflow 2.0 - Jira: [AIRFLOW-2513]
+        sql = bql if sql is None else sql
+
+        if bql:
+            import warnings
+            warnings.warn('Deprecated parameter `bql` used in '
+                          '`BigQueryBaseCursor.run_query` '
+                          'Use `sql` parameter instead to pass the sql to be '
+                          'executed. `bql` parameter is deprecated and '
+                          'will be removed in a future version of '
+                          'Airflow.',
+                          category=DeprecationWarning)
+
+        if sql is None:
+            raise TypeError('`BigQueryBaseCursor.run_query` missing 1 required '
+                            'positional argument: `sql`')
+
         # BigQuery also allows you to define how you want a table's schema to 
change
         # as a side effect of a query job
         # for more details:
@@ -545,7 +566,7 @@ class BigQueryBaseCursor(LoggingMixin):
 
         configuration = {
             'query': {
-                'query': bql,
+                'query': sql,
                 'useLegacySql': use_legacy_sql,
                 'maximumBillingTier': maximum_billing_tier,
                 'maximumBytesBilled': maximum_bytes_billed,
@@ -1277,9 +1298,9 @@ class BigQueryCursor(BigQueryBaseCursor):
         :param parameters: Parameters to substitute into the query.
         :type parameters: dict
         """
-        bql = _bind_parameters(operation,
+        sql = _bind_parameters(operation,
                                parameters) if parameters else operation
-        self.job_id = self.run_query(bql)
+        self.job_id = self.run_query(sql)
 
     def executemany(self, operation, seq_of_parameters):
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b220fe60/airflow/contrib/operators/bigquery_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_operator.py 
b/airflow/contrib/operators/bigquery_operator.py
index fb35b03..b36efbd 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -29,10 +29,15 @@ class BigQueryOperator(BaseOperator):
     """
     Executes BigQuery SQL queries in a specific BigQuery database
 
-    :param bql: the sql code to be executed
+    :param bql: (Deprecated. Use `sql` parameter instead) the sql code to be
+        executed (templated)
     :type bql: Can receive a str representing a sql statement,
         a list of str (sql statements), or reference to a template file.
-        Template reference are recognized by str ending in '.sql'. (templated)
+        Template reference are recognized by str ending in '.sql'.
+    :param sql: the sql code to be executed (templated)
+    :type sql: Can receive a str representing a sql statement,
+        a list of str (sql statements), or reference to a template file.
+        Template reference are recognized by str ending in '.sql'.
     :param destination_dataset_table: A dotted
         (<project>.|<project>:)<dataset>.<table> that, if set, will store the 
results
         of the query. (templated)
@@ -87,13 +92,14 @@ class BigQueryOperator(BaseOperator):
     :type time_partitioning: dict
     """
 
-    template_fields = ('bql', 'destination_dataset_table')
+    template_fields = ('bql', 'sql', 'destination_dataset_table')
     template_ext = ('.sql', )
     ui_color = '#e4f0e8'
 
     @apply_defaults
     def __init__(self,
-                 bql,
+                 bql=None,
+                 sql=None,
                  destination_dataset_table=False,
                  write_disposition='WRITE_EMPTY',
                  allow_large_results=False,
@@ -113,6 +119,7 @@ class BigQueryOperator(BaseOperator):
                  **kwargs):
         super(BigQueryOperator, self).__init__(*args, **kwargs)
         self.bql = bql
+        self.sql = sql if sql else bql
         self.destination_dataset_table = destination_dataset_table
         self.write_disposition = write_disposition
         self.create_disposition = create_disposition
@@ -130,9 +137,23 @@ class BigQueryOperator(BaseOperator):
         self.priority = priority
         self.time_partitioning = time_partitioning
 
+        # TODO remove `bql` in Airflow 2.0
+        if self.bql:
+            import warnings
+            warnings.warn('Deprecated parameter `bql` used in Task id: {}. '
+                          'Use `sql` parameter instead to pass the sql to be '
+                          'executed. `bql` parameter is deprecated and '
+                          'will be removed in a future version of '
+                          'Airflow.'.format(self.task_id),
+                          category=DeprecationWarning)
+
+        if self.sql is None:
+            raise TypeError('{} missing 1 required positional '
+                            'argument: `sql`'.format(self.task_id))
+
     def execute(self, context):
         if self.bq_cursor is None:
-            self.log.info('Executing: %s', self.bql)
+            self.log.info('Executing: %s', self.sql)
             hook = BigQueryHook(
                 bigquery_conn_id=self.bigquery_conn_id,
                 use_legacy_sql=self.use_legacy_sql,
@@ -140,7 +161,7 @@ class BigQueryOperator(BaseOperator):
             conn = hook.get_conn()
             self.bq_cursor = conn.cursor()
         self.bq_cursor.run_query(
-            self.bql,
+            self.sql,
             destination_dataset_table=self.destination_dataset_table,
             write_disposition=self.write_disposition,
             allow_large_results=self.allow_large_results,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b220fe60/tests/contrib/hooks/test_bigquery_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_bigquery_hook.py 
b/tests/contrib/hooks/test_bigquery_hook.py
index 8841598..31da503 100644
--- a/tests/contrib/hooks/test_bigquery_hook.py
+++ b/tests/contrib/hooks/test_bigquery_hook.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,6 +19,8 @@
 #
 
 import unittest
+import warnings
+
 import mock
 
 from airflow.contrib.hooks import bigquery_hook as hook
@@ -33,6 +35,7 @@ try:
 except HttpAccessTokenRefreshError:
     bq_available = False
 
+
 class TestBigQueryDataframeResults(unittest.TestCase):
     def setUp(self):
         self.instance = hook.BigQueryHook()
@@ -67,6 +70,7 @@ class TestBigQueryDataframeResults(unittest.TestCase):
         self.assertIn('pandas_gbq.gbq.GenericGBQException: Reason: 
invalidQuery',
                       str(context.exception), "")
 
+
 class TestBigQueryTableSplitter(unittest.TestCase):
     def test_internal_need_default_project(self):
         with self.assertRaises(Exception) as context:
@@ -104,16 +108,14 @@ class TestBigQueryTableSplitter(unittest.TestCase):
         self.assertEqual("dataset", dataset)
         self.assertEqual("table", table)
 
-
     def test_valid_double_column(self):
         project, dataset, table = 
hook._split_tablename('alt1:alt:dataset.table',
-                                  'project')
+                                                        'project')
 
         self.assertEqual('alt1:alt', project)
         self.assertEqual("dataset", dataset)
         self.assertEqual("table", table)
 
-
     def test_invalid_syntax_triple_colon(self):
         with self.assertRaises(Exception) as context:
             hook._split_tablename('alt1:alt2:alt3:dataset.table',
@@ -123,7 +125,6 @@ class TestBigQueryTableSplitter(unittest.TestCase):
                       str(context.exception), "")
         self.assertFalse('Format exception for' in str(context.exception))
 
-
     def test_invalid_syntax_triple_dot(self):
         with self.assertRaises(Exception) as context:
             hook._split_tablename('alt1.alt.dataset.table',
@@ -213,9 +214,29 @@ class TestBigQueryBaseCursor(unittest.TestCase):
                 "test_schema.json",
                 ["test_data.json"],
                 schema_update_options=["THIS IS NOT VALID"]
-                )
+            )
         self.assertIn("THIS IS NOT VALID", str(context.exception))
 
+    @mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')
+    def test_bql_deprecation_warning(self, mock_rwc):
+        with warnings.catch_warnings(record=True) as w:
+            hook.BigQueryBaseCursor("test", "test").run_query(
+                bql='select * from test_table'
+            )
+        self.assertIn(
+            'Deprecated parameter `bql`',
+            w[0].message.args[0])
+
+    def test_nobql_nosql_param_error(self):
+        with self.assertRaises(TypeError) as context:
+            hook.BigQueryBaseCursor("test", "test").run_query(
+                sql=None,
+                bql=None
+            )
+        self.assertIn(
+            'missing 1 required positional',
+            str(context.exception))
+
     def test_invalid_schema_update_and_write_disposition(self):
         with self.assertRaises(Exception) as context:
             hook.BigQueryBaseCursor("test", "test").run_load(
@@ -321,7 +342,7 @@ class TestTimePartitioningInRunJob(unittest.TestCase):
         mocked_rwc.side_effect = run_with_config
 
         bq_hook = hook.BigQueryBaseCursor(mock.Mock(), project_id)
-        bq_hook.run_query(bql='select 1')
+        bq_hook.run_query(sql='select 1')
 
         mocked_rwc.assert_called_once()
 
@@ -344,7 +365,7 @@ class TestTimePartitioningInRunJob(unittest.TestCase):
 
         bq_hook = hook.BigQueryBaseCursor(mock.Mock(), project_id)
         bq_hook.run_query(
-            bql='select 1',
+            sql='select 1',
             destination_dataset_table='my_dataset.my_table',
             time_partitioning={'type': 'DAY', 'field': 'test_field', 
'expirationMs': 1000}
         )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b220fe60/tests/contrib/operators/test_bigquery_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_bigquery_operator.py 
b/tests/contrib/operators/test_bigquery_operator.py
index 65bd5a5..6a51d0c 100644
--- a/tests/contrib/operators/test_bigquery_operator.py
+++ b/tests/contrib/operators/test_bigquery_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,10 +18,12 @@
 # under the License.
 
 import unittest
+import warnings
 
-from airflow.contrib.operators.bigquery_operator import 
BigQueryCreateEmptyTableOperator
-from airflow.contrib.operators.bigquery_operator \
-    import BigQueryCreateExternalTableOperator
+from airflow.contrib.operators.bigquery_operator import \
+    BigQueryCreateExternalTableOperator, \
+    BigQueryOperator, \
+    BigQueryCreateEmptyTableOperator
 
 try:
     from unittest import mock
@@ -40,6 +42,18 @@ TEST_GCS_DATA = ['dir1/*.csv']
 TEST_SOURCE_FORMAT = 'CSV'
 
 
+class BigQueryOperatorTest(unittest.TestCase):
+    def test_bql_deprecation_warning(self):
+        with warnings.catch_warnings(record=True) as w:
+            BigQueryOperator(
+                task_id='test_deprecation_warning_for_bql',
+                bql='select * from test_table'
+            )
+        self.assertIn(
+            'Deprecated parameter `bql`',
+            w[0].message.args[0])
+
+
 class BigQueryCreateEmptyTableOperatorTest(unittest.TestCase):
 
     @mock.patch('airflow.contrib.operators.bigquery_operator.BigQueryHook')

Reply via email to