Repository: incubator-airflow
Updated Branches:
  refs/heads/master 5157b5a76 -> d04519e60
[AIRFLOW-1816] Add region param to Dataproc operators

Closes #2788 from DanSedov/master

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d04519e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d04519e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d04519e6

Branch: refs/heads/master
Commit: d04519e6051e39ec95c553c0f550092cfa418a38
Parents: 5157b5a
Author: Dan Sedov <[email protected]>
Authored: Wed Nov 15 12:05:00 2017 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Wed Nov 15 12:05:06 2017 -0800

----------------------------------------------------------------------
 airflow/contrib/operators/dataproc_operator.py | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d04519e6/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index ba2c601..9c1eb0f 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -441,6 +441,7 @@ class DataProcPigOperator(BaseOperator):
             dataproc_pig_jars=None,
             gcp_conn_id='google_cloud_default',
             delegate_to=None,
+            region='global',
             *args,
             **kwargs):
         """
@@ -474,6 +475,8 @@ class DataProcPigOperator(BaseOperator):
             For this to work, the service account making the request must have domain-wide
             delegation enabled.
         :type delegate_to: string
+        :param region: The specified region where the dataproc cluster is created.
+        :type region: string
         """
         super(DataProcPigOperator, self).__init__(*args, **kwargs)
         self.gcp_conn_id = gcp_conn_id
@@ -485,6 +488,7 @@ class DataProcPigOperator(BaseOperator):
         self.cluster_name = cluster_name
         self.dataproc_properties = dataproc_pig_properties
         self.dataproc_jars = dataproc_pig_jars
+        self.region = region
 
     def execute(self, context):
         hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
@@ -500,7 +504,7 @@ class DataProcPigOperator(BaseOperator):
         job.add_jar_file_uris(self.dataproc_jars)
         job.set_job_name(self.job_name)
 
-        hook.submit(hook.project_id, job.build())
+        hook.submit(hook.project_id, job.build(), self.region)
 
 
 class DataProcHiveOperator(BaseOperator):
@@ -606,6 +610,7 @@ class DataProcSparkSqlOperator(BaseOperator):
             dataproc_spark_jars=None,
             gcp_conn_id='google_cloud_default',
             delegate_to=None,
+            region='global',
             *args,
             **kwargs):
         """
@@ -635,6 +640,8 @@ class DataProcSparkSqlOperator(BaseOperator):
             For this to work, the service account making the request must have domain-wide
             delegation enabled.
         :type delegate_to: string
+        :param region: The specified region where the dataproc cluster is created.
+        :type region: string
         """
         super(DataProcSparkSqlOperator, self).__init__(*args, **kwargs)
         self.gcp_conn_id = gcp_conn_id
@@ -646,6 +653,7 @@ class DataProcSparkSqlOperator(BaseOperator):
         self.cluster_name = cluster_name
         self.dataproc_properties = dataproc_spark_properties
         self.dataproc_jars = dataproc_spark_jars
+        self.region = region
 
     def execute(self, context):
         hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
@@ -662,7 +670,7 @@ class DataProcSparkSqlOperator(BaseOperator):
         job.add_jar_file_uris(self.dataproc_jars)
         job.set_job_name(self.job_name)
 
-        hook.submit(hook.project_id, job.build())
+        hook.submit(hook.project_id, job.build(), self.region)
 
 
 class DataProcSparkOperator(BaseOperator):
