Repository: incubator-airflow Updated Branches: refs/heads/master 0bc6042ce -> b8487cd44
[AIRFLOW-2560] Adding support for internalIpOnly to DataprocClusterCreateOperator Closes #3458 from piffall/master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b8487cd4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b8487cd4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b8487cd4 Branch: refs/heads/master Commit: b8487cd4468fba41f6468c936487ffc2203b4c2e Parents: 0bc6042 Author: Cristòfol Torrens <tofol.torr...@bluekiri.com> Authored: Wed Jun 6 13:06:03 2018 +0100 Committer: Kaxil Naik <kaxiln...@apache.org> Committed: Wed Jun 6 13:06:03 2018 +0100 ---------------------------------------------------------------------- airflow/contrib/operators/dataproc_operator.py | 13 ++++- .../contrib/operators/test_dataproc_operator.py | 51 ++++++++++++++++---- 2 files changed, 54 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b8487cd4/airflow/contrib/operators/dataproc_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py index 4973eb1..5d59f7f 100644 --- a/airflow/contrib/operators/dataproc_operator.py +++ b/airflow/contrib/operators/dataproc_operator.py @@ -93,6 +93,10 @@ class DataprocClusterCreateOperator(BaseOperator): :param subnetwork_uri: The subnetwork uri to be used for machine communication, cannot be specified with network_uri :type subnetwork_uri: string + :param internal_ip_only: If true, all instances in the cluster will only + have internal IP addresses. This can only be enabled for subnetwork + enabled networks + :type internal_ip_only: bool :param tags: The GCE tags to add to all instances :type tags: list[string] :param region: leave as 'global', might become relevant in the future. (templated) @@ -111,7 +115,7 @@ class DataprocClusterCreateOperator(BaseOperator): A duration in seconds. :type idle_delete_ttl: int :param auto_delete_time: The time when cluster will be auto-deleted. - :type auto_delete_time: datetime + :type auto_delete_time: datetime.datetime :param auto_delete_ttl: The life duration of cluster, the cluster will be auto-deleted at the end of this duration. A duration in seconds. (If auto_delete_time is set this parameter will be ignored) @@ -128,6 +132,7 @@ class DataprocClusterCreateOperator(BaseOperator): zone, network_uri=None, subnetwork_uri=None, + internal_ip_only=None, tags=None, storage_bucket=None, init_actions_uris=None, @@ -173,6 +178,7 @@ class DataprocClusterCreateOperator(BaseOperator): self.zone = zone self.network_uri = network_uri self.subnetwork_uri = subnetwork_uri + self.internal_ip_only = internal_ip_only self.tags = tags self.region = region self.service_account = service_account @@ -306,6 +312,11 @@ class DataprocClusterCreateOperator(BaseOperator): if self.subnetwork_uri: cluster_data['config']['gceClusterConfig']['subnetworkUri'] = \ self.subnetwork_uri + if self.internal_ip_only: + if not self.subnetwork_uri: + raise AirflowException("Set internal_ip_only to true only when" + " you pass a subnetwork_uri.") + cluster_data['config']['gceClusterConfig']['internalIpOnly'] = True if self.tags: cluster_data['config']['gceClusterConfig']['tags'] = self.tags if self.image_version: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b8487cd4/tests/contrib/operators/test_dataproc_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py index 6cb2044..65ff5cd 100644 --- a/tests/contrib/operators/test_dataproc_operator.py +++ b/tests/contrib/operators/test_dataproc_operator.py @@ -22,7 +22,7 @@ import datetime import re import unittest -from airflow import DAG +from airflow import DAG, AirflowException from airflow.contrib.operators.dataproc_operator import \ DataprocClusterCreateOperator, \ DataprocClusterDeleteOperator, \ @@ -55,6 +55,7 @@ NUM_WORKERS = 123 ZONE = 'us-central1-a' NETWORK_URI = '/projects/project_id/regions/global/net' SUBNETWORK_URI = '/projects/project_id/regions/global/subnet' +INTERNAL_IP_ONLY = True TAGS = ['tag1', 'tag2'] STORAGE_BUCKET = 'gs://airflow-test-bucket/' IMAGE_VERSION = '1.1' @@ -98,6 +99,7 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase): zone=ZONE, network_uri=NETWORK_URI, subnetwork_uri=SUBNETWORK_URI, + internal_ip_only=INTERNAL_IP_ONLY, tags=TAGS, storage_bucket=STORAGE_BUCKET, image_version=IMAGE_VERSION, @@ -157,19 +159,25 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase): cluster_data = dataproc_operator._build_cluster_data() self.assertEqual(cluster_data['clusterName'], CLUSTER_NAME) self.assertEqual(cluster_data['projectId'], PROJECT_ID) - self.assertEqual(cluster_data['config']['softwareConfig'], {'imageVersion': IMAGE_VERSION}) + self.assertEqual(cluster_data['config']['softwareConfig'], + {'imageVersion': IMAGE_VERSION}) self.assertEqual(cluster_data['config']['configBucket'], STORAGE_BUCKET) - self.assertEqual(cluster_data['config']['workerConfig']['numInstances'], NUM_WORKERS) - self.assertEqual(cluster_data['config']['secondaryWorkerConfig']['numInstances'], - NUM_PREEMPTIBLE_WORKERS) - self.assertEqual(cluster_data['config']['gceClusterConfig']['serviceAccountScopes'], + self.assertEqual(cluster_data['config']['workerConfig']['numInstances'], + NUM_WORKERS) + self.assertEqual( + cluster_data['config']['secondaryWorkerConfig']['numInstances'], + NUM_PREEMPTIBLE_WORKERS) + self.assertEqual( + cluster_data['config']['gceClusterConfig']['serviceAccountScopes'], SERVICE_ACCOUNT_SCOPES) + self.assertEqual(cluster_data['config']['gceClusterConfig']['internalIpOnly'], + INTERNAL_IP_ONLY) self.assertEqual(cluster_data['config']['gceClusterConfig']['subnetworkUri'], - SUBNETWORK_URI) + SUBNETWORK_URI) self.assertEqual(cluster_data['config']['gceClusterConfig']['networkUri'], - NETWORK_URI) + NETWORK_URI) self.assertEqual(cluster_data['config']['gceClusterConfig']['tags'], - TAGS) + TAGS) self.assertEqual(cluster_data['config']['lifecycleConfig']['idleDeleteTtl'], "321s") self.assertEqual(cluster_data['config']['lifecycleConfig']['autoDeleteTime'], @@ -270,6 +278,31 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase): mock_info.assert_called_with('Creating cluster: %s', u'smoke-cluster-testnodash') + def test_build_cluster_data_internal_ip_only_without_subnetwork(self): + + def create_cluster_with_invalid_internal_ip_only_setup(): + + # Given + create_cluster = DataprocClusterCreateOperator( + task_id=TASK_ID, + cluster_name=CLUSTER_NAME, + project_id=PROJECT_ID, + num_workers=NUM_WORKERS, + zone=ZONE, + dag=self.dag, + internal_ip_only=True) + + # When + create_cluster._build_cluster_data() + + # Then + with self.assertRaises(AirflowException) as cm: + create_cluster_with_invalid_internal_ip_only_setup() + + self.assertEqual(str(cm.exception), + "Set internal_ip_only to true only when" + " you pass a subnetwork_uri.") + class DataprocClusterScaleOperatorTest(unittest.TestCase): # Unit test for the DataprocClusterScaleOperator