This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 20481c3  Add pod_override setting for KubernetesExecutor (#10756)
20481c3 is described below

commit 20481c3cafb89bcf9b8a1b3a5f7c5470b11838c3
Author: Daniel Imberman <[email protected]>
AuthorDate: Tue Sep 8 15:56:59 2020 -0700

    Add pod_override setting for KubernetesExecutor (#10756)
    
    * Add podOverride setting for KubernetesExecutor
    
    Users of the KubernetesExecutor will now have a "pod_override"
    option in the executor_config. This option will allow users to
    modify the pod launched by the KubernetesExecutor using a
    `kubernetes.client.models.V1Pod` class. This is the first step
    in deprecating the traditional executor_config.
    
    * Fix k8s tests
    
    * fix docs
---
 .../example_kubernetes_executor_config.py          | 100 +++++++++++++++++----
 airflow/kubernetes/pod_generator.py                |  56 +++++++++++-
 airflow/serialization/enums.py                     |   1 +
 airflow/serialization/serialized_objects.py        |   8 ++
 docs/concepts.rst                                  |   9 ++
 docs/executor/kubernetes.rst                       |  29 ++++++
 tests/kubernetes/test_pod_generator.py             |  40 +++++++++
 tests/serialization/test_dag_serialization.py      |  27 +++++-
 8 files changed, 248 insertions(+), 22 deletions(-)

diff --git a/airflow/example_dags/example_kubernetes_executor_config.py 
b/airflow/example_dags/example_kubernetes_executor_config.py
index 5fef135..db64b31 100644
--- a/airflow/example_dags/example_kubernetes_executor_config.py
+++ b/airflow/example_dags/example_kubernetes_executor_config.py
@@ -20,6 +20,8 @@ This is an example dag for using a Kubernetes Executor 
Configuration.
 """
 import os
 
+from kubernetes.client import models as k8s
+
 from airflow import DAG
 from airflow.example_dags.libs.helper import print_stuff
 from airflow.operators.python import PythonOperator
@@ -37,6 +39,19 @@ with DAG(
     tags=['example'],
 ) as dag:
 
+    def test_sharedvolume_mount():
+        """
+        Tests whether the volume has been mounted.
+        """
+        for i in range(5):
+            try:
+                return_code = os.system("cat /shared/test.txt")
+                if return_code != 0:
+                    raise ValueError(f"Error when checking volume mount. 
Return code {return_code}")
+            except ValueError as e:
+                if i > 4:
+                    raise e
+
     def test_volume_mount():
         """
         Tests whether the volume has been mounted.
@@ -59,27 +74,75 @@ with DAG(
         }
     )
 
-    # You can mount volume or secret to the worker pod
-    second_task = PythonOperator(
-        task_id="four_task",
+    # [START task_with_volume]
+    volume_task = PythonOperator(
+        task_id="task_with_volume",
         python_callable=test_volume_mount,
         executor_config={
-            "KubernetesExecutor": {
-                "volumes": [
-                    {
-                        "name": "example-kubernetes-test-volume",
-                        "hostPath": {"path": "/tmp/"},
-                    },
-                ],
-                "volume_mounts": [
-                    {
-                        "mountPath": "/foo/",
-                        "name": "example-kubernetes-test-volume",
-                    },
-                ]
-            }
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                            volume_mounts=[
+                                k8s.V1VolumeMount(
+                                    mount_path="/foo/",
+                                    name="example-kubernetes-test-volume"
+                                )
+                            ]
+                        )
+                    ],
+                    volumes=[
+                        k8s.V1Volume(
+                            name="example-kubernetes-test-volume",
+                            host_path=k8s.V1HostPathVolumeSource(
+                                path="/tmp/"
+                            )
+                        )
+                    ]
+                )
+            ),
+        }
+    )
+    # [END task_with_volume]
+
+    # [START task_with_sidecar]
+    sidecar_task = PythonOperator(
+        task_id="task_with_sidecar",
+        python_callable=test_sharedvolume_mount,
+        executor_config={
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                            volume_mounts=[k8s.V1VolumeMount(
+                                mount_path="/shared/",
+                                name="shared-empty-dir"
+                            )]
+                        ),
+                        k8s.V1Container(
+                            name="sidecar",
+                            image="ubuntu",
+                            args=["echo \"retrieved from mount\" > 
/shared/test.txt"],
+                            command=["bash", "-cx"],
+                            volume_mounts=[k8s.V1VolumeMount(
+                                mount_path="/shared/",
+                                name="shared-empty-dir"
+                            )]
+                        )
+                    ],
+                    volumes=[
+                        k8s.V1Volume(
+                            name="shared-empty-dir",
+                            empty_dir=k8s.V1EmptyDirVolumeSource()
+                        ),
+                    ]
+                )
+            ),
         }
     )
