This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 49d733223fe35016c421f7fc0ba41215427add68 Author: Daniel Imberman <[email protected]> AuthorDate: Wed Nov 18 20:25:32 2020 -0800 Change back example_kubernetes_executor_config to KubernetesExecutor Fixes tests by removing pod_override as we are going to have users wait until Airflow 2.0 to use the pod_override feature --- .../example_kubernetes_executor_config.py | 96 ++++------------------ chart/requirements.lock | 6 +- docs/executor/kubernetes.rst | 37 --------- tests/serialization/test_dag_serialization.py | 8 -- 4 files changed, 19 insertions(+), 128 deletions(-) diff --git a/airflow/example_dags/example_kubernetes_executor_config.py b/airflow/example_dags/example_kubernetes_executor_config.py index e3f42d0..e06e8ab 100644 --- a/airflow/example_dags/example_kubernetes_executor_config.py +++ b/airflow/example_dags/example_kubernetes_executor_config.py @@ -1,3 +1,4 @@ + # -*- coding: utf-8 -*- # # Licensed to the Apache Software Foundation (ASF) under one @@ -22,8 +23,6 @@ This is an example dag for using a Kubernetes Executor Configuration. from __future__ import print_function import os -from kubernetes.client import models as k8s - from airflow.contrib.example_dags.libs.helper import print_stuff from airflow.models import DAG @@ -42,20 +41,6 @@ with DAG( schedule_interval=None ) as dag: - def test_sharedvolume_mount(): - """ - Tests whether the volume has been mounted. - """ - for i in range(5): - try: - return_code = os.system("cat /shared/test.txt") - if return_code != 0: - raise ValueError("Error when checking volume mount. Return code {return_code}" - .format(return_code=return_code)) - except ValueError as e: - if i > 4: - raise e - def test_volume_mount(): """ Tests whether the volume has been mounted. @@ -77,74 +62,27 @@ with DAG( } ) - # [START task_with_volume] - # You can mount volume or secret to the worker pod second_task = PythonOperator( task_id="four_task", python_callable=test_volume_mount, executor_config={ - "pod_override": k8s.V1Pod( - spec=k8s.V1PodSpec( - containers=[ - k8s.V1Container( - name="base", - volume_mounts=[ - k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume") - ], - ) - ], - volumes=[ - k8s.V1Volume( - name="example-kubernetes-test-volume", - host_path=k8s.V1HostPathVolumeSource(path="/tmp/"), - ) - ], - ) - ), - }, - ) - # [END task_with_volume] - - # [START task_with_template] - task_with_template = PythonOperator( - task_id="task_with_template", - python_callable=print_stuff, - executor_config={ - "pod_template_file": "/usr/local/airflow/pod_templates/basic_template.yaml", - "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})), - }, - ) - # [END task_with_template] - - # [START task_with_sidecar] - sidecar_task = PythonOperator( - task_id="task_with_sidecar", - python_callable=test_sharedvolume_mount, - executor_config={ - "pod_override": k8s.V1Pod( - spec=k8s.V1PodSpec( - containers=[ - k8s.V1Container( - name="base", - volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")], - ), - k8s.V1Container( - name="sidecar", - image="ubuntu", - args=["echo \"retrieved from mount\" > /shared/test.txt"], - command=["bash", "-cx"], - volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")], - ), - ], - volumes=[ - k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()), - ], - ) - ), - }, + "KubernetesExecutor": { + "volumes": [ + { + "name": "example-kubernetes-test-volume", + "hostPath": {"path": "/tmp/"}, + }, + ], + "volume_mounts": [ + { + "mountPath": "/foo/", + "name": "example-kubernetes-test-volume", + }, + ] + } + } ) - # [END task_with_sidecar] # Test that we can add labels to pods third_task = PythonOperator( @@ -174,5 +112,3 @@ with DAG( start_task >> second_task >> third_task start_task >> other_ns_task - start_task >> sidecar_task - start_task >> task_with_template diff --git a/chart/requirements.lock b/chart/requirements.lock index eb62c80..4999d6b 100644 --- a/chart/requirements.lock +++ b/chart/requirements.lock @@ -1,6 +1,6 @@ dependencies: - name: postgresql - repository: https://kubernetes-charts.storage.googleapis.com + repository: https://charts.helm.sh/stable/ version: 6.3.12 -digest: sha256:58d88cf56e78b2380091e9e16cc6ccf58b88b3abe4a1886dd47cd9faef5309af -generated: "2020-11-04T15:59:36.967913-08:00" +digest: sha256:1748aa702050d4e72ffba1b18960f49bfe5368757cf976116afeffbdedda1c98 +generated: "2020-11-18T20:19:23.9885-08:00" diff --git a/docs/executor/kubernetes.rst b/docs/executor/kubernetes.rst index ed172a8..3eabb51 100644 --- a/docs/executor/kubernetes.rst +++ b/docs/executor/kubernetes.rst @@ -92,43 +92,6 @@ pod_template_file using the ``dag_in_image`` setting: :start-after: [START git_sync_template] :end-before: [END git_sync_template] -.. _concepts:pod_override: - -pod_override -############ - -When using the KubernetesExecutor, Airflow offers the ability to override system defaults on a per-task basis. -To utilize this functionality, create a Kubernetes V1pod object and fill in your desired overrides. -Please note that the scheduler will override the ``metadata.name`` of the V1pod before launching it. - -To overwrite the base container of the pod launched by the KubernetesExecutor, -create a V1pod with a single container, and overwrite the fields as follows: - -.. exampleinclude:: /../airflow/example_dags/example_kubernetes_executor_config.py - :language: python - :start-after: [START task_with_volume] - :end-before: [END task_with_volume] - -Note that volume mounts environment variables, ports, and devices will all be extended instead of overwritten. - -To add a sidecar container to the launched pod, create a V1pod with an empty first container with the -name ``base`` and a second container containing your desired sidecar. - -.. exampleinclude:: /../airflow/example_dags/example_kubernetes_executor_config.py - :language: python - :start-after: [START task_with_sidecar] - :end-before: [END task_with_sidecar] - -You can also create custom ``pod_template_file`` on a per-task basis so that you can recycle the same base values between multiple tasks. -This will replace the default ``pod_template_file`` named in the airflow.cfg and then override that template using the ``pod_override_spec``. - -Here is an example of a task with both features: - -.. exampleinclude:: /../airflow/example_dags/example_kubernetes_executor_config.py - :language: python - :start-after: [START task_with_template] - :end-before: [END task_with_template] - KubernetesExecutor Architecture ################################ diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 30493fd..d999cb0 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -259,14 +259,6 @@ class TestStringifiedDAGs(unittest.TestCase): assert sorted_serialized_dag(ground_truth_dag) == sorted_serialized_dag(json_dag) - def test_deser_k8s_pod_override(self): - dag = collect_dags('airflow/example_dags')['example_kubernetes_executor_config'] - serialized = SerializedDAG.to_json(dag) - deser_dag = SerializedDAG.from_json(serialized) - p1 = dag.tasks[1].executor_config - p2 = deser_dag.tasks[1].executor_config - self.assertDictEqual(p1['pod_override'].to_dict(), p2['pod_override'].to_dict()) - def test_deserialization_across_process(self): """A serialized DAG can be deserialized in another process."""
