This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v2-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 20ed260c3aee7674488bc3231dea81378d6ddb21 Author: Kaxil Naik <[email protected]> AuthorDate: Mon Apr 5 16:56:00 2021 +0100 Bugfix: Fix overriding `pod_template_file` in KubernetesExecutor (#15197) This feature was added in https://github.com/apache/airflow/pull/11784 but it was broken as it got `pod_template_override` from `executor_config` instead of `pod_template_file`. closes #14199 (cherry picked from commit 5606137ba32c0daa87d557301d82f7f2bdc0b0a4) --- .../example_kubernetes_executor_config.py | 3 +- airflow/executors/kubernetes_executor.py | 2 +- .../basic_template.yaml | 4 +- docs/apache-airflow/executor/kubernetes.rst | 2 +- .../basic_template.yaml | 34 ++++++++ tests/executors/test_kubernetes_executor.py | 91 +++++++++++++++++++++- 6 files changed, 130 insertions(+), 6 deletions(-) diff --git a/airflow/example_dags/example_kubernetes_executor_config.py b/airflow/example_dags/example_kubernetes_executor_config.py index cbd69cb..5290dd8 100644 --- a/airflow/example_dags/example_kubernetes_executor_config.py +++ b/airflow/example_dags/example_kubernetes_executor_config.py @@ -24,6 +24,7 @@ import os from airflow import DAG from airflow.example_dags.libs.helper import print_stuff from airflow.operators.python import PythonOperator +from airflow.settings import AIRFLOW_HOME from airflow.utils.dates import days_ago default_args = { @@ -110,7 +111,7 @@ try: task_id="task_with_template", python_callable=print_stuff, executor_config={ - "pod_template_file": "/usr/local/airflow/pod_templates/basic_template.yaml", + "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"), "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})), }, ) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 7e3d82b..ec7cbf7 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -496,7 +496,7 @@ class 
KubernetesExecutor(BaseExecutor, LoggingMixin): return if executor_config: - pod_template_file = executor_config.get("pod_template_override", None) + pod_template_file = executor_config.get("pod_template_file", None) else: pod_template_file = None if not self.task_queue: diff --git a/airflow/kubernetes_executor_templates/basic_template.yaml b/airflow/kubernetes_executor_templates/basic_template.yaml index a953867..a6eb83f 100644 --- a/airflow/kubernetes_executor_templates/basic_template.yaml +++ b/airflow/kubernetes_executor_templates/basic_template.yaml @@ -69,8 +69,8 @@ spec: defaultMode: 420 restartPolicy: Never terminationGracePeriodSeconds: 30 - serviceAccountName: airflow-worker-serviceaccount - serviceAccount: airflow-worker-serviceaccount + serviceAccountName: airflow-worker + serviceAccount: airflow-worker securityContext: runAsUser: 50000 fsGroup: 50000 diff --git a/docs/apache-airflow/executor/kubernetes.rst b/docs/apache-airflow/executor/kubernetes.rst index 217a29c..61d13f4 100644 --- a/docs/apache-airflow/executor/kubernetes.rst +++ b/docs/apache-airflow/executor/kubernetes.rst @@ -125,7 +125,7 @@ name ``base`` and a second container containing your desired sidecar. :end-before: [END task_with_sidecar] You can also create custom ``pod_template_file`` on a per-task basis so that you can recycle the same base values between multiple tasks. -This will replace the default ``pod_template_file`` named in the airflow.cfg and then override that template using the ``pod_override_spec``. +This will replace the default ``pod_template_file`` named in the airflow.cfg and then override that template using the ``pod_override``. 
Here is an example of a task with both features: diff --git a/tests/executors/kubernetes_executor_template_files/basic_template.yaml b/tests/executors/kubernetes_executor_template_files/basic_template.yaml new file mode 100644 index 0000000..1fb00f2 --- /dev/null +++ b/tests/executors/kubernetes_executor_template_files/basic_template.yaml @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +--- +kind: Pod +apiVersion: v1 +metadata: + name: dummy-name-dont-delete + namespace: dummy-name-dont-delete + labels: + mylabel: foo +spec: + containers: + - name: base + image: dummy-name-dont-delete + securityContext: + runAsUser: 50000 + fsGroup: 50000 + imagePullSecrets: + - name: airflow-registry + schedulerName: default-scheduler diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index 68b0006..8d3d5b4 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. 
# +import pathlib import random import re import string @@ -22,6 +23,7 @@ import unittest from datetime import datetime from unittest import mock +import pytest from kubernetes.client import models as k8s from urllib3 import HTTPResponse @@ -39,7 +41,7 @@ try: get_base_pod_from_template, ) from airflow.kubernetes import pod_generator - from airflow.kubernetes.pod_generator import PodGenerator + from airflow.kubernetes.pod_generator import PodGenerator, datetime_to_label_safe_datestring from airflow.utils.state import State except ImportError: AirflowKubernetesScheduler = None # type: ignore @@ -215,6 +217,93 @@ class TestKubernetesExecutor(unittest.TestCase): assert list(executor.event_buffer.values())[0][1] == "Invalid executor_config passed" + @pytest.mark.execution_timeout(10) + @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed') + @mock.patch('airflow.kubernetes.pod_launcher.PodLauncher.run_pod_async') + @mock.patch('airflow.executors.kubernetes_executor.get_kube_client') + def test_pod_template_file_override_in_executor_config(self, mock_get_kube_client, mock_run_pod_async): + current_folder = pathlib.Path(__file__).parent.absolute() + template_file = str( + (current_folder / "kubernetes_executor_template_files" / "basic_template.yaml").absolute() + ) + + mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', autospec=True) + mock_get_kube_client.return_value = mock_kube_client + + with conf_vars({('kubernetes', 'pod_template_file'): ''}): + executor = self.kubernetes_executor + executor.start() + + assert executor.event_buffer == {} + assert executor.task_queue.empty() + + execution_date = datetime.utcnow() + + executor.execute_async( + key=('dag', 'task', execution_date, 1), + queue=None, + command=['airflow', 'tasks', 'run', 'true', 'some_parameter'], + executor_config={ + "pod_template_file": template_file, + "pod_override": k8s.V1Pod( + metadata=k8s.V1ObjectMeta(labels={"release": "stable"}), + 
spec=k8s.V1PodSpec( + containers=[k8s.V1Container(name="base", image="airflow:3.6")], + ), + ), + }, + ) + + assert not executor.task_queue.empty() + task = executor.task_queue.get_nowait() + _, _, expected_executor_config, expected_pod_template_file = task + + # Test that the correct values have been put to queue + assert expected_executor_config.metadata.labels == {'release': 'stable'} + assert expected_pod_template_file == template_file + + self.kubernetes_executor.kube_scheduler.run_next(task) + mock_run_pod_async.assert_called_once_with( + k8s.V1Pod( + api_version="v1", + kind="Pod", + metadata=k8s.V1ObjectMeta( + name=mock.ANY, + namespace="default", + annotations={ + 'dag_id': 'dag', + 'execution_date': execution_date.isoformat(), + 'task_id': 'task', + 'try_number': '1', + }, + labels={ + 'airflow-worker': '5', + 'airflow_version': mock.ANY, + 'dag_id': 'dag', + 'execution_date': datetime_to_label_safe_datestring(execution_date), + 'kubernetes_executor': 'True', + 'mylabel': 'foo', + 'release': 'stable', + 'task_id': 'task', + 'try_number': '1', + }, + ), + spec=k8s.V1PodSpec( + containers=[ + k8s.V1Container( + name="base", + image="airflow:3.6", + args=['airflow', 'tasks', 'run', 'true', 'some_parameter'], + env=[k8s.V1EnvVar(name='AIRFLOW_IS_K8S_EXECUTOR_POD', value='True')], + ) + ], + image_pull_secrets=[k8s.V1LocalObjectReference(name='airflow-registry')], + scheduler_name='default-scheduler', + security_context=k8s.V1PodSecurityContext(fs_group=50000, run_as_user=50000), + ), + ) + ) + @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher') @mock.patch('airflow.executors.kubernetes_executor.get_kube_client') def test_change_state_running(self, mock_get_kube_client, mock_kubernetes_job_watcher):
