Repository: incubator-airflow Updated Branches: refs/heads/master c5fa8cd41 -> b02820a7a
[AIRFLOW-2313] Add TTL parameters for Dataproc. Add three optional parameters to DataprocClusterCreateOperator that configure the Cloud Dataproc cluster Scheduled Deletion feature. Closes #3217 from ebartkus/dataproc-ttl Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b02820a7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b02820a7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b02820a7 Branch: refs/heads/master Commit: b02820a7afcb5205df39c4a639f1ceeb2c9c75ee Parents: c5fa8cd Author: ebartkus <[email protected]> Authored: Thu May 3 23:10:49 2018 +0200 Committer: Fokko Driesprong <[email protected]> Committed: Thu May 3 23:10:49 2018 +0200 ---------------------------------------------------------------------- airflow/contrib/hooks/gcp_dataproc_hook.py | 2 +- airflow/contrib/operators/dataproc_operator.py | 30 ++++++++- .../contrib/operators/test_dataproc_operator.py | 67 ++++++++++++++++++-- 3 files changed, 93 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b02820a7/airflow/contrib/hooks/gcp_dataproc_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py index 7d95897..7849e17 100644 --- a/airflow/contrib/hooks/gcp_dataproc_hook.py +++ b/airflow/contrib/hooks/gcp_dataproc_hook.py @@ -197,7 +197,7 @@ class DataProcHook(GoogleCloudBaseHook): def __init__(self, gcp_conn_id='google_cloud_default', delegate_to=None, - api_version='v1'): + api_version='v1beta2'): super(DataProcHook, self).__init__(gcp_conn_id, delegate_to) self.api_version = api_version http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b02820a7/airflow/contrib/operators/dataproc_operator.py 
---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py index ad0aa09..1614720 100644 --- a/airflow/contrib/operators/dataproc_operator.py +++ b/airflow/contrib/operators/dataproc_operator.py @@ -32,6 +32,7 @@ from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults from airflow.version import version from googleapiclient.errors import HttpError +from airflow.utils import timezone class DataprocClusterCreateOperator(BaseOperator): @@ -105,6 +106,16 @@ class DataprocClusterCreateOperator(BaseOperator): :type service_account: string :param service_account_scopes: The URIs of service account scopes to be included. :type service_account_scopes: list[string] + :param idle_delete_ttl: The longest duration that cluster would keep alive while + staying idle. Passing this threshold will cause cluster to be auto-deleted. + A duration in seconds. + :type idle_delete_ttl: int + :param auto_delete_time: The time when cluster will be auto-deleted. + :type auto_delete_time: datetime + :param auto_delete_ttl: The life duration of cluster, the cluster will be + auto-deleted at the end of this duration. + A duration in seconds. 
(If auto_delete_time is set this parameter will be ignored) + :type auto_delete_ttl: int """ template_fields = ['cluster_name', 'project_id', 'zone', 'region'] @@ -135,6 +146,9 @@ class DataprocClusterCreateOperator(BaseOperator): delegate_to=None, service_account=None, service_account_scopes=None, + idle_delete_ttl=None, + auto_delete_time=None, + auto_delete_ttl=None, *args, **kwargs): @@ -163,6 +177,9 @@ class DataprocClusterCreateOperator(BaseOperator): self.region = region self.service_account = service_account self.service_account_scopes = service_account_scopes + self.idle_delete_ttl = idle_delete_ttl + self.auto_delete_time = auto_delete_time + self.auto_delete_ttl = auto_delete_ttl def _get_cluster_list_for_project(self, service): result = service.projects().regions().clusters().list( @@ -261,7 +278,8 @@ class DataprocClusterCreateOperator(BaseOperator): } }, 'secondaryWorkerConfig': {}, - 'softwareConfig': {} + 'softwareConfig': {}, + 'lifecycleConfig': {} } } if self.num_preemptible_workers > 0: @@ -294,6 +312,16 @@ class DataprocClusterCreateOperator(BaseOperator): cluster_data['config']['softwareConfig']['imageVersion'] = self.image_version if self.properties: cluster_data['config']['softwareConfig']['properties'] = self.properties + if self.idle_delete_ttl: + cluster_data['config']['lifecycleConfig']['idleDeleteTtl'] = \ + "{}s".format(self.idle_delete_ttl) + if self.auto_delete_time: + utc_auto_delete_time = timezone.convert_to_utc(self.auto_delete_time) + cluster_data['config']['lifecycleConfig']['autoDeleteTime'] = \ + utc_auto_delete_time.format('%Y-%m-%dT%H:%M:%S.%fZ', formatter='classic') + elif self.auto_delete_ttl: + cluster_data['config']['lifecycleConfig']['autoDeleteTtl'] = \ + "{}s".format(self.auto_delete_ttl) if self.init_actions_uris: init_actions_dict = [ { http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b02820a7/tests/contrib/operators/test_dataproc_operator.py 
---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py index e8cd1e5..d039cf1 100644 --- a/tests/contrib/operators/test_dataproc_operator.py +++ b/tests/contrib/operators/test_dataproc_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -69,6 +69,9 @@ SERVICE_ACCOUNT_SCOPES = [ 'https://www.googleapis.com/auth/bigquery', 'https://www.googleapis.com/auth/bigtable.data' ] +IDLE_DELETE_TTL = 321 +AUTO_DELETE_TIME = datetime.datetime(2017, 6, 7) +AUTO_DELETE_TTL = 654 DEFAULT_DATE = datetime.datetime(2017, 6, 6) REGION = 'test-region' MAIN_URI = 'test-uri' @@ -102,8 +105,11 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase): worker_machine_type=WORKER_MACHINE_TYPE, worker_disk_size=WORKER_DISK_SIZE, num_preemptible_workers=NUM_PREEMPTIBLE_WORKERS, - labels = deepcopy(labels), - service_account_scopes = SERVICE_ACCOUNT_SCOPES + labels=deepcopy(labels), + service_account_scopes=SERVICE_ACCOUNT_SCOPES, + idle_delete_ttl=IDLE_DELETE_TTL, + auto_delete_time=AUTO_DELETE_TIME, + auto_delete_ttl=AUTO_DELETE_TTL ) ) self.dag = DAG( @@ -136,6 +142,9 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase): self.assertEqual(dataproc_operator.labels, self.labels[suffix]) self.assertEqual(dataproc_operator.service_account_scopes, SERVICE_ACCOUNT_SCOPES) + self.assertEqual(dataproc_operator.idle_delete_ttl, IDLE_DELETE_TTL) + self.assertEqual(dataproc_operator.auto_delete_time, AUTO_DELETE_TIME) + self.assertEqual(dataproc_operator.auto_delete_ttl, 
AUTO_DELETE_TTL) def test_get_init_action_timeout(self): for suffix, dataproc_operator in enumerate(self.dataproc_operators): @@ -160,6 +169,10 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase): NETWORK_URI) self.assertEqual(cluster_data['config']['gceClusterConfig']['tags'], TAGS) + self.assertEqual(cluster_data['config']['lifecycleConfig']['idleDeleteTtl'], + "321s") + self.assertEqual(cluster_data['config']['lifecycleConfig']['autoDeleteTime'], + "2017-06-07T00:00:00.000000Z") # test whether the default airflow-version label has been properly # set to the dataproc operator. merged_labels = {} @@ -169,6 +182,52 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase): cluster_data['labels']['airflow-version'])) self.assertEqual(cluster_data['labels'], merged_labels) + def test_build_cluster_data_with_autoDeleteTime(self): + dataproc_operator = DataprocClusterCreateOperator( + task_id=TASK_ID, + cluster_name=CLUSTER_NAME, + project_id=PROJECT_ID, + num_workers=NUM_WORKERS, + zone=ZONE, + dag=self.dag, + auto_delete_time=AUTO_DELETE_TIME, + ) + cluster_data = dataproc_operator._build_cluster_data() + self.assertEqual(cluster_data['config']['lifecycleConfig']['autoDeleteTime'], + "2017-06-07T00:00:00.000000Z") + + def test_build_cluster_data_with_autoDeleteTtl(self): + dataproc_operator = DataprocClusterCreateOperator( + task_id=TASK_ID, + cluster_name=CLUSTER_NAME, + project_id=PROJECT_ID, + num_workers=NUM_WORKERS, + zone=ZONE, + dag=self.dag, + auto_delete_ttl=AUTO_DELETE_TTL, + ) + cluster_data = dataproc_operator._build_cluster_data() + self.assertEqual(cluster_data['config']['lifecycleConfig']['autoDeleteTtl'], + "654s") + + def test_build_cluster_data_with_autoDeleteTime_and_autoDeleteTtl(self): + dataproc_operator = DataprocClusterCreateOperator( + task_id=TASK_ID, + cluster_name=CLUSTER_NAME, + project_id=PROJECT_ID, + num_workers=NUM_WORKERS, + zone=ZONE, + dag=self.dag, + auto_delete_time=AUTO_DELETE_TIME, + 
auto_delete_ttl=AUTO_DELETE_TTL, + ) + cluster_data = dataproc_operator._build_cluster_data() + if 'autoDeleteTtl' in cluster_data['config']['lifecycleConfig']: + self.fail("If 'auto_delete_time' and 'auto_delete_ttl' is set, " + + "only `auto_delete_time` is used") + self.assertEqual(cluster_data['config']['lifecycleConfig']['autoDeleteTime'], + "2017-06-07T00:00:00.000000Z") + def test_cluster_name_log_no_sub(self): with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') as mock_hook: mock_hook.return_value.get_conn = self.mock_conn
