This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 71e2e6fff9c48ae78933957fea682db7a2e3b7ff
Author: Daniel Imberman <[email protected]>
AuthorDate: Thu Nov 5 14:48:05 2020 -0800

    Add ability to specify pod_template_file in executor_config (#11784)
    
    * Add pod_template_override to executor_config
    
    Users will be able to override the base pod_template_file on a per-task
    basis (a sketch follows at the end of this message).
    
    * change docstring
    
    * fix doc
    
    * fix static checks
    
    * add description
    
    (cherry picked from commit 68ba54bbd5a275fba1a126f8e67bd69e5cf4b362)
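    
    A minimal sketch of the per-task override this enables (illustrative only;
    the task_id and callable below are hypothetical and simply mirror the
    example DAG added in this commit):
    
        from kubernetes.client import models as k8s
        from airflow.operators.python_operator import PythonOperator
    
        task = PythonOperator(
            task_id="task_with_label_override",   # hypothetical task
            python_callable=lambda: None,         # placeholder callable
            executor_config={
                "pod_override": k8s.V1Pod(
                    metadata=k8s.V1ObjectMeta(labels={"release": "stable"})
                ),
            },
        )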
---
 .../example_kubernetes_executor_config.py          |  95 +++++++++++++++---
 airflow/executors/kubernetes_executor.py           |  53 +++++++---
 airflow/kubernetes/pod_generator.py                |  26 ++++-
 .../dags_in_image_template.yaml                    |  84 ++++++++++++++++
 .../dags_in_volume_template.yaml                   |  81 ++++++++++++++++
 .../git_sync_template.yaml                         | 107 +++++++++++++++++++++
 airflow/serialization/enums.py                     |   2 +
 airflow/serialization/serialized_objects.py        |  18 ++++
 chart/requirements.lock                            |   6 +-
 docs/executor/kubernetes.rst                       |  95 ++++++++++++++++++
 tests/executors/test_kubernetes_executor.py        |  26 ++++-
 tests/serialization/test_dag_serialization.py      |   8 ++
 12 files changed, 563 insertions(+), 38 deletions(-)

diff --git a/airflow/example_dags/example_kubernetes_executor_config.py b/airflow/example_dags/example_kubernetes_executor_config.py
index 2e4ba00..e3f42d0 100644
--- a/airflow/example_dags/example_kubernetes_executor_config.py
+++ b/airflow/example_dags/example_kubernetes_executor_config.py
@@ -22,6 +22,8 @@ This is an example dag for using a Kubernetes Executor Configuration.
 from __future__ import print_function
 
 import os
+from kubernetes.client import models as k8s
+
 
 from airflow.contrib.example_dags.libs.helper import print_stuff
 from airflow.models import DAG
@@ -40,6 +42,20 @@ with DAG(
     schedule_interval=None
 ) as dag:
 
+    def test_sharedvolume_mount():
+        """
+        Tests whether the volume has been mounted.
+        """
+        for i in range(5):
+            try:
+                return_code = os.system("cat /shared/test.txt")
+                if return_code != 0:
+                    raise ValueError("Error when checking volume mount. Return code {return_code}"
+                                     .format(return_code=return_code))
+            except ValueError as e:
+                if i > 4:
+                    raise e
+
     def test_volume_mount():
         """
         Tests whether the volume has been mounted.
@@ -61,27 +77,74 @@ with DAG(
         }
     )
 
+    # [START task_with_volume]
+
     # You can mount volume or secret to the worker pod
     second_task = PythonOperator(
         task_id="four_task",
         python_callable=test_volume_mount,
         executor_config={
-            "KubernetesExecutor": {
-                "volumes": [
-                    {
-                        "name": "example-kubernetes-test-volume",
-                        "hostPath": {"path": "/tmp/"},
-                    },
-                ],
-                "volume_mounts": [
-                    {
-                        "mountPath": "/foo/",
-                        "name": "example-kubernetes-test-volume",
-                    },
-                ]
-            }
-        }
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                            volume_mounts=[
+                                k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
+                            ],
+                        )
+                    ],
+                    volumes=[
+                        k8s.V1Volume(
+                            name="example-kubernetes-test-volume",
+                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
+                        )
+                    ],
+                )
+            ),
+        },
+    )
+    # [END task_with_volume]
+
+    # [START task_with_template]
+    task_with_template = PythonOperator(
+        task_id="task_with_template",
+        python_callable=print_stuff,
+        executor_config={
+            "pod_template_file": "/usr/local/airflow/pod_templates/basic_template.yaml",
+            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
+        },
+    )
+    # [END task_with_template]
+
+    # [START task_with_sidecar]
+    sidecar_task = PythonOperator(
+        task_id="task_with_sidecar",
+        python_callable=test_sharedvolume_mount,
+        executor_config={
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
+                        ),
+                        k8s.V1Container(
+                            name="sidecar",
+                            image="ubuntu",
+                            args=["echo \"retrieved from mount\" > /shared/test.txt"],
+                            command=["bash", "-cx"],
+                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
+                        ),
+                    ],
+                    volumes=[
+                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
+                    ],
+                )
+            ),
+        },
     )
+    # [END task_with_sidecar]
 
     # Test that we can add labels to pods
     third_task = PythonOperator(
@@ -111,3 +174,5 @@ with DAG(
 
     start_task >> second_task >> third_task
     start_task >> other_ns_task
+    start_task >> sidecar_task
+    start_task >> task_with_template
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 73dd91e..bdbd1cb 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -30,9 +30,9 @@ from queue import Empty
 
 import kubernetes
 from dateutil import parser
-from kubernetes import watch, client
+from kubernetes import client, watch
 from kubernetes.client.rest import ApiException
-from urllib3.exceptions import HTTPError, ReadTimeoutError
+from urllib3.exceptions import ReadTimeoutError
 
 from airflow import settings
 from airflow.configuration import conf
@@ -427,11 +427,18 @@ class AirflowKubernetesScheduler(LoggingMixin):
         status
         """
         self.log.info('Kubernetes job is %s', str(next_job))
-        key, command, kube_executor_config = next_job
+        key, command, kube_executor_config, pod_template_file = next_job
         dag_id, task_id, execution_date, try_number = key
 
         if command[0:2] != ["airflow", "run"]:
-            raise ValueError('The command must start with ["airflow", "run"].')
+            raise ValueError('The command must start with ["airflow", "tasks", "run"].')
+
+        base_worker_pod = get_base_pod_from_template(pod_template_file, self.kube_config)
+
+        if not base_worker_pod:
+            raise AirflowException(
+                "could not find a valid worker template yaml at {}".format(self.kube_config.pod_template_file)
+            )
 
         pod = PodGenerator.construct_pod(
             namespace=self.namespace,
@@ -662,6 +669,21 @@ class AirflowKubernetesScheduler(LoggingMixin):
         self._manager.shutdown()
 
 
+def get_base_pod_from_template(pod_template_file, kube_config):
+    """
+    Reads either the pod_template_file set in the executor_config or the base pod_template_file
+    set in the airflow.cfg to craft a "base pod" that will be used by the KubernetesExecutor
+
+    :param pod_template_file: absolute path to a pod_template_file.yaml or None
+    :param kube_config: The KubeConfig class generated by airflow that contains all kube metadata
+    :return: a V1Pod that can be used as the base pod for k8s tasks
+    """
+    if pod_template_file:
+        return PodGenerator.deserialize_model_file(pod_template_file)
+    else:
+        return PodGenerator.deserialize_model_file(kube_config.pod_template_file)
+
+
 class KubernetesExecutor(BaseExecutor, LoggingMixin):
     """Executor for Kubernetes"""
     def __init__(self):
@@ -794,7 +816,11 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
         )
 
         kube_executor_config = PodGenerator.from_obj(executor_config)
-        self.task_queue.put((key, command, kube_executor_config))
+        if executor_config:
+            pod_template_file = executor_config.get("pod_template_override", None)
+        else:
+            pod_template_file = None
+        self.task_queue.put((key, command, kube_executor_config, pod_template_file))
 
     def sync(self):
         """Synchronize task state."""
@@ -831,13 +857,16 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    self.log.warning('ApiException when attempting to run task, re-queueing. '
-                                     'Message: %s', json.loads(e.body)['message'])
-                    self.task_queue.put(task)
-                except HTTPError as e:
-                    self.log.warning('HTTPError when attempting to run task, re-queueing. '
-                                     'Exception: %s', str(e))
-                    self.task_queue.put(task)
+                    if e.reason == "BadRequest":
+                        self.log.error("Request was invalid. Failing task")
+                        key, _, _, _ = task
+                        self.change_state(key, State.FAILED, e)
+                    else:
+                        self.log.warning(
+                            'ApiException when attempting to run task, re-queueing. ' 'Message: %s',
+                            json.loads(e.body)['message'],
+                        )
+                        self.task_queue.put(task)
                 finally:
                     self.task_queue.task_done()
             except Empty:
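
For reference, a minimal sketch of how the new get_base_pod_from_template helper
is meant to be used (a sketch only; the template paths and the stub config object
below are hypothetical, both paths must point at valid pod template YAML files,
and the kubernetes client must be installed):

    from airflow.executors.kubernetes_executor import get_base_pod_from_template

    class StubKubeConfig:
        # Stand-in for Airflow's KubeConfig; only the attribute the helper reads.
        pod_template_file = "/opt/airflow/pod_templates/default_template.yaml"

    # A per-task template supplied via executor_config takes precedence ...
    pod = get_base_pod_from_template("/opt/airflow/pod_templates/task_template.yaml", StubKubeConfig())

    # ... otherwise the pod_template_file configured in airflow.cfg is used.
    pod = get_base_pod_from_template(None, StubKubeConfig())
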
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 5a57230..e12fba4 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -564,6 +564,18 @@ class PodGenerator(object):
         return reduce(PodGenerator.reconcile_pods, pod_list)
 
     @staticmethod
+    def serialize_pod(pod):
+        """
+
+        Converts a k8s.V1Pod into a jsonified object
+
+        @param pod:
+        @return:
+        """
+        api_client = ApiClient()
+        return api_client.sanitize_for_serialization(pod)
+
+    @staticmethod
     def deserialize_model_file(path):
         """
         :param path: Path to the file
@@ -573,15 +585,23 @@ class PodGenerator(object):
         ``_ApiClient__deserialize_model`` from the kubernetes client.
         This issue is tracked here; https://github.com/kubernetes-client/python/issues/977.
         """
-        api_client = ApiClient()
         if os.path.exists(path):
             with open(path) as stream:
                 pod = yaml.safe_load(stream)
         else:
             pod = yaml.safe_load(path)
 
-        # pylint: disable=protected-access
-        return api_client._ApiClient__deserialize_model(pod, k8s.V1Pod)
+        return PodGenerator.deserialize_model_dict(pod)
+
+    @staticmethod
+    def deserialize_model_dict(pod_dict):
+        """
+        Deserializes python dictionary to k8s.V1Pod
+        @param pod_dict:
+        @return:
+        """
+        api_client = ApiClient()
+        return api_client._ApiClient__deserialize_model(pod_dict, k8s.V1Pod)  # pylint: disable=W0212
 
 
 def merge_objects(base_obj, client_obj):
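
To illustrate the serialize_pod / deserialize_model_dict helpers added above, a
short round-trip sketch (assuming the kubernetes python client is installed; the
pod below is arbitrary):

    from kubernetes.client import models as k8s
    from airflow.kubernetes.pod_generator import PodGenerator

    pod = k8s.V1Pod(
        metadata=k8s.V1ObjectMeta(name="dummy-name", labels={"release": "stable"}),
        spec=k8s.V1PodSpec(containers=[k8s.V1Container(name="base", image="dummy_image")]),
    )

    pod_dict = PodGenerator.serialize_pod(pod)                 # JSON-safe dict
    restored = PodGenerator.deserialize_model_dict(pod_dict)   # back to a k8s.V1Pod
    assert restored.metadata.labels == {"release": "stable"}
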
diff --git a/airflow/kubernetes/pod_template_file_examples/dags_in_image_template.yaml b/airflow/kubernetes/pod_template_file_examples/dags_in_image_template.yaml
new file mode 100644
index 0000000..b1995c2
--- /dev/null
+++ b/airflow/kubernetes/pod_template_file_examples/dags_in_image_template.yaml
@@ -0,0 +1,84 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# [START template_with_dags_in_image]
+---
+apiVersion: v1
+kind: Pod
+metadata:
+  name: dummy-name
+spec:
+  containers:
+    - args: []
+      command: []
+      env:
+        - name: AIRFLOW__CORE__EXECUTOR
+          value: LocalExecutor
+        # Hard Coded Airflow Envs
+        - name: AIRFLOW__CORE__FERNET_KEY
+          valueFrom:
+            secretKeyRef:
+              name: RELEASE-NAME-fernet-key
+              key: fernet-key
+        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
+          valueFrom:
+            secretKeyRef:
+              name: RELEASE-NAME-airflow-metadata
+              key: connection
+        - name: AIRFLOW_CONN_AIRFLOW_DB
+          valueFrom:
+            secretKeyRef:
+              name: RELEASE-NAME-airflow-metadata
+              key: connection
+      envFrom: []
+      image: dummy_image
+      imagePullPolicy: IfNotPresent
+      name: base
+      ports: []
+      volumeMounts:
+        - mountPath: "/opt/airflow/logs"
+          name: airflow-logs
+        - mountPath: /opt/airflow/dags
+          name: airflow-dags
+          readOnly: false
+        - mountPath: /opt/airflow/dags
+          name: airflow-dags
+          readOnly: true
+          subPath: repo/tests/dags
+  hostNetwork: false
+  restartPolicy: Never
+  securityContext:
+    runAsUser: 50000
+  nodeSelector:
+    {}
+  affinity:
+    {}
+  tolerations:
+    []
+  serviceAccountName: 'RELEASE-NAME-worker-serviceaccount'
+  volumes:
+    - name: dags
+      persistentVolumeClaim:
+        claimName: RELEASE-NAME-dags
+    - emptyDir: {}
+      name: airflow-logs
+    - configMap:
+        name: RELEASE-NAME-airflow-config
+      name: airflow-config
+    - configMap:
+        name: RELEASE-NAME-airflow-config
+      name: airflow-local-settings
+# [END template_with_dags_in_image]
diff --git a/airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml b/airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml
new file mode 100644
index 0000000..86b5358
--- /dev/null
+++ b/airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# [START template_with_dags_in_volume]
+---
+apiVersion: v1
+kind: Pod
+metadata:
+  name: dummy-name
+spec:
+  containers:
+    - args: []
+      command: []
+      env:
+        - name: AIRFLOW__CORE__EXECUTOR
+          value: LocalExecutor
+        # Hard Coded Airflow Envs
+        - name: AIRFLOW__CORE__FERNET_KEY
+          valueFrom:
+            secretKeyRef:
+              name: RELEASE-NAME-fernet-key
+              key: fernet-key
+        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
+          valueFrom:
+            secretKeyRef:
+              name: RELEASE-NAME-airflow-metadata
+              key: connection
+        - name: AIRFLOW_CONN_AIRFLOW_DB
+          valueFrom:
+            secretKeyRef:
+              name: RELEASE-NAME-airflow-metadata
+              key: connection
+      envFrom: []
+      image: dummy_image
+      imagePullPolicy: IfNotPresent
+      name: base
+      ports: []
+      volumeMounts:
+        - mountPath: "/opt/airflow/logs"
+          name: airflow-logs
+        - mountPath: /opt/airflow/dags
+          name: airflow-dags
+          readOnly: true
+          subPath: repo/tests/dags
+  hostNetwork: false
+  restartPolicy: Never
+  securityContext:
+    runAsUser: 50000
+  nodeSelector:
+    {}
+  affinity:
+    {}
+  tolerations:
+    []
+  serviceAccountName: 'RELEASE-NAME-worker-serviceaccount'
+  volumes:
+    - name: dags
+      persistentVolumeClaim:
+        claimName: RELEASE-NAME-dags
+    - emptyDir: {}
+      name: airflow-logs
+    - configMap:
+        name: RELEASE-NAME-airflow-config
+      name: airflow-config
+    - configMap:
+        name: RELEASE-NAME-airflow-config
+      name: airflow-local-settings
+# [END template_with_dags_in_volume]
diff --git a/airflow/kubernetes/pod_template_file_examples/git_sync_template.yaml b/airflow/kubernetes/pod_template_file_examples/git_sync_template.yaml
new file mode 100644
index 0000000..a962a8f
--- /dev/null
+++ b/airflow/kubernetes/pod_template_file_examples/git_sync_template.yaml
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+# [START git_sync_template]
+apiVersion: v1
+kind: Pod
+metadata:
+  name: dummy-name
+spec:
+  initContainers:
+    - name: git-sync
+      image: "k8s.gcr.io/git-sync:v3.1.6"
+      env:
+        - name: GIT_SYNC_REV
+          value: "HEAD"
+        - name: GIT_SYNC_BRANCH
+          value: "v1-10-stable"
+        - name: GIT_SYNC_REPO
+          value: "https://github.com/apache/airflow.git"
+        - name: GIT_SYNC_DEPTH
+          value: "1"
+        - name: GIT_SYNC_ROOT
+          value: "/git"
+        - name: GIT_SYNC_DEST
+          value: "repo"
+        - name: GIT_SYNC_ADD_USER
+          value: "true"
+        - name: GIT_SYNC_WAIT
+          value: "60"
+        - name: GIT_SYNC_MAX_SYNC_FAILURES
+          value: "0"
+      volumeMounts:
+        - name: dags
+          mountPath: /git
+  containers:
+    - args: []
+      command: []
+      env:
+        - name: AIRFLOW__CORE__EXECUTOR
+          value: LocalExecutor
+        # Hard Coded Airflow Envs
+        - name: AIRFLOW__CORE__FERNET_KEY
+          valueFrom:
+            secretKeyRef:
+              name: RELEASE-NAME-fernet-key
+              key: fernet-key
+        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
+          valueFrom:
+            secretKeyRef:
+              name: RELEASE-NAME-airflow-metadata
+              key: connection
+        - name: AIRFLOW_CONN_AIRFLOW_DB
+          valueFrom:
+            secretKeyRef:
+              name: RELEASE-NAME-airflow-metadata
+              key: connection
+      envFrom: []
+      image: dummy_image
+      imagePullPolicy: IfNotPresent
+      name: base
+      ports: []
+      volumeMounts:
+        - mountPath: "/opt/airflow/logs"
+          name: airflow-logs
+        - mountPath: /opt/airflow/dags
+          name: airflow-dags
+          readOnly: false
+        - mountPath: /opt/airflow/dags
+          name: airflow-dags
+          readOnly: true
+  hostNetwork: false
+  restartPolicy: Never
+  securityContext:
+    runAsUser: 50000
+  nodeSelector:
+    {}
+  affinity:
+    {}
+  tolerations:
+    []
+  serviceAccountName: 'RELEASE-NAME-worker-serviceaccount'
+  volumes:
+    - name: dags
+      emptyDir: {}
+    - emptyDir: {}
+      name: airflow-logs
+    - configMap:
+        name: RELEASE-NAME-airflow-config
+      name: airflow-config
+    - configMap:
+        name: RELEASE-NAME-airflow-config
+      name: airflow-local-settings
+# [END git_sync_template]
diff --git a/airflow/serialization/enums.py b/airflow/serialization/enums.py
index 8e5fee6..e4f72c5 100644
--- a/airflow/serialization/enums.py
+++ b/airflow/serialization/enums.py
@@ -36,10 +36,12 @@ class DagAttributeTypes(str, Enum):
     """Enum of supported attribute types of DAG."""
     DAG = 'dag'
     OP = 'operator'
+
     DATETIME = 'datetime'
     TIMEDELTA = 'timedelta'
     TIMEZONE = 'timezone'
     RELATIVEDELTA = 'relativedelta'
     DICT = 'dict'
+    POD = 'k8s.V1Pod'
     SET = 'set'
     TUPLE = 'tuple'
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index c527ddf..857514e 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -47,6 +47,16 @@ except ImportError:
 if TYPE_CHECKING:
     from inspect import Parameter
 
+try:
+    # isort: off
+    from kubernetes.client import models as k8s
+    from airflow.kubernetes.pod_generator import PodGenerator
+
+    # isort: on
+    HAS_KUBERNETES = True
+except ImportError:
+    HAS_KUBERNETES = False
+
 log = logging.getLogger(__name__)
 
 
@@ -199,6 +209,9 @@ class BaseSerialization:
                 return [cls._serialize(v) for v in var]
             elif isinstance(var, DAG):
                 return SerializedDAG.serialize_dag(var)
+            elif HAS_KUBERNETES and isinstance(var, k8s.V1Pod):
+                json_pod = PodGenerator.serialize_pod(var)
+                return cls._encode(json_pod, type_=DAT.POD)
             elif isinstance(var, BaseOperator):
                 return SerializedBaseOperator.serialize_operator(var)
             elif isinstance(var, cls._datetime_types):
@@ -253,6 +266,11 @@ class BaseSerialization:
             return SerializedBaseOperator.deserialize_operator(var)
         elif type_ == DAT.DATETIME:
             return pendulum.from_timestamp(var)
+        elif type_ == DAT.POD:
+            if not HAS_KUBERNETES:
+                raise RuntimeError("Cannot deserialize POD objects without kubernetes libraries installed!")
+            pod = PodGenerator.deserialize_model_dict(var)
+            return pod
         elif type_ == DAT.TIMEDELTA:
             return datetime.timedelta(seconds=var)
         elif type_ == DAT.TIMEZONE:
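
Together with the DAT.POD enum value above, this lets executor_config entries of
type k8s.V1Pod survive DAG serialization. A minimal sketch (assuming the
kubernetes client is installed; the dag_id and task_id are hypothetical):

    from kubernetes.client import models as k8s
    from airflow.models import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.serialization.serialized_objects import SerializedDAG
    from airflow.utils import timezone

    with DAG(dag_id="pod_override_demo", start_date=timezone.datetime(2020, 1, 1)) as dag:
        DummyOperator(
            task_id="demo",
            executor_config={
                "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
            },
        )

    # Round-trip through JSON; the V1Pod is encoded as DAT.POD and decoded back.
    restored = SerializedDAG.from_json(SerializedDAG.to_json(dag))
    assert isinstance(restored.task_dict["demo"].executor_config["pod_override"], k8s.V1Pod)
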
diff --git a/chart/requirements.lock b/chart/requirements.lock
index e460e9f..eb62c80 100644
--- a/chart/requirements.lock
+++ b/chart/requirements.lock
@@ -1,6 +1,6 @@
 dependencies:
 - name: postgresql
-  repository: https://charts.helm.sh/stable/
+  repository: https://kubernetes-charts.storage.googleapis.com
   version: 6.3.12
-digest: sha256:1748aa702050d4e72ffba1b18960f49bfe5368757cf976116afeffbdedda1c98
-generated: "2020-11-07T17:40:45.418723358+01:00"
+digest: sha256:58d88cf56e78b2380091e9e16cc6ccf58b88b3abe4a1886dd47cd9faef5309af
+generated: "2020-11-04T15:59:36.967913-08:00"
diff --git a/docs/executor/kubernetes.rst b/docs/executor/kubernetes.rst
index 042d638..ed172a8 100644
--- a/docs/executor/kubernetes.rst
+++ b/docs/executor/kubernetes.rst
@@ -34,6 +34,101 @@ The kubernetes executor is introduced in Apache Airflow 1.10.0. The Kubernetes e
 
   - Another option is to use S3/GCS/etc to store logs
 
+To troubleshoot issues with the KubernetesExecutor, you can use the ``airflow kubernetes generate-dag-yaml`` command.
+This command generates the pods as they will be launched in Kubernetes and dumps them into yaml files for you to inspect.
+
+.. _concepts:pod_template_file:
+
+pod_template_file
+#################
+
+As of Airflow 1.10.12, you can now use the ``pod_template_file`` option in the ``kubernetes`` section
+of the ``airflow.cfg`` file to form the basis of your KubernetesExecutor pods. This process is faster to execute
+and easier to modify.
+
+We include multiple examples of working pod operators below, but we would also like to explain a few necessary components
+if you want to customize your template files. As long as you have these components, every other element
+in the template is customizable.
+
+1. Airflow will overwrite the base container image and the pod name
+
+There are two points where Airflow potentially overwrites the base image: in the ``airflow.cfg``
+or the ``pod_override`` (discussed below) setting. This value is overwritten to ensure that users do
+not need to update multiple template files every time they upgrade their docker image. The other field
+that Airflow overwrites is the ``pod.metadata.name`` field. This field has to be unique across all pods,
+so we generate these names dynamically before launch.
+
+It's important to note that while Airflow overwrites these fields, they **cannot be left blank**.
+If these fields do not exist, Kubernetes cannot load the yaml into a Kubernetes V1Pod.
+
+2. Each Airflow ``pod_template_file`` must have a container named "base" at the ``pod.spec.containers[0]`` position
+
+Airflow uses the ``pod_template_file`` by making certain assumptions about the structure of the template.
+When Airflow creates the worker pod's command, it assumes that the Airflow worker container exists
+at the beginning of the container array. It then assumes that the container is named ``base``
+when it merges this pod with internal configs. You are more than welcome to create
+sidecar containers after this required container.
+
+With these requirements in mind, here are some examples of basic ``pod_template_file`` YAML files.
+
+pod_template_file using the ``dags_in_image`` setting:
+
+.. exampleinclude:: /../airflow/kubernetes/pod_template_file_examples/dags_in_image_template.yaml
+    :language: yaml
+    :start-after: [START template_with_dags_in_image]
+    :end-before: [END template_with_dags_in_image]
+
+``pod_template_file`` which stores DAGs in a ``persistentVolume``:
+
+.. exampleinclude:: /../airflow/kubernetes/pod_template_file_examples/dags_in_volume_template.yaml
+    :language: yaml
+    :start-after: [START template_with_dags_in_volume]
+    :end-before: [END template_with_dags_in_volume]
+
+``pod_template_file`` which pulls DAGs from git:
+
+.. exampleinclude:: /../airflow/kubernetes/pod_template_file_examples/git_sync_template.yaml
+    :language: yaml
+    :start-after:  [START git_sync_template]
+    :end-before: [END git_sync_template]
+
+.. _concepts:pod_override:
+
+pod_override
+############
+
+When using the KubernetesExecutor, Airflow offers the ability to override system defaults on a per-task basis.
+To utilize this functionality, create a Kubernetes V1Pod object and fill in your desired overrides.
+Please note that the scheduler will override the ``metadata.name`` of the V1Pod before launching it.
+
+To overwrite the base container of the pod launched by the KubernetesExecutor,
+create a V1Pod with a single container, and overwrite the fields as follows:
+
+.. exampleinclude:: /../airflow/example_dags/example_kubernetes_executor_config.py
+    :language: python
+    :start-after: [START task_with_volume]
+    :end-before: [END task_with_volume]
+
+Note that volume mounts, environment variables, ports, and devices will all be extended instead of overwritten.
+
+To add a sidecar container to the launched pod, create a V1Pod with an empty first container with the
+name ``base`` and a second container containing your desired sidecar.
+
+.. exampleinclude:: /../airflow/example_dags/example_kubernetes_executor_config.py
+    :language: python
+    :start-after: [START task_with_sidecar]
+    :end-before: [END task_with_sidecar]
+
+You can also create a custom ``pod_template_file`` on a per-task basis so that you can recycle the same base values between multiple tasks.
+This will replace the default ``pod_template_file`` named in the airflow.cfg and then override that template using the ``pod_override``.
+
+Here is an example of a task with both features:
+
+.. exampleinclude:: /../airflow/example_dags/example_kubernetes_executor_config.py
+    :language: python
+    :start-after: [START task_with_template]
+    :end-before: [END task_with_template]
+
 KubernetesExecutor Architecture
 ################################
 
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 3dabb78..f5f415a 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -29,12 +29,15 @@ from tests.compat import mock
 from tests.test_utils.config import conf_vars
 try:
     from kubernetes.client.rest import ApiException
-    from airflow import configuration  # noqa: F401
-    from airflow.configuration import conf  # noqa: F401
-    from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler, KubeConfig
-    from airflow.executors.kubernetes_executor import KubernetesExecutor
-    from airflow.kubernetes.pod_generator import PodGenerator
+
+    from airflow.executors.kubernetes_executor import (
+        AirflowKubernetesScheduler,
+        KubeConfig,
+        KubernetesExecutor,
+        get_base_pod_from_template,
+    )
     from airflow.kubernetes import pod_generator
+    from airflow.kubernetes.pod_generator import PodGenerator
     from airflow.utils.state import State
 except ImportError:
     AirflowKubernetesScheduler = None  # type: ignore
@@ -94,6 +97,19 @@ class TestAirflowKubernetesScheduler(unittest.TestCase):
             )
             self.assertTrue(self._is_valid_pod_id(pod_name))
 
+    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @mock.patch("airflow.kubernetes.pod_generator.PodGenerator")
+    @mock.patch("airflow.executors.kubernetes_executor.KubeConfig")
+    def test_get_base_pod_from_template(self, mock_kubeconfig, mock_generator):
+        pod_template_file_path = "/bar/biz"
+        get_base_pod_from_template(pod_template_file_path, None)
+        self.assertEqual("deserialize_model_dict", mock_generator.mock_calls[0][0])
+        self.assertEqual(pod_template_file_path, mock_generator.mock_calls[0][1][0])
+        mock_kubeconfig.pod_template_file = "/foo/bar"
+        get_base_pod_from_template(None, mock_kubeconfig)
+        self.assertEqual("deserialize_model_dict", mock_generator.mock_calls[1][0])
+        self.assertEqual("/foo/bar", mock_generator.mock_calls[1][1][0])
+
     def test_make_safe_label_value(self):
         for dag_id, task_id in self._cases():
             safe_dag_id = pod_generator.make_safe_label_value(dag_id)
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index d999cb0..30493fd 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -259,6 +259,14 @@ class TestStringifiedDAGs(unittest.TestCase):
 
         assert sorted_serialized_dag(ground_truth_dag) == sorted_serialized_dag(json_dag)
 
+    def test_deser_k8s_pod_override(self):
+        dag = collect_dags('airflow/example_dags')['example_kubernetes_executor_config']
+        serialized = SerializedDAG.to_json(dag)
+        deser_dag = SerializedDAG.from_json(serialized)
+        p1 = dag.tasks[1].executor_config
+        p2 = deser_dag.tasks[1].executor_config
+        self.assertDictEqual(p1['pod_override'].to_dict(), p2['pod_override'].to_dict())
+
     def test_deserialization_across_process(self):
         """A serialized DAG can be deserialized in another process."""
 
