Repository: incubator-airflow
Updated Branches:
  refs/heads/master 69334fc44 -> a289497c8


[AIRFLOW-1404] Add 'flatten_results' & 'maximum_bytes_billed' to BQ Operator

- Updated BQ hook `run_query` method to add
'flatten_results' & 'maximum_bytes_billed'
parameters
- Added the same in BQ Operator

Closes #3030 from kaxil/AIRFLOW-1404


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a289497c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a289497c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a289497c

Branch: refs/heads/master
Commit: a289497c807c238deb34bac54bbc4b28cbda9440
Parents: 69334fc
Author: Kaxil Naik <[email protected]>
Authored: Sun Feb 11 21:18:28 2018 +0100
Committer: Fokko Driesprong <[email protected]>
Committed: Sun Feb 11 21:18:28 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py         | 15 +++++++++++++++
 airflow/contrib/operators/bigquery_operator.py | 20 +++++++++++++++++++-
 2 files changed, 34 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a289497c/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 220156b..ce7e2c3 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -465,8 +465,10 @@ class BigQueryBaseCursor(LoggingMixin):
                   destination_dataset_table=False,
                   write_disposition='WRITE_EMPTY',
                   allow_large_results=False,
+                  flatten_results=False,
                   udf_config=False,
                   maximum_billing_tier=None,
+                  maximum_bytes_billed=None,
                   create_disposition='CREATE_IF_NEEDED',
                   query_params=None,
                   schema_update_options=(),
@@ -488,12 +490,22 @@ class BigQueryBaseCursor(LoggingMixin):
         :type write_disposition: string
         :param allow_large_results: Whether to allow large results.
         :type allow_large_results: boolean
+        :param flatten_results: If true and query uses legacy SQL dialect, flattens
+            all nested and repeated fields in the query results. ``allowLargeResults``
+            must be true if this is set to false. For standard SQL queries, this
+            flag is ignored and results are never flattened.
+        :type flatten_results: boolean
         :param udf_config: The User Defined Function configuration for the query.
             See https://cloud.google.com/bigquery/user-defined-functions for details.
         :type udf_config: list
         :param maximum_billing_tier: Positive integer that serves as a
             multiplier of the basic price.
         :type maximum_billing_tier: integer
+        :param maximum_bytes_billed: Limits the bytes billed for this job.
+            Queries that will have bytes billed beyond this limit will fail
+            (without incurring a charge). If unspecified, this will be
+            set to your project default.
+        :type maximum_bytes_billed: float
         :param create_disposition: Specifies whether the job is allowed to
             create new tables.
         :type create_disposition: string
@@ -528,6 +540,7 @@ class BigQueryBaseCursor(LoggingMixin):
                 'query': bql,
                 'useLegacySql': self.use_legacy_sql,
                 'maximumBillingTier': maximum_billing_tier,
+                'maximumBytesBilled': maximum_bytes_billed,
                 'priority': priority
             }
         }
@@ -542,6 +555,8 @@ class BigQueryBaseCursor(LoggingMixin):
             configuration['query'].update({
                 'allowLargeResults':
                 allow_large_results,
+                'flattenResults':
+                flatten_results,
                 'writeDisposition':
                 write_disposition,
                 'createDisposition':

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a289497c/airflow/contrib/operators/bigquery_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index d43a65c..ef1ada0 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -38,6 +38,13 @@ class BigQueryOperator(BaseOperator):
     :param create_disposition: Specifies whether the job is allowed to create new tables.
         (default: 'CREATE_IF_NEEDED')
     :type create_disposition: string
+    :param allow_large_results: Whether to allow large results.
+    :type allow_large_results: boolean
+    :param flatten_results: If true and query uses legacy SQL dialect, flattens
+        all nested and repeated fields in the query results. ``allow_large_results``
+        must be ``true`` if this is set to ``false``. For standard SQL queries, this
+        flag is ignored and results are never flattened.
+    :type flatten_results: boolean
     :param bigquery_conn_id: reference to a specific BigQuery hook.
     :type bigquery_conn_id: string
     :param delegate_to: The account to impersonate, if any.
@@ -53,7 +60,12 @@ class BigQueryOperator(BaseOperator):
         of the basic price.
         Defaults to None, in which case it uses the value set in the project.
     :type maximum_billing_tier: integer
-    :param schema_update_options: Allows the schema of the desitination
+    :param maximum_bytes_billed: Limits the bytes billed for this job.
+        Queries that will have bytes billed beyond this limit will fail
+        (without incurring a charge). If unspecified, this will be
+        set to your project default.
+    :type maximum_bytes_billed: float
+    :param schema_update_options: Allows the schema of the destination
         table to be updated as a side effect of the load job.
     :type schema_update_options: tuple
     :param query_params: a dictionary containing query parameter types and
@@ -71,11 +83,13 @@ class BigQueryOperator(BaseOperator):
                  destination_dataset_table=False,
                  write_disposition='WRITE_EMPTY',
                  allow_large_results=False,
+                 flatten_results=False,
                  bigquery_conn_id='bigquery_default',
                  delegate_to=None,
                  udf_config=False,
                  use_legacy_sql=True,
                  maximum_billing_tier=None,
+                 maximum_bytes_billed=None,
                  create_disposition='CREATE_IF_NEEDED',
                  schema_update_options=(),
                  query_params=None,
@@ -88,11 +102,13 @@ class BigQueryOperator(BaseOperator):
         self.write_disposition = write_disposition
         self.create_disposition = create_disposition
         self.allow_large_results = allow_large_results
+        self.flatten_results = flatten_results
         self.bigquery_conn_id = bigquery_conn_id
         self.delegate_to = delegate_to
         self.udf_config = udf_config
         self.use_legacy_sql = use_legacy_sql
         self.maximum_billing_tier = maximum_billing_tier
+        self.maximum_bytes_billed = maximum_bytes_billed
         self.schema_update_options = schema_update_options
         self.query_params = query_params
         self.bq_cursor = None
@@ -112,8 +128,10 @@ class BigQueryOperator(BaseOperator):
             destination_dataset_table=self.destination_dataset_table,
             write_disposition=self.write_disposition,
             allow_large_results=self.allow_large_results,
+            flatten_results=self.flatten_results,
             udf_config=self.udf_config,
             maximum_billing_tier=self.maximum_billing_tier,
+            maximum_bytes_billed=self.maximum_bytes_billed,
             create_disposition=self.create_disposition,
             query_params=self.query_params,
             schema_update_options=self.schema_update_options,

Reply via email to