Repository: incubator-airflow
Updated Branches:
  refs/heads/master ec65ef7e2 -> 7220e72e3
[AIRFLOW-718] Allow the query URI for DataProc Pig

The query URI parameter was missing for the DataProc Pig operator.
With the addition of the parameter you can now store the Pig script
in Cloud Storage.

Closes #1960 from alexvanboxel/feature/dataproc-pig-uri

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7220e72e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7220e72e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7220e72e

Branch: refs/heads/master
Commit: 7220e72e373e53e8b0e47f67e19c264b1ba1d6a4
Parents: ec65ef7
Author: Alex Van Boxel <[email protected]>
Authored: Thu Dec 29 22:43:32 2016 +0100
Committer: Bolke de Bruin <[email protected]>
Committed: Thu Dec 29 22:43:32 2016 +0100

----------------------------------------------------------------------
 airflow/contrib/hooks/gcp_dataproc_hook.py     |  3 +++
 airflow/contrib/operators/dataproc_operator.py | 11 +++++++++--
 2 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7220e72e/airflow/contrib/hooks/gcp_dataproc_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py
index e77c951..c1d8993 100644
--- a/airflow/contrib/hooks/gcp_dataproc_hook.py
+++ b/airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -99,6 +99,9 @@ class _DataProcJobBuilder:
     def add_query(self, query):
         self.job["job"][self.job_type]["queryList"] = {'queries': [query]}
 
+    def add_query_uri(self, query_uri):
+        self.job["job"][self.job_type]["queryFileUri"] = query_uri
+
     def add_jar_file_uris(self, jars):
         if jars is not None:
             self.job["job"][self.job_type]["jarFileUris"] = jars


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7220e72e/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 2955d26..a3df381 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -54,7 +54,8 @@ class DataProcPigOperator(BaseOperator):
     @apply_defaults
     def __init__(
             self,
-            query,
+            query=None,
+            query_uri=None,
             variables=None,
             job_name='{{task.task_id}}_{{ds_nodash}}',
             dataproc_cluster='cluster-1',
@@ -73,6 +74,8 @@ class DataProcPigOperator(BaseOperator):
 
     :param query: The query or reference to the query file (pg or pig extension).
     :type query: string
+    :param query_uri: The uri of a pig script on Cloud Storage.
+    :type query_uri: string
     :param variables: Map of named parameters for the query.
     :type variables: dict
     :param job_name: The job name used in the DataProc cluster. This name by default
@@ -98,6 +101,7 @@ class DataProcPigOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.delegate_to = delegate_to
         self.query = query
+        self.query_uri = query_uri
         self.variables = variables
         self.job_name = job_name
         self.dataproc_cluster = dataproc_cluster
@@ -110,7 +114,10 @@ class DataProcPigOperator(BaseOperator):
         job = hook.create_job_template(self.task_id, self.dataproc_cluster, "pigJob",
                                        self.dataproc_properties)
 
-        job.add_query(self.query)
+        if self.query is None:
+            job.add_query_uri(self.query_uri)
+        else:
+            job.add_query(self.query)
         job.add_variables(self.variables)
         job.add_jar_file_uris(self.dataproc_jars)
         job.set_job_name(self.job_name)
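
----------------------------------------------------------------------

For reference (a sketch, not part of the commit): these are the two
mutually exclusive shapes the "pigJob" section of the job body can
take after this change, matching the two builder methods above. The
inline script and the GCS path are made-up examples:

    # add_query() embeds the script text inline in the job body:
    pig_job_inline = {
        "pigJob": {"queryList": {"queries": ["<inline pig script>"]}}
    }

    # The new add_query_uri() instead references a script stored in
    # Cloud Storage (hypothetical path):
    pig_job_from_gcs = {
        "pigJob": {"queryFileUri": "gs://example-bucket/scripts/transform.pig"}
    }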
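
A minimal usage sketch under the same caveat: the DAG id, dates,
bucket path, and cluster name below are hypothetical; only the
query_uri parameter itself comes from this commit:

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.dataproc_operator import DataProcPigOperator

    # Hypothetical DAG to host the task.
    dag = DAG('dataproc_pig_example',
              start_date=datetime(2016, 12, 1),
              schedule_interval='@daily')

    # The Pig script now lives in Cloud Storage instead of being
    # passed inline through `query`:
    run_pig = DataProcPigOperator(
        task_id='run_pig_from_gcs',
        query_uri='gs://example-bucket/scripts/transform.pig',
        variables={'out': 'gs://example-bucket/output'},
        dataproc_cluster='cluster-1',
        dag=dag)

Note that execute() only falls back to add_query_uri() when query is
None, so an inline query takes precedence if both are supplied.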
