Repository: incubator-airflow
Updated Branches:
  refs/heads/master 0bc6042ce -> b8487cd44


[AIRFLOW-2560] Adding support for internalIpOnly to 
DataprocClusterCreateOperator

Closes #3458 from piffall/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b8487cd4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b8487cd4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b8487cd4

Branch: refs/heads/master
Commit: b8487cd4468fba41f6468c936487ffc2203b4c2e
Parents: 0bc6042
Author: Cristòfol Torrens <tofol.torr...@bluekiri.com>
Authored: Wed Jun 6 13:06:03 2018 +0100
Committer: Kaxil Naik <kaxiln...@apache.org>
Committed: Wed Jun 6 13:06:03 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/operators/dataproc_operator.py  | 13 ++++-
 .../contrib/operators/test_dataproc_operator.py | 51 ++++++++++++++++----
 2 files changed, 54 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b8487cd4/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py 
b/airflow/contrib/operators/dataproc_operator.py
index 4973eb1..5d59f7f 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -93,6 +93,10 @@ class DataprocClusterCreateOperator(BaseOperator):
     :param subnetwork_uri: The subnetwork uri to be used for machine 
communication,
         cannot be specified with network_uri
     :type subnetwork_uri: string
+    :param internal_ip_only: If true, all instances in the cluster will only
+        have internal IP addresses. This can only be enabled for subnetwork
+        enabled networks
+    :type internal_ip_only: bool
     :param tags: The GCE tags to add to all instances
     :type tags: list[string]
     :param region: leave as 'global', might become relevant in the future. 
(templated)
@@ -111,7 +115,7 @@ class DataprocClusterCreateOperator(BaseOperator):
         A duration in seconds.
     :type idle_delete_ttl: int
     :param auto_delete_time:  The time when cluster will be auto-deleted.
-    :type auto_delete_time: datetime
+    :type auto_delete_time: datetime.datetime
     :param auto_delete_ttl: The life duration of cluster, the cluster will be
         auto-deleted at the end of this duration.
         A duration in seconds. (If auto_delete_time is set this parameter will 
be ignored)
@@ -128,6 +132,7 @@ class DataprocClusterCreateOperator(BaseOperator):
                  zone,
                  network_uri=None,
                  subnetwork_uri=None,
+                 internal_ip_only=None,
                  tags=None,
                  storage_bucket=None,
                  init_actions_uris=None,
@@ -173,6 +178,7 @@ class DataprocClusterCreateOperator(BaseOperator):
         self.zone = zone
         self.network_uri = network_uri
         self.subnetwork_uri = subnetwork_uri
+        self.internal_ip_only = internal_ip_only
         self.tags = tags
         self.region = region
         self.service_account = service_account
@@ -306,6 +312,11 @@ class DataprocClusterCreateOperator(BaseOperator):
         if self.subnetwork_uri:
             cluster_data['config']['gceClusterConfig']['subnetworkUri'] = \
                 self.subnetwork_uri
+        if self.internal_ip_only:
+            if not self.subnetwork_uri:
+                raise AirflowException("Set internal_ip_only to true only when"
+                                       " you pass a subnetwork_uri.")
+            cluster_data['config']['gceClusterConfig']['internalIpOnly'] = True
         if self.tags:
             cluster_data['config']['gceClusterConfig']['tags'] = self.tags
         if self.image_version:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b8487cd4/tests/contrib/operators/test_dataproc_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_dataproc_operator.py 
b/tests/contrib/operators/test_dataproc_operator.py
index 6cb2044..65ff5cd 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -22,7 +22,7 @@ import datetime
 import re
 import unittest
 
-from airflow import DAG
+from airflow import DAG, AirflowException
 from airflow.contrib.operators.dataproc_operator import \
     DataprocClusterCreateOperator, \
     DataprocClusterDeleteOperator, \
@@ -55,6 +55,7 @@ NUM_WORKERS = 123
 ZONE = 'us-central1-a'
 NETWORK_URI = '/projects/project_id/regions/global/net'
 SUBNETWORK_URI = '/projects/project_id/regions/global/subnet'