+    # [END task_with_sidecar]
 
     # Test that we can add labels to pods
     third_task = PythonOperator(
@@ -107,5 +170,6 @@ with DAG(
         }
     )
 
-    start_task >> second_task >> third_task
+    start_task >> volume_task >> third_task
     start_task >> other_ns_task
+    start_task >> sidecar_task
diff --git a/airflow/kubernetes/pod_generator.py 
b/airflow/kubernetes/pod_generator.py
index 02a31ab..7290aaa 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -27,6 +27,7 @@ import inspect
 import os
 import re
 import uuid
+import warnings
 from functools import reduce
 from typing import Dict, List, Optional, Union
 
@@ -324,6 +325,37 @@ class PodGenerator:
     @staticmethod
     def from_obj(obj) -> Optional[k8s.V1Pod]:
         """Converts to pod from obj"""
+
+        if obj is None:
+            return None
+
+        k8s_legacy_object = obj.get("KubernetesExecutor", None)
+        k8s_object = obj.get("pod_override", None)
+
+        if k8s_legacy_object and k8s_object:
+            raise AirflowConfigException("Can not have both a legacy and new"
+                                         "executor_config object. Please 
delete the KubernetesExecutor"
+                                         "dict and only use the pod_override 
kubernetes.client.models.V1Pod"
+                                         "object.")
+        if not k8s_object and not k8s_legacy_object:
+            return None
+
+        if isinstance(k8s_object, k8s.V1Pod):
+            return k8s_object
+        elif isinstance(k8s_legacy_object, dict):
+            warnings.warn('Using a dictionary for the executor_config is 
deprecated and will soon be removed.'
+                          'please use a `kubernetes.client.models.V1Pod` class 
with a "pod_override" key'
+                          ' instead. ',
+                          category=DeprecationWarning)
+            return PodGenerator.from_legacy_obj(obj)
+        else:
+            raise TypeError(
+                'Cannot convert a non-kubernetes.client.models.V1Pod'
+                'object into a KubernetesExecutorConfig')
+
+    @staticmethod
+    def from_legacy_obj(obj) -> Optional[k8s.V1Pod]:
+        """Converts to pod from obj"""
         if obj is None:
             return None
 
@@ -515,6 +547,16 @@ class PodGenerator:
         return reduce(PodGenerator.reconcile_pods, pod_list)
 
     @staticmethod
+    def serialize_pod(pod: k8s.V1Pod):
+        """
+        Converts a k8s.V1Pod into a jsonified object
+        @param pod:
+        @return:
+        """
+        api_client = ApiClient()
+        return api_client.sanitize_for_serialization(pod)
+
+    @staticmethod
     def deserialize_model_file(path: str) -> k8s.V1Pod:
         """
         :param path: Path to the file
@@ -524,7 +566,6 @@ class PodGenerator:
         ``_ApiClient__deserialize_model`` from the kubernetes client.
         This issue is tracked here; 
https://github.com/kubernetes-client/python/issues/977.
         """
-        api_client = ApiClient()
         if os.path.exists(path):
             with open(path) as stream:
                 pod = yaml.safe_load(stream)
@@ -532,7 +573,18 @@ class PodGenerator:
             pod = yaml.safe_load(path)
 
         # pylint: disable=protected-access
-        return api_client._ApiClient__deserialize_model(pod, k8s.V1Pod)
+        return PodGenerator.deserialize_model_dict(pod)
+
+    @staticmethod
+    def deserialize_model_dict(pod_dict: dict) -> k8s.V1Pod:
+        """
+        Deserializes python dictionary to k8s.V1Pod
+        @param pod_dict:
+        @return:
+        """
+        api_client = ApiClient()
+        return api_client._ApiClient__deserialize_model(  # pylint: 
disable=W0212
+            pod_dict, k8s.V1Pod)
 
     @staticmethod
     def make_unique_pod_id(dag_id):
