Fokko closed pull request #4015: [AIRFLOW-2789] Create single node DataProc
cluster
URL: https://github.com/apache/incubator-airflow/pull/4015
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign (fork-based) pull
request once it is merged, the diff is reproduced below for the sake of
provenance:
diff --git a/airflow/contrib/operators/dataproc_operator.py
b/airflow/contrib/operators/dataproc_operator.py
index 49e24a3df2..af2a211539 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -52,7 +52,8 @@ class DataprocClusterCreateOperator(BaseOperator):
:param project_id: The ID of the google cloud project in which
to create the cluster. (templated)
:type project_id: str
- :param num_workers: The # of workers to spin up
+ :param num_workers: The # of workers to spin up. If set to zero will
+ spin up cluster in a single node mode
:type num_workers: int
:param storage_bucket: The storage bucket to use, setting to None lets
dataproc
generate a custom one for you
@@ -186,7 +187,7 @@ def __init__(self,
self.metadata = metadata
self.custom_image = custom_image
self.image_version = image_version
- self.properties = properties
+ self.properties = properties or dict()
self.master_machine_type = master_machine_type
self.master_disk_type = master_disk_type
self.master_disk_size = master_disk_size
@@ -205,10 +206,18 @@ def __init__(self,
self.idle_delete_ttl = idle_delete_ttl
self.auto_delete_time = auto_delete_time
self.auto_delete_ttl = auto_delete_ttl
+ self.single_node = num_workers == 0
assert not (self.custom_image and self.image_version), \
"custom_image and image_version can't be both set"
+ assert (
+ not self.single_node or (
+ self.single_node and self.num_preemptible_workers == 0
+ )
+ ), "num_workers == 0 means single node mode - no preemptibles allowed"
+
+
def _get_cluster_list_for_project(self, service):
result = service.projects().regions().clusters().list(
projectId=self.project_id,
@@ -351,7 +360,12 @@ def _build_cluster_data(self):
'{}/global/images/{}'.format(self.project_id,
self.custom_image)
cluster_data['config']['masterConfig']['imageUri'] =
custom_image_url
- cluster_data['config']['workerConfig']['imageUri'] =
custom_image_url
+ if not self.single_node:
+ cluster_data['config']['workerConfig']['imageUri'] =
custom_image_url
+
+ if self.single_node:
+ self.properties["dataproc:dataproc.allow.zero.workers"] = "true"
+
if self.properties:
cluster_data['config']['softwareConfig']['properties'] =
self.properties
if self.idle_delete_ttl:
diff --git a/tests/contrib/operators/test_dataproc_operator.py
b/tests/contrib/operators/test_dataproc_operator.py
index 60c1268ee7..fb90606ea5 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -298,6 +298,34 @@ def test_init_with_custom_image(self):
self.assertEqual(cluster_data['config']['workerConfig']['imageUri'],
expected_custom_image_url)
+ def test_build_single_node_cluster(self):
+ dataproc_operator = DataprocClusterCreateOperator(
+ task_id=TASK_ID,
+ cluster_name=CLUSTER_NAME,
+ project_id=PROJECT_ID,
+ num_workers=0,
+ num_preemptible_workers=0,
+ zone=ZONE,
+ dag=self.dag
+ )
+ cluster_data = dataproc_operator._build_cluster_data()
+ self.assertEqual(
+ cluster_data['config']['softwareConfig']['properties']
+ ['dataproc:dataproc.allow.zero.workers'], "true")
+
+ def test_init_cluster_with_zero_workers_and_not_non_zero_preemtibles(self):
+ with self.assertRaises(AssertionError):
+ DataprocClusterCreateOperator(
+ task_id=TASK_ID,
+ cluster_name=CLUSTER_NAME,
+ project_id=PROJECT_ID,
+ num_workers=0,
+ num_preemptible_workers=2,
+ zone=ZONE,
+ dag=self.dag,
+ image_version=IMAGE_VERSION,
+ )
+
def test_cluster_name_log_no_sub(self):
with patch('airflow.contrib.operators.dataproc_operator.DataProcHook')
\
as mock_hook:
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services