Repository: incubator-airflow Updated Branches: refs/heads/master 6372770be -> eb012a3c8
[AIRFLOW-1728] Add networkUri, subnet, tags to Dataproc operator Closes #2706 from jfantom/master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/eb012a3c Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/eb012a3c Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/eb012a3c Branch: refs/heads/master Commit: eb012a3c8daecdbe20c13958468fbd21ffbbbe3e Parents: 6372770 Author: jfantom <[email protected]> Authored: Fri Oct 20 10:32:08 2017 -0700 Committer: Chris Riccomini <[email protected]> Committed: Fri Oct 20 10:32:13 2017 -0700 ---------------------------------------------------------------------- airflow/contrib/operators/dataproc_operator.py | 20 ++++++++++++++++++++ .../contrib/operators/test_dataproc_operator.py | 15 +++++++++++++++ 2 files changed, 35 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb012a3c/airflow/contrib/operators/dataproc_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py index 99e4a0d..ba2c601 100644 --- a/airflow/contrib/operators/dataproc_operator.py +++ b/airflow/contrib/operators/dataproc_operator.py @@ -43,6 +43,9 @@ class DataprocClusterCreateOperator(BaseOperator): project_id, num_workers, zone, + network_uri=None, + subnetwork_uri=None, + tags=None, storage_bucket=None, init_actions_uris=None, metadata=None, @@ -105,6 +108,14 @@ class DataprocClusterCreateOperator(BaseOperator): :type labels: dict :param zone: The zone where the cluster will be located :type zone: string + :param network_uri: The network uri to be used for machine communication, cannot be + specified with subnetwork_uri + :type network_uri: string + :param subnetwork_uri: The subnetwork uri to be used 
for machine communication, cannot be + specified with network_uri + :type subnetwork_uri: string + :param tags: The GCE tags to add to all instances + :type tags: list[string] :param region: leave as 'global', might become relevant in the future :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. :type gcp_conn_id: string @@ -135,6 +146,9 @@ class DataprocClusterCreateOperator(BaseOperator): self.worker_disk_size = worker_disk_size self.labels = labels self.zone = zone + self.network_uri = network_uri + self.subnetwork_uri = subnetwork_uri + self.tags = tags self.region = region self.service_account = service_account self.service_account_scopes = service_account_scopes @@ -246,6 +260,12 @@ class DataprocClusterCreateOperator(BaseOperator): cluster_data['config']['configBucket'] = self.storage_bucket if self.metadata: cluster_data['config']['gceClusterConfig']['metadata'] = self.metadata + if self.network_uri: + cluster_data['config']['gceClusterConfig']['networkUri'] = self.network_uri + if self.subnetwork_uri: + cluster_data['config']['gceClusterConfig']['subnetworkUri'] = self.subnetwork_uri + if self.tags: + cluster_data['config']['gceClusterConfig']['tags'] = self.tags if self.image_version: cluster_data['config']['softwareConfig']['imageVersion'] = self.image_version if self.properties: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb012a3c/tests/contrib/operators/test_dataproc_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py index ad78a8d..2df056a 100644 --- a/tests/contrib/operators/test_dataproc_operator.py +++ b/tests/contrib/operators/test_dataproc_operator.py @@ -44,6 +44,9 @@ CLUSTER_NAME = 'test-cluster-name' PROJECT_ID = 'test-project-id' NUM_WORKERS = 123 ZONE = 'us-central1-a' +NETWORK_URI = '/projects/project_id/regions/global/net' +SUBNETWORK_URI = '/projects/project_id/regions/global/subnet' +TAGS = ['tag1', 'tag2'] STORAGE_BUCKET = 'gs://airflow-test-bucket/' IMAGE_VERSION = '1.1' MASTER_MACHINE_TYPE = 'n1-standard-2' @@ -76,6 +79,9 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase): project_id=PROJECT_ID, num_workers=NUM_WORKERS, zone=ZONE, + network_uri=NETWORK_URI, + subnetwork_uri=SUBNETWORK_URI, + tags=TAGS, storage_bucket=STORAGE_BUCKET, image_version=IMAGE_VERSION, master_machine_type=MASTER_MACHINE_TYPE, @@ -103,6 +109,9 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase): self.assertEqual(dataproc_operator.project_id, PROJECT_ID) self.assertEqual(dataproc_operator.num_workers, NUM_WORKERS) self.assertEqual(dataproc_operator.zone, ZONE) + self.assertEqual(dataproc_operator.network_uri, NETWORK_URI) + self.assertEqual(dataproc_operator.subnetwork_uri, SUBNETWORK_URI) + self.assertEqual(dataproc_operator.tags, TAGS) self.assertEqual(dataproc_operator.storage_bucket, STORAGE_BUCKET) self.assertEqual(dataproc_operator.image_version, IMAGE_VERSION) self.assertEqual(dataproc_operator.master_machine_type, MASTER_MACHINE_TYPE) @@ -125,6 +134,12 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase): NUM_PREEMPTIBLE_WORKERS) self.assertEqual(cluster_data['config']['gceClusterConfig']['serviceAccountScopes'], SERVICE_ACCOUNT_SCOPES) + self.assertEqual(cluster_data['config']['gceClusterConfig']['subnetworkUri'], + SUBNETWORK_URI) + self.assertEqual(cluster_data['config']['gceClusterConfig']['networkUri'], + NETWORK_URI) + self.assertEqual(cluster_data['config']['gceClusterConfig']['tags'], + TAGS) # test whether the default airflow-version label has been properly # set to the dataproc operator. merged_labels = {}
