Repository: incubator-airflow
Updated Branches:
  refs/heads/master ec65ef7e2 -> 7220e72e3


[AIRFLOW-718] Allow the query URI for DataProc Pig

The query URI parameter was missing from the
DataProc Pig operator. With the addition of this
parameter, the Pig script can now be stored in
Cloud Storage.
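
For illustration (not part of the commit), a task using the new parameter
might look like the following sketch; the gs:// path, task id, and `dag`
object are assumptions, not values from the commit:

    from airflow.contrib.operators.dataproc_operator import DataProcPigOperator

    # Hypothetical usage: run a Pig script stored in Cloud Storage instead
    # of passing the query text inline.
    pig_from_gcs = DataProcPigOperator(
        task_id='pig_from_gcs',
        query_uri='gs://my-bucket/scripts/transform.pig',  # illustrative path
        dataproc_cluster='cluster-1',
        dag=dag)  # assumes a DAG defined elsewhere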

Closes #1960 from alexvanboxel/feature/dataproc-pig-uri


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7220e72e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7220e72e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7220e72e

Branch: refs/heads/master
Commit: 7220e72e373e53e8b0e47f67e19c264b1ba1d6a4
Parents: ec65ef7
Author: Alex Van Boxel <[email protected]>
Authored: Thu Dec 29 22:43:32 2016 +0100
Committer: Bolke de Bruin <[email protected]>
Committed: Thu Dec 29 22:43:32 2016 +0100

----------------------------------------------------------------------
 airflow/contrib/hooks/gcp_dataproc_hook.py     |  3 +++
 airflow/contrib/operators/dataproc_operator.py | 11 +++++++++--
 2 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7220e72e/airflow/contrib/hooks/gcp_dataproc_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py
index e77c951..c1d8993 100644
--- a/airflow/contrib/hooks/gcp_dataproc_hook.py
+++ b/airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -99,6 +99,9 @@ class _DataProcJobBuilder:
     def add_query(self, query):
         self.job["job"][self.job_type]["queryList"] = {'queries': [query]}
 
+    def add_query_uri(self, query_uri):
+        self.job["job"][self.job_type]["queryFileUri"] = query_uri
+
     def add_jar_file_uris(self, jars):
         if jars is not None:
             self.job["job"][self.job_type]["jarFileUris"] = jars
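
As a standalone sketch (not part of the commit) of what the new method
writes into the request body, assuming the Pig operator's job_type of
"pigJob" and an illustrative gs:// path:

    # Equivalent effect of add_query_uri on the builder's job dict:
    job = {"job": {"pigJob": {}}}
    job["job"]["pigJob"]["queryFileUri"] = "gs://my-bucket/scripts/transform.pig"
    # job is now {'job': {'pigJob': {'queryFileUri': 'gs://my-bucket/scripts/transform.pig'}}}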

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7220e72e/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 2955d26..a3df381 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -54,7 +54,8 @@ class DataProcPigOperator(BaseOperator):
     @apply_defaults
     def __init__(
             self,
-            query,
+            query=None,
+            query_uri=None,
             variables=None,
             job_name='{{task.task_id}}_{{ds_nodash}}',
             dataproc_cluster='cluster-1',
@@ -73,6 +74,8 @@ class DataProcPigOperator(BaseOperator):
 
         :param query: The query or reference to the query file (pg or pig extension).
         :type query: string
+        :param query_uri: The URI of a Pig script on Cloud Storage.
+        :type query_uri: string
         :param variables: Map of named parameters for the query.
         :type variables: dict
         :param job_name: The job name used in the DataProc cluster. This name by default
@@ -98,6 +101,7 @@ class DataProcPigOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.delegate_to = delegate_to
         self.query = query
+        self.query_uri = query_uri
         self.variables = variables
         self.job_name = job_name
         self.dataproc_cluster = dataproc_cluster
@@ -110,7 +114,10 @@ class DataProcPigOperator(BaseOperator):
         job = hook.create_job_template(self.task_id, self.dataproc_cluster, "pigJob",
                                        self.dataproc_properties)
 
-        job.add_query(self.query)
+        if self.query is None:
+            job.add_query_uri(self.query_uri)
+        else:
+            job.add_query(self.query)
         job.add_variables(self.variables)
         job.add_jar_file_uris(self.dataproc_jars)
         job.set_job_name(self.job_name)
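
A note on the dispatch above: the inline query takes precedence, since the
branch only falls back to query_uri when self.query is None; if both are
supplied, the URI is silently ignored, and if neither is supplied the job
is built with queryFileUri set to None. For contrast, the pre-existing
inline form still works (sketch; the Pig text and task id are illustrative):

    pig_inline = DataProcPigOperator(
        task_id='pig_inline',
        query="A = LOAD '/tmp/in'; STORE A INTO '/tmp/out';",  # inline Pig text
        dag=dag)  # assumes a DAG defined elsewhere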
