Repository: incubator-airflow Updated Branches: refs/heads/master 5fe25d859 -> d32c72969
[AIRFLOW-1350] Add query_uri param to Hive/SparkSQL DataProc operator Closes #2402 from lukeFalsina/master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d32c7296 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d32c7296 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d32c7296 Branch: refs/heads/master Commit: d32c7296908e6975c4dda7159c1a7a6b9e89f046 Parents: 5fe25d8 Author: Luca Falsina <[email protected]> Authored: Tue Jun 27 12:43:13 2017 -0700 Committer: Chris Riccomini <[email protected]> Committed: Tue Jun 27 12:43:17 2017 -0700 ---------------------------------------------------------------------- airflow/contrib/operators/dataproc_operator.py | 22 +++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d32c7296/airflow/contrib/operators/dataproc_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py index 14245c8..3e006ac 100644 --- a/airflow/contrib/operators/dataproc_operator.py +++ b/airflow/contrib/operators/dataproc_operator.py @@ -472,7 +472,8 @@ class DataProcHiveOperator(BaseOperator): @apply_defaults def __init__( self, - query, + query=None, + query_uri=None, variables=None, job_name='{{task.task_id}}_{{ds_nodash}}', dataproc_cluster='cluster-1', @@ -487,6 +488,8 @@ class DataProcHiveOperator(BaseOperator): :param query: The query or reference to the query file (q extension). :type query: string + :param query_uri: The uri of a hive script on Cloud Storage. + :type query_uri: string :param variables: Map of named parameters for the query. :type variables: dict :param job_name: The job name used in the DataProc cluster. This name by default @@ -512,6 +515,7 @@ class DataProcHiveOperator(BaseOperator): self.gcp_conn_id = gcp_conn_id self.delegate_to = delegate_to self.query = query + self.query_uri = query_uri self.variables = variables self.job_name = job_name self.dataproc_cluster = dataproc_cluster @@ -525,7 +529,10 @@ class DataProcHiveOperator(BaseOperator): job = hook.create_job_template(self.task_id, self.dataproc_cluster, "hiveJob", self.dataproc_properties) - job.add_query(self.query) + if self.query is None: + job.add_query_uri(self.query_uri) + else: + job.add_query(self.query) job.add_variables(self.variables) job.add_jar_file_uris(self.dataproc_jars) job.set_job_name(self.job_name) @@ -544,7 +551,8 @@ class DataProcSparkSqlOperator(BaseOperator): @apply_defaults def __init__( self, - query, + query=None, + query_uri=None, variables=None, job_name='{{task.task_id}}_{{ds_nodash}}', dataproc_cluster='cluster-1', @@ -559,6 +567,8 @@ class DataProcSparkSqlOperator(BaseOperator): :param query: The query or reference to the query file (q extension). :type query: string + :param query_uri: The uri of a spark sql script on Cloud Storage. + :type query_uri: string :param variables: Map of named parameters for the query. :type variables: dict :param job_name: The job name used in the DataProc cluster. This name by default @@ -584,6 +594,7 @@ class DataProcSparkSqlOperator(BaseOperator): self.gcp_conn_id = gcp_conn_id self.delegate_to = delegate_to self.query = query + self.query_uri = query_uri self.variables = variables self.job_name = job_name self.dataproc_cluster = dataproc_cluster @@ -597,7 +608,10 @@ class DataProcSparkSqlOperator(BaseOperator): job = hook.create_job_template(self.task_id, self.dataproc_cluster, "sparkSqlJob", self.dataproc_properties) - job.add_query(self.query) + if self.query is None: + job.add_query_uri(self.query_uri) + else: + job.add_query(self.query) job.add_variables(self.variables) job.add_jar_file_uris(self.dataproc_jars) job.set_job_name(self.job_name)
