Repository: incubator-airflow Updated Branches: refs/heads/master 5c8075526 -> b1f902e63
[AIRFLOW-1535] Add service account/scopes in dataproc Closes #2546 from fenglu-g/master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b1f902e6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b1f902e6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b1f902e6 Branch: refs/heads/master Commit: b1f902e63e93e8e3c275c501a13987d658dd268f Parents: 5c80755 Author: fenglu-g <[email protected]> Authored: Wed Aug 30 09:13:43 2017 -0700 Committer: Chris Riccomini <[email protected]> Committed: Wed Aug 30 09:13:43 2017 -0700 ---------------------------------------------------------------------- airflow/contrib/operators/dataproc_operator.py | 14 ++++++++++++++ tests/contrib/operators/test_dataproc_operator.py | 10 +++++++++- 2 files changed, 23 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b1f902e6/airflow/contrib/operators/dataproc_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py index aa9b335..c0ff6a7 100644 --- a/airflow/contrib/operators/dataproc_operator.py +++ b/airflow/contrib/operators/dataproc_operator.py @@ -58,6 +58,8 @@ class DataprocClusterCreateOperator(BaseOperator): region='global', google_cloud_conn_id='google_cloud_default', delegate_to=None, + service_account=None, + service_account_scopes=None, *args, **kwargs): """ @@ -111,6 +113,10 @@ class DataprocClusterCreateOperator(BaseOperator): For this to work, the service account making the request must have domain-wide delegation enabled. :type delegate_to: string + :param service_account: The service account of the dataproc instances. + :type service_account: string + :param service_account_scopes: The URIs of service account scopes to be included. + :type service_account_scopes: list[string] """ super(DataprocClusterCreateOperator, self).__init__(*args, **kwargs) self.google_cloud_conn_id = google_cloud_conn_id @@ -131,6 +137,8 @@ class DataprocClusterCreateOperator(BaseOperator): self.labels = labels self.zone = zone self.region = region + self.service_account = service_account + self.service_account_scopes = service_account_scopes def _get_cluster_list_for_project(self, service): result = service.projects().regions().clusters().list( @@ -247,6 +255,12 @@ class DataprocClusterCreateOperator(BaseOperator): {'executableFile': uri} for uri in self.init_actions_uris ] cluster_data['config']['initializationActions'] = init_actions_dict + if self.service_account: + cluster_data['config']['gceClusterConfig']['serviceAccount'] =\ + self.service_account + if self.service_account_scopes: + cluster_data['config']['gceClusterConfig']['serviceAccountScopes'] =\ + self.service_account_scopes return cluster_data def execute(self, context): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b1f902e6/tests/contrib/operators/test_dataproc_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py index a0c6ba0..71edf58 100644 --- a/tests/contrib/operators/test_dataproc_operator.py +++ b/tests/contrib/operators/test_dataproc_operator.py @@ -41,6 +41,10 @@ WORKER_DISK_SIZE = 100 NUM_PREEMPTIBLE_WORKERS = 2 LABEL1 = {} LABEL2 = {'application':'test', 'year': 2017} +SERVICE_ACCOUNT_SCOPES = [ + 'https://www.googleapis.com/auth/bigquery', + 'https://www.googleapis.com/auth/bigtable.data' +] DEFAULT_DATE = datetime.datetime(2017, 6, 6) class DataprocClusterCreateOperatorTest(unittest.TestCase): @@ -64,7 +68,8 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase): worker_machine_type=WORKER_MACHINE_TYPE, worker_disk_size=WORKER_DISK_SIZE, num_preemptible_workers=NUM_PREEMPTIBLE_WORKERS, - labels = deepcopy(labels) + labels = deepcopy(labels), + service_account_scopes = SERVICE_ACCOUNT_SCOPES ) ) self.dag = DAG( @@ -91,6 +96,7 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase): self.assertEqual(dataproc_operator.worker_disk_size, WORKER_DISK_SIZE) self.assertEqual(dataproc_operator.num_preemptible_workers, NUM_PREEMPTIBLE_WORKERS) self.assertEqual(dataproc_operator.labels, self.labels[suffix]) + self.assertEqual(dataproc_operator.service_account_scopes, SERVICE_ACCOUNT_SCOPES) def test_build_cluster_data(self): for suffix, dataproc_operator in enumerate(self.dataproc_operators): @@ -102,6 +108,8 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase): self.assertEqual(cluster_data['config']['workerConfig']['numInstances'], NUM_WORKERS) self.assertEqual(cluster_data['config']['secondaryWorkerConfig']['numInstances'], NUM_PREEMPTIBLE_WORKERS) + self.assertEqual(cluster_data['config']['gceClusterConfig']['serviceAccountScopes'], + SERVICE_ACCOUNT_SCOPES) # test whether the default airflow-version label has been properly # set to the dataproc operator. merged_labels = {}
