This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 20481c3 Add pod_override setting for KubernetesExecutor (#10756)
20481c3 is described below
commit 20481c3cafb89bcf9b8a1b3a5f7c5470b11838c3
Author: Daniel Imberman <[email protected]>
AuthorDate: Tue Sep 8 15:56:59 2020 -0700
Add pod_override setting for KubernetesExecutor (#10756)
* Add pod_override setting for KubernetesExecutor
Users of the KubernetesExecutor will now have a "pod_override"
option in the executor_config. This option will allow users to
modify the pod launched by the KubernetesExecutor using a
`kubernetes.client.models.V1Pod` class. This is the first step
in deprecating the traditional executor_config.
* Fix k8s tests
* fix docs
---
.../example_kubernetes_executor_config.py | 100 +++++++++++++++++----
airflow/kubernetes/pod_generator.py | 56 +++++++++++-
airflow/serialization/enums.py | 1 +
airflow/serialization/serialized_objects.py | 8 ++
docs/concepts.rst | 9 ++
docs/executor/kubernetes.rst | 29 ++++++
tests/kubernetes/test_pod_generator.py | 40 +++++++++
tests/serialization/test_dag_serialization.py | 27 +++++-
8 files changed, 248 insertions(+), 22 deletions(-)
diff --git a/airflow/example_dags/example_kubernetes_executor_config.py
b/airflow/example_dags/example_kubernetes_executor_config.py
index 5fef135..db64b31 100644
--- a/airflow/example_dags/example_kubernetes_executor_config.py
+++ b/airflow/example_dags/example_kubernetes_executor_config.py
@@ -20,6 +20,8 @@ This is an example dag for using a Kubernetes Executor
Configuration.
"""
import os
+from kubernetes.client import models as k8s
+
from airflow import DAG
from airflow.example_dags.libs.helper import print_stuff
from airflow.operators.python import PythonOperator
@@ -37,6 +39,19 @@ with DAG(
tags=['example'],
) as dag:
+ def test_sharedvolume_mount():
+ """
+ Tests whether the volume has been mounted.
+ """
+ for i in range(5):
+ try:
+ return_code = os.system("cat /shared/test.txt")
+ if return_code != 0:
+ raise ValueError(f"Error when checking volume mount.
Return code {return_code}")
+ except ValueError as e:
+ if i > 4:
+ raise e
+
def test_volume_mount():
"""
Tests whether the volume has been mounted.
@@ -59,27 +74,75 @@ with DAG(
}
)
- # You can mount volume or secret to the worker pod
- second_task = PythonOperator(
- task_id="four_task",
+ # [START task_with_volume]
+ volume_task = PythonOperator(
+ task_id="task_with_volume",
python_callable=test_volume_mount,
executor_config={
- "KubernetesExecutor": {
- "volumes": [
- {
- "name": "example-kubernetes-test-volume",
- "hostPath": {"path": "/tmp/"},
- },
- ],
- "volume_mounts": [
- {
- "mountPath": "/foo/",
- "name": "example-kubernetes-test-volume",
- },
- ]
- }
+ "pod_override": k8s.V1Pod(
+ spec=k8s.V1PodSpec(
+ containers=[
+ k8s.V1Container(
+ name="base",
+ volume_mounts=[
+ k8s.V1VolumeMount(
+ mount_path="/foo/",
+ name="example-kubernetes-test-volume"
+ )
+ ]
+ )
+ ],
+ volumes=[
+ k8s.V1Volume(
+ name="example-kubernetes-test-volume",
+ host_path=k8s.V1HostPathVolumeSource(
+ path="/tmp/"
+ )
+ )
+ ]
+ )
+ ),
+ }
+ )
+ # [END task_with_volume]
+
+ # [START task_with_sidecar]
+ sidecar_task = PythonOperator(
+ task_id="task_with_sidecar",
+ python_callable=test_sharedvolume_mount,
+ executor_config={
+ "pod_override": k8s.V1Pod(
+ spec=k8s.V1PodSpec(
+ containers=[
+ k8s.V1Container(
+ name="base",
+ volume_mounts=[k8s.V1VolumeMount(
+ mount_path="/shared/",
+ name="shared-empty-dir"
+ )]
+ ),
+ k8s.V1Container(
+ name="sidecar",
+ image="ubuntu",
+ args=["echo \"retrieved from mount\" >
/shared/test.txt"],
+ command=["bash", "-cx"],
+ volume_mounts=[k8s.V1VolumeMount(
+ mount_path="/shared/",
+ name="shared-empty-dir"
+ )]
+ )
+ ],
+ volumes=[
+ k8s.V1Volume(
+ name="shared-empty-dir",
+ empty_dir=k8s.V1EmptyDirVolumeSource()
+ ),
+ ]
+ )
+ ),
}
)
+ # [END task_with_sidecar]
# Test that we can add labels to pods
third_task = PythonOperator(
@@ -107,5 +170,6 @@ with DAG(
}
)
- start_task >> second_task >> third_task
+ start_task >> volume_task >> third_task
start_task >> other_ns_task
+ start_task >> sidecar_task
diff --git a/airflow/kubernetes/pod_generator.py
b/airflow/kubernetes/pod_generator.py
index 02a31ab..7290aaa 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -27,6 +27,7 @@ import inspect
import os
import re
import uuid
+import warnings
from functools import reduce
from typing import Dict, List, Optional, Union
@@ -324,6 +325,37 @@ class PodGenerator:
@staticmethod
def from_obj(obj) -> Optional[k8s.V1Pod]:
"""Converts to pod from obj"""
+
+ if obj is None:
+ return None
+
+ k8s_legacy_object = obj.get("KubernetesExecutor", None)
+ k8s_object = obj.get("pod_override", None)
+
+ if k8s_legacy_object and k8s_object:
+ raise AirflowConfigException("Can not have both a legacy and new"
+ "executor_config object. Please
delete the KubernetesExecutor"
+ "dict and only use the pod_override
kubernetes.client.models.V1Pod"
+ "object.")
+ if not k8s_object and not k8s_legacy_object:
+ return None
+
+ if isinstance(k8s_object, k8s.V1Pod):
+ return k8s_object
+ elif isinstance(k8s_legacy_object, dict):
+ warnings.warn('Using a dictionary for the executor_config is
deprecated and will soon be removed.'
+ 'please use a `kubernetes.client.models.V1Pod` class
with a "pod_override" key'
+ ' instead. ',
+ category=DeprecationWarning)
+ return PodGenerator.from_legacy_obj(obj)
+ else:
+ raise TypeError(
+ 'Cannot convert a non-kubernetes.client.models.V1Pod'
+ 'object into a KubernetesExecutorConfig')
+
+ @staticmethod
+ def from_legacy_obj(obj) -> Optional[k8s.V1Pod]:
+ """Converts to pod from obj"""
if obj is None:
return None
@@ -515,6 +547,16 @@ class PodGenerator:
return reduce(PodGenerator.reconcile_pods, pod_list)
@staticmethod
+ def serialize_pod(pod: k8s.V1Pod):
+ """
+ Converts a k8s.V1Pod into a jsonified object
+ @param pod:
+ @return:
+ """
+ api_client = ApiClient()
+ return api_client.sanitize_for_serialization(pod)
+
+ @staticmethod
def deserialize_model_file(path: str) -> k8s.V1Pod:
"""
:param path: Path to the file
@@ -524,7 +566,6 @@ class PodGenerator:
``_ApiClient__deserialize_model`` from the kubernetes client.
This issue is tracked here;
https://github.com/kubernetes-client/python/issues/977.
"""
- api_client = ApiClient()
if os.path.exists(path):
with open(path) as stream:
pod = yaml.safe_load(stream)
@@ -532,7 +573,18 @@ class PodGenerator:
pod = yaml.safe_load(path)
# pylint: disable=protected-access
- return api_client._ApiClient__deserialize_model(pod, k8s.V1Pod)
+ return PodGenerator.deserialize_model_dict(pod)
+
+ @staticmethod
+ def deserialize_model_dict(pod_dict: dict) -> k8s.V1Pod:
+ """
+ Deserializes python dictionary to k8s.V1Pod
+ @param pod_dict:
+ @return:
+ """
+ api_client = ApiClient()
+ return api_client._ApiClient__deserialize_model( # pylint:
disable=W0212
+ pod_dict, k8s.V1Pod)
@staticmethod
def make_unique_pod_id(dag_id):
diff --git a/airflow/serialization/enums.py b/airflow/serialization/enums.py
index 76f956f..b6fae5e 100644
--- a/airflow/serialization/enums.py
+++ b/airflow/serialization/enums.py
@@ -42,3 +42,4 @@ class DagAttributeTypes(str, Enum):
DICT = 'dict'
SET = 'set'
TUPLE = 'tuple'
+ POD = 'k8s.V1Pod'
diff --git a/airflow/serialization/serialized_objects.py
b/airflow/serialization/serialized_objects.py
index 6cbec0d..e15db25 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -25,9 +25,11 @@ from typing import Any, Dict, Iterable, List, Optional, Set,
Union
import cattr
import pendulum
from dateutil import relativedelta
+from kubernetes.client import models as k8s
from pendulum.tz.timezone import Timezone
from airflow.exceptions import AirflowException
+from airflow.kubernetes.pod_generator import PodGenerator
from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
from airflow.models.connection import Connection
from airflow.models.dag import DAG
@@ -180,6 +182,9 @@ class BaseSerialization:
)
elif isinstance(var, list):
return [cls._serialize(v) for v in var]
+ elif isinstance(var, k8s.V1Pod):
+ json_pod = PodGenerator.serialize_pod(var)
+ return cls._encode(json_pod, type_=DAT.POD)
elif isinstance(var, DAG):
return SerializedDAG.serialize_dag(var)
elif isinstance(var, BaseOperator):
@@ -239,6 +244,9 @@ class BaseSerialization:
return SerializedBaseOperator.deserialize_operator(var)
elif type_ == DAT.DATETIME:
return pendulum.from_timestamp(var)
+ elif type_ == DAT.POD:
+ pod = PodGenerator.deserialize_model_dict(var)
+ return pod
elif type_ == DAT.TIMEDELTA:
return datetime.timedelta(seconds=var)
elif type_ == DAT.TIMEZONE:
diff --git a/docs/concepts.rst b/docs/concepts.rst
index 67fa814..5d2be97 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -156,6 +156,15 @@ Example DAG with functional abstraction
html_content=email_info['body']
)
+.. _concepts:executor_config:
+
+executor_config
+===============
+
+The executor_config is an argument placed into operators that allows Airflow
users to override tasks
+before launch. Currently this is primarily used by the
:class:`KubernetesExecutor`, but will soon be available
+for other overrides.
+
.. _concepts:dagruns:
DAG Runs
diff --git a/docs/executor/kubernetes.rst b/docs/executor/kubernetes.rst
index 099f119..c18b048 100644
--- a/docs/executor/kubernetes.rst
+++ b/docs/executor/kubernetes.rst
@@ -41,6 +41,35 @@ The volumes are optional and depend on your configuration.
There are two volumes
To troubleshoot issue with KubernetesExecutor, you can use ``airflow
kubernetes generate-dag-yaml`` command.
This command generates the pods as they will be launched in Kubernetes and
dumps them into yaml files for you to inspect.
+.. _concepts:pod_override:
+
+pod_override
+############
+
+When using the KubernetesExecutor, Airflow offers the ability to override
system defaults on a per-task basis.
+To utilize this functionality, create a Kubernetes V1Pod object and fill in
your desired overrides.
+Please note that the scheduler will override the ``metadata.name`` of the
V1Pod before launching it.
+
+To overwrite the base container of the pod launched by the KubernetesExecutor,
+create a V1Pod with a single container, and overwrite the fields as follows:
+
+.. exampleinclude::
/../airflow/example_dags/example_kubernetes_executor_config.py
+ :language: python
+ :start-after: [START task_with_volume]
+ :end-before: [END task_with_volume]
+
+Note that volume mounts, environment variables, ports, and devices will all be
extended instead of overwritten.
+
+To add a sidecar container to the launched pod, create a V1Pod with an empty
first container with the
+name ``base`` and a second container containing your desired sidecar.
+
+.. exampleinclude::
/../airflow/example_dags/example_kubernetes_executor_config.py
+ :language: python
+ :start-after: [START task_with_sidecar]
+ :end-before: [END task_with_sidecar]
+
+In the above example, we create a sidecar container that shares a
volume_mount for data sharing.
+
KubernetesExecutor Architecture
################################
diff --git a/tests/kubernetes/test_pod_generator.py
b/tests/kubernetes/test_pod_generator.py
index 0699f0b..812663b 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -270,7 +270,47 @@ class TestPodGenerator(unittest.TestCase):
],
}
})
+
+ result_from_pod = PodGenerator.from_obj(
+ {"pod_override":
+ k8s.V1Pod(
+ metadata=k8s.V1ObjectMeta(
+ annotations={"test": "annotation"}
+ ),
+ spec=k8s.V1PodSpec(
+ containers=[
+ k8s.V1Container(
+ name="base",
+ volume_mounts=[
+ k8s.V1VolumeMount(
+ name="example-kubernetes-test-volume",
+ mount_path="/foo/"
+ )
+ ]
+ )
+ ],
+ volumes=[
+ k8s.V1Volume(
+ name="example-kubernetes-test-volume",
+ host_path="/tmp/"
+ )
+ ]
+ )
+ )
+ }
+ )
+
result = self.k8s_client.sanitize_for_serialization(result)
+ result_from_pod =
self.k8s_client.sanitize_for_serialization(result_from_pod)
+ expected_from_pod = {'metadata': {'annotations': {'test':
'annotation'}},
+ 'spec': {'containers': [
+ {'name': 'base',
+ 'volumeMounts': [{'mountPath': '/foo/',
+ 'name':
'example-kubernetes-test-volume'}]}],
+ 'volumes': [{'hostPath': '/tmp/',
+ 'name':
'example-kubernetes-test-volume'}]}}
+ self.assertEqual(result_from_pod, expected_from_pod, "There was a
discrepency"
+ " between
KubernetesExecutor and pod_override")
self.assertEqual({
'apiVersion': 'v1',
diff --git a/tests/serialization/test_dag_serialization.py
b/tests/serialization/test_dag_serialization.py
index 948ca99..75c35e8 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -26,9 +26,11 @@ from glob import glob
from unittest import mock
from dateutil.relativedelta import FR, relativedelta
+from kubernetes.client import models as k8s
from parameterized import parameterized
from airflow.hooks.base_hook import BaseHook
+from airflow.kubernetes.pod_generator import PodGenerator
from airflow.models import DAG, Connection, DagBag, TaskInstance
from airflow.models.baseoperator import BaseOperator
from airflow.operators.bash import BashOperator
@@ -36,6 +38,20 @@ from airflow.serialization.json_schema import
load_dag_schema_dict
from airflow.serialization.serialized_objects import SerializedBaseOperator,
SerializedDAG
from tests.test_utils.mock_operators import CustomOperator, CustomOpLink,
GoogleLink
+executor_config_pod = k8s.V1Pod(
+ metadata=k8s.V1ObjectMeta(name="my-name"),
+ spec=k8s.V1PodSpec(containers=[
+ k8s.V1Container(
+ name="base",
+ volume_mounts=[
+ k8s.V1VolumeMount(
+ name="my-vol",
+ mount_path="/vol/"
+ )
+ ]
+ )
+ ]))
+
serialized_simple_dag_ground_truth = {
"__version": 1,
"dag": {
@@ -70,6 +86,12 @@ serialized_simple_dag_ground_truth = {
"_task_type": "BashOperator",
"_task_module": "airflow.operators.bash",
"pool": "default_pool",
+ "executor_config": {'__type': 'dict',
+ '__var': {"pod_override": {
+ '__type': 'k8s.V1Pod',
+ '__var':
PodGenerator.serialize_pod(executor_config_pod)}
+ }
+ }
},
{
"task_id": "custom_task",
@@ -130,8 +152,9 @@ def make_simple_dag():
}
) as dag:
CustomOperator(task_id='custom_task')
- BashOperator(task_id='bash_task', bash_command='echo {{ task.task_id
}}', owner='airflow')
- return {'simple_dag': dag}
+ BashOperator(task_id='bash_task', bash_command='echo {{ task.task_id
}}', owner='airflow',
+ executor_config={"pod_override": executor_config_pod})
+ return {'simple_dag': dag}
def make_user_defined_macro_filter_dag():