Repository: incubator-airflow
Updated Branches:
  refs/heads/master 69334fc44 -> a289497c8
[AIRFLOW-1404] Add 'flatten_results' & 'maximum_bytes_billed' to BQ Operator

- Updated BQ hook `run_query` method to add 'flatten_results' &
  'maximum_bytes_billed' parameters
- Added the same in BQ Operator

Closes #3030 from kaxil/AIRFLOW-1404


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a289497c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a289497c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a289497c

Branch: refs/heads/master
Commit: a289497c807c238deb34bac54bbc4b28cbda9440
Parents: 69334fc
Author: Kaxil Naik <[email protected]>
Authored: Sun Feb 11 21:18:28 2018 +0100
Committer: Fokko Driesprong <[email protected]>
Committed: Sun Feb 11 21:18:28 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py         | 15 +++++++++++++++
 airflow/contrib/operators/bigquery_operator.py | 20 +++++++++++++++++++-
 2 files changed, 34 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a289497c/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 220156b..ce7e2c3 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -465,8 +465,10 @@ class BigQueryBaseCursor(LoggingMixin):
                   destination_dataset_table=False,
                   write_disposition='WRITE_EMPTY',
                   allow_large_results=False,
+                  flatten_results=False,
                   udf_config=False,
                   maximum_billing_tier=None,
+                  maximum_bytes_billed=None,
                   create_disposition='CREATE_IF_NEEDED',
                   query_params=None,
                   schema_update_options=(),
@@ -488,12 +490,22 @@ class BigQueryBaseCursor(LoggingMixin):
         :type write_disposition: string
         :param allow_large_results: Whether to allow large results.
         :type allow_large_results: boolean
+        :param flatten_results: If true and query uses legacy SQL dialect, flattens
+            all nested and repeated fields in the query results. ``allowLargeResults``
+            must be true if this is set to false. For standard SQL queries, this
+            flag is ignored and results are never flattened.
+        :type flatten_results: boolean
         :param udf_config: The User Defined Function configuration for the query.
             See https://cloud.google.com/bigquery/user-defined-functions for details.
         :type udf_config: list
         :param maximum_billing_tier: Positive integer that serves as a multiplier
             of the basic price.
         :type maximum_billing_tier: integer
+        :param maximum_bytes_billed: Limits the bytes billed for this job.
+            Queries that will have bytes billed beyond this limit will fail
+            (without incurring a charge). If unspecified, this will be
+            set to your project default.
+        :type maximum_bytes_billed: float
         :param create_disposition: Specifies whether the job is allowed to
             create new tables.
         :type create_disposition: string
@@ -528,6 +540,7 @@ class BigQueryBaseCursor(LoggingMixin):
                 'query': bql,
                 'useLegacySql': self.use_legacy_sql,
                 'maximumBillingTier': maximum_billing_tier,
+                'maximumBytesBilled': maximum_bytes_billed,
                 'priority': priority
             }
         }
@@ -542,6 +555,8 @@ class BigQueryBaseCursor(LoggingMixin):
             configuration['query'].update({
                 'allowLargeResults': allow_large_results,
+                'flattenResults':
+                    flatten_results,
                 'writeDisposition': write_disposition,
                 'createDisposition':

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a289497c/airflow/contrib/operators/bigquery_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index d43a65c..ef1ada0 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -38,6 +38,13 @@ class BigQueryOperator(BaseOperator):
     :param create_disposition: Specifies whether the job is allowed to create
         new tables. (default: 'CREATE_IF_NEEDED')
     :type create_disposition: string
+    :param allow_large_results: Whether to allow large results.
+    :type allow_large_results: boolean
+    :param flatten_results: If true and query uses legacy SQL dialect, flattens
+        all nested and repeated fields in the query results. ``allow_large_results``
+        must be ``true`` if this is set to ``false``. For standard SQL queries, this
+        flag is ignored and results are never flattened.
+    :type flatten_results: boolean
     :param bigquery_conn_id: reference to a specific BigQuery hook.
     :type bigquery_conn_id: string
     :param delegate_to: The account to impersonate, if any.
@@ -53,7 +60,12 @@
     :param maximum_billing_tier: Positive integer that serves as a multiplier
         of the basic price.
         Defaults to None, in which case it uses the value set in the project.
     :type maximum_billing_tier: integer
-    :param schema_update_options: Allows the schema of the desitination
+    :param maximum_bytes_billed: Limits the bytes billed for this job.
+        Queries that will have bytes billed beyond this limit will fail
+        (without incurring a charge). If unspecified, this will be
+        set to your project default.
+    :type maximum_bytes_billed: float
+    :param schema_update_options: Allows the schema of the destination
         table to be updated as a side effect of the load job.
     :type schema_update_options: tuple
     :param query_params: a dictionary containing query parameter types and
@@ -71,11 +83,13 @@
                  destination_dataset_table=False,
                  write_disposition='WRITE_EMPTY',
                  allow_large_results=False,
+                 flatten_results=False,
                  bigquery_conn_id='bigquery_default',
                  delegate_to=None,
                  udf_config=False,
                  use_legacy_sql=True,
                  maximum_billing_tier=None,
+                 maximum_bytes_billed=None,
                  create_disposition='CREATE_IF_NEEDED',
                  schema_update_options=(),
                  query_params=None,
@@ -88,11 +102,13 @@
         self.write_disposition = write_disposition
         self.create_disposition = create_disposition
         self.allow_large_results = allow_large_results
+        self.flatten_results = flatten_results
         self.bigquery_conn_id = bigquery_conn_id
         self.delegate_to = delegate_to
         self.udf_config = udf_config
         self.use_legacy_sql = use_legacy_sql
         self.maximum_billing_tier = maximum_billing_tier
+        self.maximum_bytes_billed = maximum_bytes_billed
         self.schema_update_options = schema_update_options
         self.query_params = query_params
         self.bq_cursor = None
@@ -112,8 +128,10 @@
             destination_dataset_table=self.destination_dataset_table,
             write_disposition=self.write_disposition,
             allow_large_results=self.allow_large_results,
+            flatten_results=self.flatten_results,
             udf_config=self.udf_config,
             maximum_billing_tier=self.maximum_billing_tier,
+            maximum_bytes_billed=self.maximum_bytes_billed,
             create_disposition=self.create_disposition,
             query_params=self.query_params,
             schema_update_options=self.schema_update_options,
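
For anyone reading along, a minimal usage sketch of the two new operator
arguments (not part of this commit; the DAG, task id, table names and query
below are hypothetical, and it assumes the contrib BigQueryOperator of this
era, which takes the query via `bql`):

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

# Hypothetical DAG wiring, just to show where the new arguments go.
dag = DAG('example_bq_billing_limits',
          start_date=datetime(2018, 2, 1),
          schedule_interval=None)

query_task = BigQueryOperator(
    task_id='query_with_billing_limit',
    bql='SELECT * FROM [my-project:my_dataset.my_table]',  # legacy SQL, placeholder table
    destination_dataset_table='my-project:my_dataset.my_results',
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,              # must be True when flatten_results=False (legacy SQL)
    flatten_results=False,                 # new: keep nested/repeated fields unflattened
    maximum_bytes_billed=10 * 1024 ** 3,   # new: fail the job beyond ~10 GiB billed
    use_legacy_sql=True,
    bigquery_conn_id='bigquery_default',
    dag=dag,
)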

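A similarly hypothetical sketch at the hook level, passing the new keyword
arguments straight to the patched run_query (the connection id, query and
table names are placeholders; 'bigquery_default' must point at a real GCP
connection for this to actually execute):

from airflow.contrib.hooks.bigquery_hook import BigQueryHook

# The BigQueryBaseCursor that run_query lives on is reached through the
# hook's DB-API style connection/cursor objects.
hook = BigQueryHook(bigquery_conn_id='bigquery_default')
cursor = hook.get_conn().cursor()

cursor.run_query(
    bql='SELECT * FROM [my-project:my_dataset.my_table]',
    destination_dataset_table='my-project:my_dataset.my_results',
    allow_large_results=True,
    flatten_results=False,             # new keyword added by this commit
    maximum_bytes_billed=5 * 10 ** 9,  # new keyword added by this commit (~5 GB)
)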