Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-10-test 84cfbf6a1 -> cb264e940
[AIRFLOW-2331] Support init action timeout on dataproc cluster create

Closes #3235 from piffall/master

(cherry picked from commit e44688ed090a438a20751e11cc96c72554630f1d)
Signed-off-by: Fokko Driesprong <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/cb264e94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/cb264e94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/cb264e94

Branch: refs/heads/v1-10-test
Commit: cb264e940e90a865726ea788e5fd3c2f98f4817e
Parents: 84cfbf6
Author: Cristòfol Torrens <[email protected]>
Authored: Thu Apr 26 09:59:51 2018 +0200
Committer: Fokko Driesprong <[email protected]>
Committed: Thu Apr 26 10:00:17 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/operators/dataproc_operator.py  | 29 ++++++++++++++++++--
 .../contrib/operators/test_dataproc_operator.py | 12 ++++++--
 2 files changed, 36 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cb264e94/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 728fae9..56ebb91 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License. You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,8 +20,10 @@
 
 import ntpath
 import os
+import re
 import time
 import uuid
+from datetime import timedelta
 
 from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook
 from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
@@ -57,6 +59,9 @@ class DataprocClusterCreateOperator(BaseOperator):
     :param init_actions_uris: List of GCS uri's containing
         dataproc initialization scripts
     :type init_actions_uris: list[string]
+    :param init_action_timeout: Amount of time executable scripts in
+        init_actions_uris has to complete
+    :type init_action_timeout: string
     :param metadata: dict of key-value google compute engine metadata entries
         to add to all instances
     :type metadata: dict
@@ -115,6 +120,7 @@ class DataprocClusterCreateOperator(BaseOperator):
                  tags=None,
                  storage_bucket=None,
                  init_actions_uris=None,
+                 init_action_timeout="10m",
                  metadata=None,
                  image_version=None,
                  properties=None,
@@ -141,6 +147,7 @@ class DataprocClusterCreateOperator(BaseOperator):
         self.num_preemptible_workers = num_preemptible_workers
         self.storage_bucket = storage_bucket
         self.init_actions_uris = init_actions_uris
+        self.init_action_timeout = init_action_timeout
         self.metadata = metadata
         self.image_version = image_version
         self.properties = properties
@@ -206,6 +213,19 @@ class DataprocClusterCreateOperator(BaseOperator):
                 return
             time.sleep(15)
 
+    def _get_init_action_timeout(self):
+        match = re.match(r"^(\d+)(s|m)$", self.init_action_timeout)
+        if match:
+            if match.group(2) == "s":
+                return self.init_action_timeout
+            elif match.group(2) == "m":
+                val = float(match.group(1))
+                return "{}s".format(timedelta(minutes=val).seconds)
+
+        raise AirflowException(
+            "DataprocClusterCreateOperator init_action_timeout"
+            " should be expressed in minutes or seconds. i.e. 10m, 30s")
+
     def _build_cluster_data(self):
         zone_uri = \
             'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
@@ -276,7 +296,10 @@ class DataprocClusterCreateOperator(BaseOperator):
             cluster_data['config']['softwareConfig']['properties'] = self.properties
         if self.init_actions_uris:
             init_actions_dict = [
-                {'executableFile': uri} for uri in self.init_actions_uris
+                {
+                    'executableFile': uri,
+                    'executionTimeout': self._get_init_action_timeout()
+                } for uri in self.init_actions_uris
             ]
             cluster_data['config']['initializationActions'] = init_actions_dict
         if self.service_account:


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cb264e94/tests/contrib/operators/test_dataproc_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index 5863b46..e8cd1e5 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -62,6 +62,7 @@ MASTER_DISK_SIZE = 100
 WORKER_MACHINE_TYPE = 'n1-standard-2'
 WORKER_DISK_SIZE = 100
 NUM_PREEMPTIBLE_WORKERS = 2
+GET_INIT_ACTION_TIMEOUT = "600s"  # 10m
 LABEL1 = {}
 LABEL2 = {'application': 'test', 'year': 2017}
 SERVICE_ACCOUNT_SCOPES = [
@@ -130,9 +131,16 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
             self.assertEqual(dataproc_operator.master_disk_size, MASTER_DISK_SIZE)
             self.assertEqual(dataproc_operator.worker_machine_type, WORKER_MACHINE_TYPE)
             self.assertEqual(dataproc_operator.worker_disk_size, WORKER_DISK_SIZE)
-            self.assertEqual(dataproc_operator.num_preemptible_workers, NUM_PREEMPTIBLE_WORKERS)
+            self.assertEqual(dataproc_operator.num_preemptible_workers,
+                             NUM_PREEMPTIBLE_WORKERS)
             self.assertEqual(dataproc_operator.labels, self.labels[suffix])
-            self.assertEqual(dataproc_operator.service_account_scopes, SERVICE_ACCOUNT_SCOPES)
+            self.assertEqual(dataproc_operator.service_account_scopes,
+                             SERVICE_ACCOUNT_SCOPES)
+
+    def test_get_init_action_timeout(self):
+        for suffix, dataproc_operator in enumerate(self.dataproc_operators):
+            timeout = dataproc_operator._get_init_action_timeout()
+            self.assertEqual(timeout, "600s")
 
     def test_build_cluster_data(self):
         for suffix, dataproc_operator in enumerate(self.dataproc_operators):
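For readers who want to try the new argument, below is a minimal usage sketch; it is not part of this commit, and the project, zone, bucket, cluster and DAG names are placeholders. With init_action_timeout='10m', _get_init_action_timeout() converts the value to "600s" and attaches it as the executionTimeout of every entry in initializationActions; a value that does not match <number>s or <number>m raises AirflowException when the cluster data is built.

# Usage sketch only: identifiers below (project, zone, bucket, cluster, DAG id)
# are placeholders, not values taken from this commit.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator

dag = DAG(dag_id='dataproc_init_action_timeout_example',
          start_date=datetime(2018, 4, 1),
          schedule_interval=None)

create_cluster = DataprocClusterCreateOperator(
    task_id='create_dataproc_cluster',
    project_id='my-gcp-project',                                   # placeholder
    cluster_name='example-cluster',                                # placeholder
    num_workers=2,
    zone='europe-west1-b',                                         # placeholder
    init_actions_uris=['gs://my-bucket/scripts/install-deps.sh'],  # placeholder
    init_action_timeout='10m',  # rendered as executionTimeout '600s' per init action
    dag=dag)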