+INTERNAL_IP_ONLY = True
 TAGS = ['tag1', 'tag2']
 STORAGE_BUCKET = 'gs://airflow-test-bucket/'
 IMAGE_VERSION = '1.1'
@@ -98,6 +99,7 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
                     zone=ZONE,
                     network_uri=NETWORK_URI,
                     subnetwork_uri=SUBNETWORK_URI,
+                    internal_ip_only=INTERNAL_IP_ONLY,
                     tags=TAGS,
                     storage_bucket=STORAGE_BUCKET,
                     image_version=IMAGE_VERSION,
@@ -157,19 +159,25 @@ class 
DataprocClusterCreateOperatorTest(unittest.TestCase):
             cluster_data = dataproc_operator._build_cluster_data()
             self.assertEqual(cluster_data['clusterName'], CLUSTER_NAME)
             self.assertEqual(cluster_data['projectId'], PROJECT_ID)
-            self.assertEqual(cluster_data['config']['softwareConfig'], 
{'imageVersion': IMAGE_VERSION})
+            self.assertEqual(cluster_data['config']['softwareConfig'],
+                             {'imageVersion': IMAGE_VERSION})
             self.assertEqual(cluster_data['config']['configBucket'], 
STORAGE_BUCKET)
-            
self.assertEqual(cluster_data['config']['workerConfig']['numInstances'], 
NUM_WORKERS)
-            
self.assertEqual(cluster_data['config']['secondaryWorkerConfig']['numInstances'],
-                             NUM_PREEMPTIBLE_WORKERS)
-            
self.assertEqual(cluster_data['config']['gceClusterConfig']['serviceAccountScopes'],
+            
self.assertEqual(cluster_data['config']['workerConfig']['numInstances'],
+                             NUM_WORKERS)
+            self.assertEqual(
+                
cluster_data['config']['secondaryWorkerConfig']['numInstances'],
+                NUM_PREEMPTIBLE_WORKERS)
+            self.assertEqual(
+                
cluster_data['config']['gceClusterConfig']['serviceAccountScopes'],
                 SERVICE_ACCOUNT_SCOPES)
+            
self.assertEqual(cluster_data['config']['gceClusterConfig']['internalIpOnly'],
+                             INTERNAL_IP_ONLY)
             
self.assertEqual(cluster_data['config']['gceClusterConfig']['subnetworkUri'],
-                SUBNETWORK_URI)
+                             SUBNETWORK_URI)
             
self.assertEqual(cluster_data['config']['gceClusterConfig']['networkUri'],
-                NETWORK_URI)
+                             NETWORK_URI)
             
self.assertEqual(cluster_data['config']['gceClusterConfig']['tags'],
-                TAGS)
+                             TAGS)
             
self.assertEqual(cluster_data['config']['lifecycleConfig']['idleDeleteTtl'],
                              "321s")
             
self.assertEqual(cluster_data['config']['lifecycleConfig']['autoDeleteTime'],
@@ -270,6 +278,31 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
                 mock_info.assert_called_with('Creating cluster: %s',
                                              u'smoke-cluster-testnodash')
 
+    def test_build_cluster_data_internal_ip_only_without_subnetwork(self):
+
+        def create_cluster_with_invalid_internal_ip_only_setup():
+
+            # Given
+            create_cluster = DataprocClusterCreateOperator(
+                task_id=TASK_ID,
+                cluster_name=CLUSTER_NAME,
+                project_id=PROJECT_ID,
+                num_workers=NUM_WORKERS,
+                zone=ZONE,
+                dag=self.dag,
+                internal_ip_only=True)
+
+            # When
+            create_cluster._build_cluster_data()
+
+        # Then
+        with self.assertRaises(AirflowException) as cm:
+            create_cluster_with_invalid_internal_ip_only_setup()
+
+        self.assertEqual(str(cm.exception),
+                         "Set internal_ip_only to true only when"
+                         " you pass a subnetwork_uri.")
+
 
 class DataprocClusterScaleOperatorTest(unittest.TestCase):
     # Unit test for the DataprocClusterScaleOperator

Reply via email to the mailing list.