diff --git a/airflow/serialization/enums.py b/airflow/serialization/enums.py
index 76f956f..b6fae5e 100644
--- a/airflow/serialization/enums.py
+++ b/airflow/serialization/enums.py
@@ -42,3 +42,4 @@ class DagAttributeTypes(str, Enum):
     DICT = 'dict'
     SET = 'set'
     TUPLE = 'tuple'
+    POD = 'k8s.V1Pod'
diff --git a/airflow/serialization/serialized_objects.py 
b/airflow/serialization/serialized_objects.py
index 6cbec0d..e15db25 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -25,9 +25,11 @@ from typing import Any, Dict, Iterable, List, Optional, Set, 
Union
 import cattr
 import pendulum
 from dateutil import relativedelta
+from kubernetes.client import models as k8s
 from pendulum.tz.timezone import Timezone
 
 from airflow.exceptions import AirflowException
+from airflow.kubernetes.pod_generator import PodGenerator
 from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
 from airflow.models.connection import Connection
 from airflow.models.dag import DAG
@@ -180,6 +182,9 @@ class BaseSerialization:
                 )
             elif isinstance(var, list):
                 return [cls._serialize(v) for v in var]
+            elif isinstance(var, k8s.V1Pod):
+                json_pod = PodGenerator.serialize_pod(var)
+                return cls._encode(json_pod, type_=DAT.POD)
             elif isinstance(var, DAG):
                 return SerializedDAG.serialize_dag(var)
             elif isinstance(var, BaseOperator):
@@ -239,6 +244,9 @@ class BaseSerialization:
             return SerializedBaseOperator.deserialize_operator(var)
         elif type_ == DAT.DATETIME:
             return pendulum.from_timestamp(var)
+        elif type_ == DAT.POD:
+            pod = PodGenerator.deserialize_model_dict(var)
+            return pod
         elif type_ == DAT.TIMEDELTA:
             return datetime.timedelta(seconds=var)
         elif type_ == DAT.TIMEZONE:
diff --git a/docs/concepts.rst b/docs/concepts.rst
index 67fa814..5d2be97 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -156,6 +156,15 @@ Example DAG with functional abstraction
         html_content=email_info['body']
     )
 
+.. _concepts:executor_config:
+
+executor_config
+===============
+
+The executor_config is an argument placed into operators that allows Airflow users to override tasks
+before launch. Currently, this is primarily used by the :class:`KubernetesExecutor`, but will soon be
+available for other overrides.
+
 .. _concepts:dagruns:
 
 DAG Runs
diff --git a/docs/executor/kubernetes.rst b/docs/executor/kubernetes.rst
index 099f119..c18b048 100644
--- a/docs/executor/kubernetes.rst
+++ b/docs/executor/kubernetes.rst
@@ -41,6 +41,35 @@ The volumes are optional and depend on your configuration. 
There are two volumes
 To troubleshoot issue with KubernetesExecutor, you can use ``airflow 
kubernetes generate-dag-yaml`` command.
 This command generates the pods as they will be launched in Kubernetes and 
dumps them into yaml files for you to inspect.
 
+.. _concepts:pod_override:
+
+pod_override
+############
+
+When using the KubernetesExecutor, Airflow offers the ability to override system defaults on a per-task basis.
+To utilize this functionality, create a Kubernetes ``V1Pod`` object and fill in your desired overrides.
+Please note that the scheduler will override the ``metadata.name`` of the ``V1Pod`` before launching it.
+
+To overwrite the base container of the pod launched by the KubernetesExecutor,
+create a ``V1Pod`` with a single container, and overwrite the fields as follows:
+
+.. exampleinclude:: 
/../airflow/example_dags/example_kubernetes_executor_config.py
+    :language: python
+    :start-after: [START task_with_volume]
+    :end-before: [END task_with_volume]
+
+Note that volume mounts, environment variables, ports, and devices will all be extended instead of overwritten.
+
+To add a sidecar container to the launched pod, create a ``V1Pod`` with an empty first container with the
+name ``base`` and a second container containing your desired sidecar.
+
+.. exampleinclude:: 
/../airflow/example_dags/example_kubernetes_executor_config.py
+    :language: python
+    :start-after: [START task_with_sidecar]
+    :end-before: [END task_with_sidecar]
+
+In the example above, we create a sidecar container that shares a volume mount for data sharing.
+
 KubernetesExecutor Architecture
 ################################
 
