Repository: incubator-airflow
Updated Branches:
  refs/heads/master fd4360b9f -> 3f50f6bd5


[AIRFLOW-2039] BigQueryOperator supports priority property

Closes #2980 from yu-iskw/modify-bqoperator


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3f50f6bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3f50f6bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3f50f6bd

Branch: refs/heads/master
Commit: 3f50f6bd5b4dc51c8d838253ed0d62b4561584a6
Parents: fd4360b
Author: Yu ISHIKAWA <yuu.ishik...@gmail.com>
Authored: Fri Feb 2 09:33:54 2018 +0100
Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Committed: Fri Feb 2 09:33:54 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py         | 10 ++++++++--
 airflow/contrib/operators/bigquery_operator.py |  5 ++++-
 2 files changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3f50f6bd/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 120cee0..e0dea46 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -404,7 +404,8 @@ class BigQueryBaseCursor(LoggingMixin):
                   maximum_billing_tier=None,
                   create_disposition='CREATE_IF_NEEDED',
                   query_params=None,
-                  schema_update_options=()):
+                  schema_update_options=(),
+                  priority='INTERACTIVE'):
         """
         Executes a BigQuery SQL query. Optionally persists results in a BigQuery
         table. See here:
@@ -437,6 +438,10 @@ class BigQueryBaseCursor(LoggingMixin):
         :param schema_update_options: Allows the schema of the desitination
             table to be updated as a side effect of the query job.
         :type schema_update_options: tuple
+        :param priority: Specifies a priority for the query.
+            Possible values include INTERACTIVE and BATCH.
+            The default value is INTERACTIVE.
+        :type priority: string
         """
 
         # BigQuery also allows you to define how you want a table's schema to change
@@ -457,7 +462,8 @@ class BigQueryBaseCursor(LoggingMixin):
             'query': {
                 'query': bql,
                 'useLegacySql': self.use_legacy_sql,
-                'maximumBillingTier': maximum_billing_tier
+                'maximumBillingTier': maximum_billing_tier,
+                'priority': priority
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3f50f6bd/airflow/contrib/operators/bigquery_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index 94fa9b7..e24315d 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -76,6 +76,7 @@ class BigQueryOperator(BaseOperator):
                  create_disposition='CREATE_IF_NEEDED',
                  schema_update_options=(),
                  query_params=None,
+                 priority='INTERACTIVE',
                  *args,
                  **kwargs):
         super(BigQueryOperator, self).__init__(*args, **kwargs)
@@ -92,6 +93,7 @@ class BigQueryOperator(BaseOperator):
         self.schema_update_options = schema_update_options
         self.query_params = query_params
         self.bq_cursor = None
+        self.priority = priority
 
     def execute(self, context):
         if self.bq_cursor is None:
@@ -111,7 +113,8 @@ class BigQueryOperator(BaseOperator):
             maximum_billing_tier=self.maximum_billing_tier,
             create_disposition=self.create_disposition,
             query_params=self.query_params,
-            schema_update_options=self.schema_update_options)
+            schema_update_options=self.schema_update_options,
+            priority=self.priority)
 
     def on_kill(self):
         super(BigQueryOperator, self).on_kill()

Reply via email to