This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 71e2e6fff9c48ae78933957fea682db7a2e3b7ff
Author: Daniel Imberman <[email protected]>
AuthorDate: Thu Nov 5 14:48:05 2020 -0800

    Add ability to specify pod_template_file in executor_config (#11784)

    * Add pod_template_override to executor_config

    Users will be able to override the base pod_template_file on a per-task basis.

    * change docstring

    * fix doc

    * fix static checks

    * add description

    (cherry picked from commit 68ba54bbd5a275fba1a126f8e67bd69e5cf4b362)
---
 .../example_kubernetes_executor_config.py      |  95 +++++++++++++++---
 airflow/executors/kubernetes_executor.py       |  53 +++++++---
 airflow/kubernetes/pod_generator.py            |  26 ++++-
 .../dags_in_image_template.yaml                |  84 ++++++++++++++++
 .../dags_in_volume_template.yaml               |  81 ++++++++++++++++
 .../git_sync_template.yaml                     | 107 +++++++++++++++++++++
 airflow/serialization/enums.py                 |   2 +
 airflow/serialization/serialized_objects.py    |  18 ++++
 chart/requirements.lock                        |   6 +-
 docs/executor/kubernetes.rst                   |  95 ++++++++++++++++++
 tests/executors/test_kubernetes_executor.py    |  26 ++++-
 tests/serialization/test_dag_serialization.py  |   8 ++
 12 files changed, 563 insertions(+), 38 deletions(-)

diff --git a/airflow/example_dags/example_kubernetes_executor_config.py b/airflow/example_dags/example_kubernetes_executor_config.py
index 2e4ba00..e3f42d0 100644
--- a/airflow/example_dags/example_kubernetes_executor_config.py
+++ b/airflow/example_dags/example_kubernetes_executor_config.py
@@ -22,6 +22,8 @@ This is an example dag for using a Kubernetes Executor Configuration.
 from __future__ import print_function

 import os

+from kubernetes.client import models as k8s
+
 from airflow.contrib.example_dags.libs.helper import print_stuff
 from airflow.models import DAG
@@ -40,6 +42,20 @@ with DAG(
     schedule_interval=None
 ) as dag:

+    def test_sharedvolume_mount():
+        """
+        Tests whether the volume has been mounted.
+        """
+        for i in range(5):
+            try:
+                return_code = os.system("cat /shared/test.txt")
+                if return_code != 0:
+                    raise ValueError("Error when checking volume mount. Return code {return_code}"
+                                     .format(return_code=return_code))
+            except ValueError as e:
+                if i >= 4:
+                    raise e
+
     def test_volume_mount():
         """
         Tests whether the volume has been mounted.
@@ -61,27 +77,74 @@ with DAG(
         }
     )

+    # [START task_with_volume]
+    # You can mount a volume or secret to the worker pod
     second_task = PythonOperator(
         task_id="four_task",
         python_callable=test_volume_mount,
         executor_config={
-            "KubernetesExecutor": {
-                "volumes": [
-                    {
-                        "name": "example-kubernetes-test-volume",
-                        "hostPath": {"path": "/tmp/"},
-                    },
-                ],
-                "volume_mounts": [
-                    {
-                        "mountPath": "/foo/",
-                        "name": "example-kubernetes-test-volume",
-                    },
-                ]
-            }
-        }
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                            volume_mounts=[
+                                k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
+                            ],
+                        )
+                    ],
+                    volumes=[
+                        k8s.V1Volume(
+                            name="example-kubernetes-test-volume",
+                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
+                        )
+                    ],
+                )
+            ),
+        },
+    )
+    # [END task_with_volume]
+
+    # [START task_with_template]
+    task_with_template = PythonOperator(
+        task_id="task_with_template",
+        python_callable=print_stuff,
+        executor_config={
+            "pod_template_file": "/usr/local/airflow/pod_templates/basic_template.yaml",
+            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
+        },
+    )
+    # [END task_with_template]
+
+    # [START task_with_sidecar]
+    sidecar_task = PythonOperator(
+        task_id="task_with_sidecar",
+        python_callable=test_sharedvolume_mount,
+        executor_config={
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
+                        ),
+                        k8s.V1Container(
+                            name="sidecar",
+                            image="ubuntu",
+                            args=["echo \"retrieved from mount\" > /shared/test.txt"],
+                            command=["bash", "-cx"],
+                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
+                        ),
+                    ],
+                    volumes=[
+                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
+                    ],
+                )
+            ),
+        },
     )
+    # [END task_with_sidecar]

     # Test that we can add labels to pods
     third_task = PythonOperator(
@@ -111,3 +174,5 @@ with DAG(

     start_task >> second_task >> third_task
     start_task >> other_ns_task
+    start_task >> sidecar_task
+    start_task >> task_with_template
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 73dd91e..bdbd1cb 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -30,9 +30,9 @@ from queue import Empty

 import kubernetes
 from dateutil import parser
-from kubernetes import watch, client
+from kubernetes import client, watch
 from kubernetes.client.rest import ApiException
-from urllib3.exceptions import HTTPError, ReadTimeoutError
+from urllib3.exceptions import ReadTimeoutError

 from airflow import settings
 from airflow.configuration import conf
@@ -427,11 +427,18 @@ class AirflowKubernetesScheduler(LoggingMixin):
             status
         """
         self.log.info('Kubernetes job is %s', str(next_job))
-        key, command, kube_executor_config = next_job
+        key, command, kube_executor_config, pod_template_file = next_job
         dag_id, task_id, execution_date, try_number = key

         if command[0:2] != ["airflow", "run"]:
             raise ValueError('The command must start with ["airflow", "run"].')
+
+        base_worker_pod = get_base_pod_from_template(pod_template_file, self.kube_config)
+
+        if not base_worker_pod:
+            raise AirflowException(
+                "Could not find a valid worker template yaml at {}".format(self.kube_config.pod_template_file)
+            )

         pod = PodGenerator.construct_pod(
            namespace=self.namespace,
@@ -662,6 +669,21 @@ class AirflowKubernetesScheduler(LoggingMixin):
             self._manager.shutdown()


+def get_base_pod_from_template(pod_template_file, kube_config):
+    """
+    Reads either the pod_template_file set in the executor_config or the base pod_template_file
+    set in the airflow.cfg to craft a "base pod" that will be used by the KubernetesExecutor.
+
+    :param pod_template_file: absolute path to a pod_template_file.yaml or None
+    :param kube_config: The KubeConfig class generated by airflow that contains all kube metadata
+    :return: a V1Pod that can be used as the base pod for k8s tasks
+    """
+    if pod_template_file:
+        return PodGenerator.deserialize_model_file(pod_template_file)
+    else:
+        return PodGenerator.deserialize_model_file(kube_config.pod_template_file)
+
+
 class KubernetesExecutor(BaseExecutor, LoggingMixin):
     """Executor for Kubernetes"""

     def __init__(self):
@@ -794,7 +816,11 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
             )

         kube_executor_config = PodGenerator.from_obj(executor_config)
-        self.task_queue.put((key, command, kube_executor_config))
+        if executor_config:
+            pod_template_file = executor_config.get("pod_template_file", None)
+        else:
+            pod_template_file = None
+        self.task_queue.put((key, command, kube_executor_config, pod_template_file))

     def sync(self):
         """Synchronize task state."""
@@ -831,13 +857,16 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    self.log.warning('ApiException when attempting to run task, re-queueing. '
-                                     'Message: %s', json.loads(e.body)['message'])
-                    self.task_queue.put(task)
-                except HTTPError as e:
-                    self.log.warning('HTTPError when attempting to run task, re-queueing. '
-                                     'Exception: %s', str(e))
-                    self.task_queue.put(task)
+                    if e.reason == "BadRequest":
+                        self.log.error("Request was invalid. Failing task")
+                        key, _, _, _ = task
+                        self.change_state(key, State.FAILED, e)
+                    else:
+                        self.log.warning(
+                            'ApiException when attempting to run task, re-queueing. ' 'Message: %s',
+                            json.loads(e.body)['message'],
+                        )
+                        self.task_queue.put(task)
                 finally:
                     self.task_queue.task_done()
             except Empty:
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 5a57230..e12fba4 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -564,6 +564,18 @@ class PodGenerator(object):
         return reduce(PodGenerator.reconcile_pods, pod_list)

     @staticmethod
+    def serialize_pod(pod):
+        """
+        Converts a k8s.V1Pod object into a JSON-serializable dict.
+
+        :param pod: the k8s.V1Pod to convert
+        :return: a sanitized dict that can be safely JSON-encoded
+        """
+        api_client = ApiClient()
+        return api_client.sanitize_for_serialization(pod)
+
+    @staticmethod
     def deserialize_model_file(path):
         """
         :param path: Path to the file
@@ -573,15 +585,23 @@
         ``_ApiClient__deserialize_model`` from the kubernetes client.
         This issue is tracked here: https://github.com/kubernetes-client/python/issues/977.
""" - api_client = ApiClient() if os.path.exists(path): with open(path) as stream: pod = yaml.safe_load(stream) else: pod = yaml.safe_load(path) - # pylint: disable=protected-access - return api_client._ApiClient__deserialize_model(pod, k8s.V1Pod) + return PodGenerator.deserialize_model_dict(pod) + + @staticmethod + def deserialize_model_dict(pod_dict): + """ + Deserializes python dictionary to k8s.V1Pod + @param pod_dict: + @return: + """ + api_client = ApiClient() + return api_client._ApiClient__deserialize_model(pod_dict, k8s.V1Pod) # pylint: disable=W0212 def merge_objects(base_obj, client_obj): diff --git a/airflow/kubernetes/pod_template_file_examples/dags_in_image_template.yaml b/airflow/kubernetes/pod_template_file_examples/dags_in_image_template.yaml new file mode 100644 index 0000000..b1995c2 --- /dev/null +++ b/airflow/kubernetes/pod_template_file_examples/dags_in_image_template.yaml @@ -0,0 +1,84 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# [START template_with_dags_in_image] +--- +apiVersion: v1 +kind: Pod +metadata: + name: dummy-name +spec: + containers: + - args: [] + command: [] + env: + - name: AIRFLOW__CORE__EXECUTOR + value: LocalExecutor + # Hard Coded Airflow Envs + - name: AIRFLOW__CORE__FERNET_KEY + valueFrom: + secretKeyRef: + name: RELEASE-NAME-fernet-key + key: fernet-key + - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN + valueFrom: + secretKeyRef: + name: RELEASE-NAME-airflow-metadata + key: connection + - name: AIRFLOW_CONN_AIRFLOW_DB + valueFrom: + secretKeyRef: + name: RELEASE-NAME-airflow-metadata + key: connection + envFrom: [] + image: dummy_image + imagePullPolicy: IfNotPresent + name: base + ports: [] + volumeMounts: + - mountPath: "/opt/airflow/logs" + name: airflow-logs + - mountPath: /opt/airflow/dags + name: airflow-dags + readOnly: false + - mountPath: /opt/airflow/dags + name: airflow-dags + readOnly: true + subPath: repo/tests/dags + hostNetwork: false + restartPolicy: Never + securityContext: + runAsUser: 50000 + nodeSelector: + {} + affinity: + {} + tolerations: + [] + serviceAccountName: 'RELEASE-NAME-worker-serviceaccount' + volumes: + - name: dags + persistentVolumeClaim: + claimName: RELEASE-NAME-dags + - emptyDir: {} + name: airflow-logs + - configMap: + name: RELEASE-NAME-airflow-config + name: airflow-config + - configMap: + name: RELEASE-NAME-airflow-config + name: airflow-local-settings +# [END template_with_dags_in_image] diff --git a/airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml b/airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml new file mode 100644 index 0000000..86b5358 --- /dev/null +++ b/airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# 
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# [START template_with_dags_in_volume]
+---
+apiVersion: v1
+kind: Pod
+metadata:
+  name: dummy-name
+spec:
+  containers:
+    - args: []
+      command: []
+      env:
+        - name: AIRFLOW__CORE__EXECUTOR
+          value: LocalExecutor
+        # Hard Coded Airflow Envs
+        - name: AIRFLOW__CORE__FERNET_KEY
+          valueFrom:
+            secretKeyRef:
+              name: RELEASE-NAME-fernet-key
+              key: fernet-key
+        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
+          valueFrom:
+            secretKeyRef:
+              name: RELEASE-NAME-airflow-metadata
+              key: connection
+        - name: AIRFLOW_CONN_AIRFLOW_DB
+          valueFrom:
+            secretKeyRef:
+              name: RELEASE-NAME-airflow-metadata
+              key: connection
+      envFrom: []
+      image: dummy_image
+      imagePullPolicy: IfNotPresent
+      name: base
+      ports: []
+      volumeMounts:
+        - mountPath: "/opt/airflow/logs"
+          name: airflow-logs
+        - mountPath: /opt/airflow/dags
+          name: dags
+          readOnly: true
+  hostNetwork: false
+  restartPolicy: Never
+  securityContext:
+    runAsUser: 50000
+  nodeSelector:
+    {}
+  affinity:
+    {}
+  tolerations:
+    []
+  serviceAccountName: 'RELEASE-NAME-worker-serviceaccount'
+  volumes:
+    - name: dags
+      persistentVolumeClaim:
+        claimName: RELEASE-NAME-dags
+    - emptyDir: {}
+      name: airflow-logs
+    - configMap:
+        name: RELEASE-NAME-airflow-config
+      name: airflow-config
+    - configMap:
+        name: RELEASE-NAME-airflow-config
+      name: airflow-local-settings
+# [END template_with_dags_in_volume]
diff --git a/airflow/kubernetes/pod_template_file_examples/git_sync_template.yaml b/airflow/kubernetes/pod_template_file_examples/git_sync_template.yaml
new file mode 100644
index 0000000..a962a8f
--- /dev/null
+++ b/airflow/kubernetes/pod_template_file_examples/git_sync_template.yaml
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+# [START git_sync_template]
+apiVersion: v1
+kind: Pod
+metadata:
+  name: dummy-name
+spec:
+  initContainers:
+    - name: git-sync
+      image: "k8s.gcr.io/git-sync:v3.1.6"
+      env:
+        - name: GIT_SYNC_REV
+          value: "HEAD"
+        - name: GIT_SYNC_BRANCH
+          value: "v1-10-stable"
+        - name: GIT_SYNC_REPO
+          value: "https://github.com/apache/airflow.git"
+        - name: GIT_SYNC_DEPTH
+          value: "1"
+        - name: GIT_SYNC_ROOT
+          value: "/git"
+        - name: GIT_SYNC_DEST
+          value: "repo"
+        - name: GIT_SYNC_ADD_USER
+          value: "true"
+        - name: GIT_SYNC_WAIT
+          value: "60"
+        - name: GIT_SYNC_MAX_SYNC_FAILURES
+          value: "0"
+      volumeMounts:
+        - name: dags
+          mountPath: /git
+  containers:
+    - args: []
+      command: []
+      env:
+        - name: AIRFLOW__CORE__EXECUTOR
+          value: LocalExecutor
+        # Hard Coded Airflow Envs
+        - name: AIRFLOW__CORE__FERNET_KEY
+          valueFrom:
+            secretKeyRef:
+              name: RELEASE-NAME-fernet-key
+              key: fernet-key
+        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
+          valueFrom:
+            secretKeyRef:
+              name: RELEASE-NAME-airflow-metadata
+              key: connection
+        - name: AIRFLOW_CONN_AIRFLOW_DB
+          valueFrom:
+            secretKeyRef:
+              name: RELEASE-NAME-airflow-metadata
+              key: connection
+      envFrom: []
+      image: dummy_image
+      imagePullPolicy: IfNotPresent
+      name: base
+      ports: []
+      volumeMounts:
+        - mountPath: "/opt/airflow/logs"
+          name: airflow-logs
+        - mountPath: /opt/airflow/dags
+          name: dags
+          readOnly: true
+          subPath: repo
+  hostNetwork: false
+  restartPolicy: Never
+  securityContext:
+    runAsUser: 50000
+  nodeSelector:
+    {}
+  affinity:
+    {}
+  tolerations:
+    []
+  serviceAccountName: 'RELEASE-NAME-worker-serviceaccount'
+  volumes:
+    - name: dags
+      emptyDir: {}
+    - emptyDir: {}
+      name: airflow-logs
+    - configMap:
+        name: RELEASE-NAME-airflow-config
+      name: airflow-config
+    - configMap:
+        name: RELEASE-NAME-airflow-config
+      name: airflow-local-settings
+# [END git_sync_template]
diff --git a/airflow/serialization/enums.py b/airflow/serialization/enums.py
index 8e5fee6..e4f72c5 100644
--- a/airflow/serialization/enums.py
+++ b/airflow/serialization/enums.py
@@ -36,10 +36,12 @@ class DagAttributeTypes(str, Enum):
     """Enum of supported attribute types of DAG."""
     DAG = 'dag'
     OP = 'operator'
+    DATETIME = 'datetime'
     TIMEDELTA = 'timedelta'
     TIMEZONE = 'timezone'
     RELATIVEDELTA = 'relativedelta'
     DICT = 'dict'
+    POD = 'k8s.V1Pod'
     SET = 'set'
     TUPLE = 'tuple'
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index c527ddf..857514e 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -47,6 +47,16 @@ except ImportError:
 if TYPE_CHECKING:
     from inspect import Parameter

+try:
+    # isort: off
+    from kubernetes.client import models as k8s
+    from airflow.kubernetes.pod_generator import PodGenerator
+
+    # isort: on
+    HAS_KUBERNETES = True
+except ImportError:
+    HAS_KUBERNETES = False
+
 log = logging.getLogger(__name__)
@@ -199,6 +209,9 @@ class BaseSerialization:
             return [cls._serialize(v) for v in var]
         elif isinstance(var, DAG):
             return SerializedDAG.serialize_dag(var)
+        elif HAS_KUBERNETES and isinstance(var, k8s.V1Pod):
+            json_pod = PodGenerator.serialize_pod(var)
+            return cls._encode(json_pod, type_=DAT.POD)
         elif isinstance(var, BaseOperator):
             return SerializedBaseOperator.serialize_operator(var)
         elif isinstance(var, cls._datetime_types):
@@ -253,6 +266,11 @@ class BaseSerialization:
             return SerializedBaseOperator.deserialize_operator(var)
         elif type_ == DAT.DATETIME:
             return pendulum.from_timestamp(var)
+        elif type_ == DAT.POD:
+            if not HAS_KUBERNETES:
+                raise RuntimeError("Cannot deserialize POD objects without kubernetes libraries installed!")
+            pod = PodGenerator.deserialize_model_dict(var)
+            return pod
         elif type_ == DAT.TIMEDELTA:
             return datetime.timedelta(seconds=var)
         elif type_ == DAT.TIMEZONE:
diff --git a/chart/requirements.lock b/chart/requirements.lock
index e460e9f..eb62c80 100644
--- a/chart/requirements.lock
+++ b/chart/requirements.lock
@@ -1,6 +1,6 @@
 dependencies:
 - name: postgresql
-  repository: https://charts.helm.sh/stable/
+  repository: https://kubernetes-charts.storage.googleapis.com
   version: 6.3.12
-digest: sha256:1748aa702050d4e72ffba1b18960f49bfe5368757cf976116afeffbdedda1c98
-generated: "2020-11-07T17:40:45.418723358+01:00"
+digest: sha256:58d88cf56e78b2380091e9e16cc6ccf58b88b3abe4a1886dd47cd9faef5309af
+generated: "2020-11-04T15:59:36.967913-08:00"
diff --git a/docs/executor/kubernetes.rst b/docs/executor/kubernetes.rst
index 042d638..ed172a8 100644
--- a/docs/executor/kubernetes.rst
+++ b/docs/executor/kubernetes.rst
@@ -34,6 +34,101 @@ The kubernetes executor is introduced in Apache Airflow 1.10.0. The Kubernetes e
   - Another option is to use S3/GCS/etc to store logs

+To troubleshoot issues with the KubernetesExecutor, you can use the ``airflow kubernetes generate-dag-yaml`` command.
+This command generates the pods as they will be launched in Kubernetes and dumps them into yaml files for you to inspect.
+
+.. _concepts:pod_template_file:
+
+pod_template_file
+#################
+
+As of Airflow 1.10.12, you can use the ``pod_template_file`` option in the ``kubernetes`` section
+of the ``airflow.cfg`` file to form the basis of your KubernetesExecutor pods. This process is faster
+and easier to modify than configuring each pod setting individually in ``airflow.cfg``.
+
+We include multiple examples of working pod templates below, but we would also like to explain a few necessary components
+if you want to customize your template files. As long as you have these components, every other element
+in the template is customizable.
+
+1. Airflow will overwrite the base container image and the pod name
+
+There are two places from which Airflow may overwrite the base image: the ``airflow.cfg`` setting
+or the ``pod_override`` (discussed below). This value is overwritten to ensure that users do
+not need to update multiple template files every time they upgrade their docker image. The other field
+that Airflow overwrites is the ``pod.metadata.name`` field. This field has to be unique across all pods,
+so we generate these names dynamically before launch.
+
+It's important to note that while Airflow overwrites these fields, they **cannot be left blank**.
+If these fields do not exist, the kubernetes client cannot load the yaml into a Kubernetes ``V1Pod``.
+
+2. Each Airflow ``pod_template_file`` must have a container named "base" at the ``pod.spec.containers[0]`` position
+
+Airflow uses the ``pod_template_file`` by making certain assumptions about the structure of the template.
+When Airflow creates the worker pod's command, it assumes that the airflow worker container exists
+at the beginning of the container array. It then assumes that the container is named ``base``
+when it merges this pod with internal configs. You are more than welcome to create
+sidecar containers after this required container.
+
+With these requirements in mind, here are some examples of basic ``pod_template_file`` YAML files.
+
+``pod_template_file`` using the ``dags_in_image`` setting:
+
+.. exampleinclude:: /../airflow/kubernetes/pod_template_file_examples/dags_in_image_template.yaml
+    :language: yaml
+    :start-after: [START template_with_dags_in_image]
+    :end-before: [END template_with_dags_in_image]
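+
+To point the executor at one of these templates, set the option in the ``kubernetes`` section of
+``airflow.cfg``. A minimal sketch (the path here is illustrative; use wherever you ship the file):
+
+.. code-block:: ini
+
+    [kubernetes]
+    pod_template_file = /usr/local/airflow/pod_templates/dags_in_image_template.yaml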
+
+``pod_template_file`` which stores DAGs in a ``persistentVolume``:
+
+.. exampleinclude:: /../airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml
+    :language: yaml
+    :start-after: [START template_with_dags_in_volume]
+    :end-before: [END template_with_dags_in_volume]
+
+``pod_template_file`` which pulls DAGs from git:
+
+.. exampleinclude:: /../airflow/kubernetes/pod_template_file_examples/git_sync_template.yaml
+    :language: yaml
+    :start-after: [START git_sync_template]
+    :end-before: [END git_sync_template]
+
+.. _concepts:pod_override:
+
+pod_override
+############
+
+When using the KubernetesExecutor, Airflow offers the ability to override system defaults on a per-task basis.
+To utilize this functionality, create a Kubernetes ``V1Pod`` object and fill in your desired overrides.
+Please note that the scheduler will override the ``metadata.name`` of the ``V1Pod`` before launching it.
+
+To overwrite the base container of the pod launched by the KubernetesExecutor,
+create a ``V1Pod`` with a single container, and overwrite the fields as follows:
+
+.. exampleinclude:: /../airflow/example_dags/example_kubernetes_executor_config.py
+    :language: python
+    :start-after: [START task_with_volume]
+    :end-before: [END task_with_volume]
+
+Note that volume mounts, environment variables, ports, and devices will all be extended instead of overwritten.
+
+To add a sidecar container to the launched pod, create a ``V1Pod`` with an empty first container with the
+name ``base`` and a second container containing your desired sidecar.
+
+.. exampleinclude:: /../airflow/example_dags/example_kubernetes_executor_config.py
+    :language: python
+    :start-after: [START task_with_sidecar]
+    :end-before: [END task_with_sidecar]
+
+You can also create a custom ``pod_template_file`` on a per-task basis so that you can recycle the same base values between multiple tasks.
+This will replace the default ``pod_template_file`` named in the airflow.cfg and then override that template using the ``pod_override``.
+
+Here is an example of a task with both features:
+
+.. exampleinclude:: /../airflow/example_dags/example_kubernetes_executor_config.py
+    :language: python
+    :start-after: [START task_with_template]
+    :end-before: [END task_with_template]
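+
+Under the hood, a ``pod_override`` value survives DAG serialization via the ``PodGenerator``
+helpers added in this change. A minimal sketch of that round trip (assuming the ``kubernetes``
+python client is installed; the label value is arbitrary):
+
+.. code-block:: python
+
+    from kubernetes.client import models as k8s
+
+    from airflow.kubernetes.pod_generator import PodGenerator
+
+    pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
+
+    # serialize_pod() sanitizes the V1Pod into a JSON-safe dict for storage
+    # in the serialized DAG ...
+    pod_dict = PodGenerator.serialize_pod(pod)
+
+    # ... and deserialize_model_dict() rebuilds an equivalent V1Pod from it.
+    restored = PodGenerator.deserialize_model_dict(pod_dict)
+    assert restored.to_dict() == pod.to_dict()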
+
 KubernetesExecutor Architecture
 ################################
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 3dabb78..f5f415a 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -29,12 +29,15 @@ from tests.compat import mock
 from tests.test_utils.config import conf_vars
 try:
     from kubernetes.client.rest import ApiException
-    from airflow import configuration  # noqa: F401
-    from airflow.configuration import conf  # noqa: F401
-    from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler, KubeConfig
-    from airflow.executors.kubernetes_executor import KubernetesExecutor
-    from airflow.kubernetes.pod_generator import PodGenerator
+
+    from airflow.executors.kubernetes_executor import (
+        AirflowKubernetesScheduler,
+        KubeConfig,
+        KubernetesExecutor,
+        get_base_pod_from_template,
+    )
     from airflow.kubernetes import pod_generator
+    from airflow.kubernetes.pod_generator import PodGenerator
     from airflow.utils.state import State
 except ImportError:
     AirflowKubernetesScheduler = None  # type: ignore
@@ -94,6 +97,19 @@ class TestAirflowKubernetesScheduler(unittest.TestCase):
         )
         self.assertTrue(self._is_valid_pod_id(pod_name))

+    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @mock.patch("airflow.executors.kubernetes_executor.PodGenerator")
+    @mock.patch("airflow.executors.kubernetes_executor.KubeConfig")
+    def test_get_base_pod_from_template(self, mock_kubeconfig, mock_generator):
+        pod_template_file_path = "/bar/biz"
+        get_base_pod_from_template(pod_template_file_path, None)
+        self.assertEqual("deserialize_model_file", mock_generator.mock_calls[0][0])
+        self.assertEqual(pod_template_file_path, mock_generator.mock_calls[0][1][0])
+        mock_kubeconfig.pod_template_file = "/foo/bar"
+        get_base_pod_from_template(None, mock_kubeconfig)
+        self.assertEqual("deserialize_model_file", mock_generator.mock_calls[1][0])
+        self.assertEqual("/foo/bar", mock_generator.mock_calls[1][1][0])
+
     def test_make_safe_label_value(self):
         for dag_id, task_id in self._cases():
             safe_dag_id = pod_generator.make_safe_label_value(dag_id)
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index d999cb0..30493fd 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -259,6 +259,14 @@ class TestStringifiedDAGs(unittest.TestCase):

         assert sorted_serialized_dag(ground_truth_dag) == sorted_serialized_dag(json_dag)

+    def test_deser_k8s_pod_override(self):
+        dag = collect_dags('airflow/example_dags')['example_kubernetes_executor_config']
+        serialized = SerializedDAG.to_json(dag)
+        deser_dag = SerializedDAG.from_json(serialized)
+        p1 = dag.tasks[1].executor_config
+        p2 = deser_dag.tasks[1].executor_config
+        self.assertDictEqual(p1['pod_override'].to_dict(), p2['pod_override'].to_dict())
+
     def test_deserialization_across_process(self):
         """A serialized DAG can be deserialized in another process."""