diff --git a/tests/kubernetes/test_pod_generator.py 
b/tests/kubernetes/test_pod_generator.py
index 0699f0b..812663b 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -270,7 +270,47 @@ class TestPodGenerator(unittest.TestCase):
                 ],
             }
         })
+
+        result_from_pod = PodGenerator.from_obj(
+            {"pod_override":
+                k8s.V1Pod(
+                    metadata=k8s.V1ObjectMeta(
+                        annotations={"test": "annotation"}
+                    ),
+                    spec=k8s.V1PodSpec(
+                        containers=[
+                            k8s.V1Container(
+                                name="base",
+                                volume_mounts=[
+                                    k8s.V1VolumeMount(
+                                        name="example-kubernetes-test-volume",
+                                        mount_path="/foo/"
+                                    )
+                                ]
+                            )
+                        ],
+                        volumes=[
+                            k8s.V1Volume(
+                                name="example-kubernetes-test-volume",
+                                host_path="/tmp/"
+                            )
+                        ]
+                    )
+                )
+             }
+        )
+
         result = self.k8s_client.sanitize_for_serialization(result)
+        result_from_pod = 
self.k8s_client.sanitize_for_serialization(result_from_pod)
+        expected_from_pod = {'metadata': {'annotations': {'test': 
'annotation'}},
+                             'spec': {'containers': [
+                                 {'name': 'base',
+                                  'volumeMounts': [{'mountPath': '/foo/',
+                                                    'name': 
'example-kubernetes-test-volume'}]}],
+                                 'volumes': [{'hostPath': '/tmp/',
+                                              'name': 
'example-kubernetes-test-volume'}]}}
+        self.assertEqual(result_from_pod, expected_from_pod, "There was a 
discrepency"
+                                                             " between 
KubernetesExecutor and pod_override")
 
         self.assertEqual({
             'apiVersion': 'v1',
diff --git a/tests/serialization/test_dag_serialization.py 
b/tests/serialization/test_dag_serialization.py
index 948ca99..75c35e8 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -26,9 +26,11 @@ from glob import glob
 from unittest import mock
 
 from dateutil.relativedelta import FR, relativedelta
+from kubernetes.client import models as k8s
 from parameterized import parameterized
 
 from airflow.hooks.base_hook import BaseHook
+from airflow.kubernetes.pod_generator import PodGenerator
 from airflow.models import DAG, Connection, DagBag, TaskInstance
 from airflow.models.baseoperator import BaseOperator
 from airflow.operators.bash import BashOperator
@@ -36,6 +38,20 @@ from airflow.serialization.json_schema import 
load_dag_schema_dict
 from airflow.serialization.serialized_objects import SerializedBaseOperator, 
SerializedDAG
 from tests.test_utils.mock_operators import CustomOperator, CustomOpLink, 
GoogleLink
 
+executor_config_pod = k8s.V1Pod(
+    metadata=k8s.V1ObjectMeta(name="my-name"),
+    spec=k8s.V1PodSpec(containers=[
+        k8s.V1Container(
+            name="base",
+            volume_mounts=[
+                k8s.V1VolumeMount(
+                    name="my-vol",
+                    mount_path="/vol/"
+                )
+            ]
+        )
+    ]))
+
 serialized_simple_dag_ground_truth = {
     "__version": 1,
     "dag": {
@@ -70,6 +86,12 @@ serialized_simple_dag_ground_truth = {
                 "_task_type": "BashOperator",
                 "_task_module": "airflow.operators.bash",
                 "pool": "default_pool",
+                "executor_config": {'__type': 'dict',
+                                    '__var': {"pod_override": {
+                                        '__type': 'k8s.V1Pod',
+                                        '__var': 
PodGenerator.serialize_pod(executor_config_pod)}
+                                    }
+                                    }
             },
             {
                 "task_id": "custom_task",
@@ -130,8 +152,9 @@ def make_simple_dag():
         }
     ) as dag:
         CustomOperator(task_id='custom_task')
-        BashOperator(task_id='bash_task', bash_command='echo {{ task.task_id 
}}', owner='airflow')
-    return {'simple_dag': dag}
+        BashOperator(task_id='bash_task', bash_command='echo {{ task.task_id 
}}', owner='airflow',
+                     executor_config={"pod_override": executor_config_pod})
+        return {'simple_dag': dag}
 
 
 def make_user_defined_macro_filter_dag():

Reply via email to