Fokko closed pull request #3825: [AIRFLOW-2989] Add param to set bootDiskType in Dataproc Op
URL: https://github.com/apache/incubator-airflow/pull/3825
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 69073f67de..9b6cae5282 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -75,10 +75,20 @@ class DataprocClusterCreateOperator(BaseOperator):
     :type properties: dict
     :param master_machine_type: Compute engine machine type to use for the master node
     :type master_machine_type: string
+    :param master_disk_type: Type of the boot disk for the master node
+        (default is ``pd-standard``).
+        Valid values: ``pd-ssd`` (Persistent Disk Solid State Drive) or
+        ``pd-standard`` (Persistent Disk Hard Disk Drive).
+    :type master_disk_type: string
     :param master_disk_size: Disk size for the master node
     :type master_disk_size: int
     :param worker_machine_type: Compute engine machine type to use for the worker nodes
     :type worker_machine_type: string
+    :param worker_disk_type: Type of the boot disk for the worker and preemptible worker nodes
+        (default is ``pd-standard``).
+        Valid values: ``pd-ssd`` (Persistent Disk Solid State Drive) or
+        ``pd-standard`` (Persistent Disk Hard Disk Drive).
+    :type worker_disk_type: string
     :param worker_disk_size: Disk size for the worker nodes
     :type worker_disk_size: int
     :param num_preemptible_workers: The # of preemptible worker nodes to spin up
@@ -141,8 +151,10 @@ def __init__(self,
                  image_version=None,
                  properties=None,
                  master_machine_type='n1-standard-4',
+                 master_disk_type='pd-standard',
                  master_disk_size=500,
                  worker_machine_type='n1-standard-4',
+                 worker_disk_type='pd-standard',
                  worker_disk_size=500,
                  num_preemptible_workers=0,
                  labels=None,
@@ -171,8 +183,10 @@ def __init__(self,
         self.image_version = image_version
         self.properties = properties
         self.master_machine_type = master_machine_type
+        self.master_disk_type = master_disk_type
         self.master_disk_size = master_disk_size
         self.worker_machine_type = worker_machine_type
+        self.worker_disk_type = worker_disk_type
         self.worker_disk_size = worker_disk_size
         self.labels = labels
         self.zone = zone
@@ -272,6 +286,7 @@ def _build_cluster_data(self):
                     'numInstances': 1,
                     'machineTypeUri': master_type_uri,
                     'diskConfig': {
+                        'bootDiskType': self.master_disk_type,
                         'bootDiskSizeGb': self.master_disk_size
                     }
                 },
@@ -279,6 +294,7 @@ def _build_cluster_data(self):
                     'numInstances': self.num_workers,
                     'machineTypeUri': worker_type_uri,
                     'diskConfig': {
+                        'bootDiskType': self.worker_disk_type,
                         'bootDiskSizeGb': self.worker_disk_size
                     }
                 },
@@ -292,6 +308,7 @@ def _build_cluster_data(self):
                 'numInstances': self.num_preemptible_workers,
                 'machineTypeUri': worker_type_uri,
                 'diskConfig': {
+                    'bootDiskType': self.worker_disk_type,
                     'bootDiskSizeGb': self.worker_disk_size
                 },
                 'isPreemptible': True
@@ -401,7 +418,7 @@ class DataprocClusterScaleOperator(BaseOperator):
             cluster_name='cluster-1',
             num_workers=10,
             num_preemptible_workers=10,
-            graceful_decommission_timeout='1h'
+            graceful_decommission_timeout='1h',
             dag=dag)
 
     .. seealso::
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index e5cc770321..5b403a86ba 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -61,8 +61,10 @@
 IMAGE_VERSION = '1.1'
 MASTER_MACHINE_TYPE = 'n1-standard-2'
 MASTER_DISK_SIZE = 100
+MASTER_DISK_TYPE = 'pd-standard'
 WORKER_MACHINE_TYPE = 'n1-standard-2'
 WORKER_DISK_SIZE = 100
+WORKER_DISK_TYPE = 'pd-standard'
 NUM_PREEMPTIBLE_WORKERS = 2
 GET_INIT_ACTION_TIMEOUT = "600s"  # 10m
 LABEL1 = {}
@@ -125,8 +127,10 @@ def setUp(self):
                     storage_bucket=STORAGE_BUCKET,
                     image_version=IMAGE_VERSION,
                     master_machine_type=MASTER_MACHINE_TYPE,
+                    master_disk_type=MASTER_DISK_TYPE,
                     master_disk_size=MASTER_DISK_SIZE,
                     worker_machine_type=WORKER_MACHINE_TYPE,
+                    worker_disk_type=WORKER_DISK_TYPE,
                     worker_disk_size=WORKER_DISK_SIZE,
                     num_preemptible_workers=NUM_PREEMPTIBLE_WORKERS,
                     labels=deepcopy(labels),
@@ -159,8 +163,10 @@ def test_init(self):
             self.assertEqual(dataproc_operator.image_version, IMAGE_VERSION)
             self.assertEqual(dataproc_operator.master_machine_type, MASTER_MACHINE_TYPE)
             self.assertEqual(dataproc_operator.master_disk_size, MASTER_DISK_SIZE)
+            self.assertEqual(dataproc_operator.master_disk_type, MASTER_DISK_TYPE)
             self.assertEqual(dataproc_operator.worker_machine_type, WORKER_MACHINE_TYPE)
             self.assertEqual(dataproc_operator.worker_disk_size, WORKER_DISK_SIZE)
+            self.assertEqual(dataproc_operator.worker_disk_type, WORKER_DISK_TYPE)
             self.assertEqual(dataproc_operator.num_preemptible_workers,
                              NUM_PREEMPTIBLE_WORKERS)
             self.assertEqual(dataproc_operator.labels, self.labels[suffix])


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to