This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 5e15da144c2c5de22e2f4de363980de9959f8af6 Author: Pete DeJoy <[email protected]> AuthorDate: Wed Feb 26 13:59:27 2020 -0500 [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod (#7523) (cherry picked from commit 676c8515f7bdc8d2767a89484261056dc4bf4bed) --- airflow/config_templates/config.yml | 12 ++++++++++++ airflow/config_templates/default_airflow.cfg | 8 ++++++++ airflow/executors/kubernetes_executor.py | 8 +++++++- scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py | 2 ++ tests/kubernetes/test_worker_configuration.py | 11 +++++++++++ 5 files changed, 40 insertions(+), 1 deletion(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 9535d5b..d2621eb 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1824,6 +1824,7 @@ description: | Allows users to launch pods in multiple namespaces. Will require creating a cluster-role for the scheduler + version_added: 1.10.12 type: boolean example: ~ default: "False" @@ -2179,6 +2180,17 @@ type: string example: ~ default: "" + - name: delete_option_kwargs + description: | + Optional keyword arguments to pass to the ``delete_namespaced_pod`` kubernetes client + ``core_v1_api`` method when using the Kubernetes Executor. + This should be an object and can contain any of the options listed in the ``v1DeleteOptions`` + class defined here: + https://github.com/kubernetes-client/python/blob/41f11a09995efcd0142e25946adc7591431bfb2f/kubernetes/client/models/v1_delete_options.py#L19 + version_added: 1.10.12 + type: string + example: '{"grace_period_seconds": 10}' + default: "" - name: run_as_user description: | Specifies the uid to run the first process of the worker pods containers as diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 9729403..71a5172 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -1040,6 +1040,14 @@ tolerations = # The timeout is specified as [connect timeout, read timeout] kube_client_request_args = +# Optional keyword arguments to pass to the ``delete_namespaced_pod`` kubernetes client +# ``core_v1_api`` method when using the Kubernetes Executor. +# This should be an object and can contain any of the options listed in the ``v1DeleteOptions`` +# class defined here: +# https://github.com/kubernetes-client/python/blob/41f11a09995efcd0142e25946adc7591431bfb2f/kubernetes/client/models/v1_delete_options.py#L19 +# Example: delete_option_kwargs = {{"grace_period_seconds": 10}} +delete_option_kwargs = + # Specifies the uid to run the first process of the worker pods containers as run_as_user = 50000 diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 7b31b45..b6784a8 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -220,6 +220,12 @@ class KubeConfig: self.kube_client_request_args = {} self._validate() + delete_option_kwargs = conf.get(self.kubernetes_section, 'delete_option_kwargs') + if delete_option_kwargs: + self.delete_option_kwargs = json.loads(delete_option_kwargs) + else: + self.delete_option_kwargs = {} + # pod security context items should return integers # and only return a blank string if contexts are not set. def _get_security_context_val(self, scontext): @@ -458,7 +464,7 @@ class AirflowKubernetesScheduler(LoggingMixin): """Deletes POD""" try: self.kube_client.delete_namespaced_pod( - pod_id, namespace, body=client.V1DeleteOptions(), + pod_id, namespace, body=client.V1DeleteOptions(**self.kube_config.delete_option_kwargs), **self.kube_config.kube_client_request_args) except ApiException as e: # If the pod is already deleted diff --git a/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py b/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py index a578689..278a223 100755 --- a/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py +++ b/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py @@ -115,6 +115,8 @@ def write_config(yaml_config_file_path, default_cfg_file_path): configfile.write("# {}\n".format(single_line_desc)) if option["example"]: + if not str(option["name"]).endswith("_template"): + option["example"] = option["example"].replace("{", "{{").replace("}", "}}") configfile.write("# Example: {} = {}\n".format(option["name"], option["example"])) configfile.write("{}{} ={}\n".format( diff --git a/tests/kubernetes/test_worker_configuration.py b/tests/kubernetes/test_worker_configuration.py index 0273ae8..48dc3e3 100644 --- a/tests/kubernetes/test_worker_configuration.py +++ b/tests/kubernetes/test_worker_configuration.py @@ -18,6 +18,7 @@ import unittest import six +from parameterized import parameterized from tests.compat import mock from tests.test_utils.config import conf_vars @@ -116,6 +117,16 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase): 'but not both$'): KubeConfig() + @parameterized.expand([ + ('{"grace_period_seconds": 10}', {"grace_period_seconds": 10}), + ("", {}) + ]) + def test_delete_option_kwargs_config(self, config, expected_value): + with conf_vars({ + ('kubernetes', 'delete_option_kwargs'): config, + }): + self.assertEqual(KubeConfig().delete_option_kwargs, expected_value) + def test_worker_with_subpaths(self): self.kube_config.dags_volume_subpath = 'dags' self.kube_config.logs_volume_subpath = 'logs'
