This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e362c1  K8s yaml templates not rendered by k8sexecutor (#12303)
4e362c1 is described below

commit 4e362c134702dca30d1bd2e3da38bc2ee39825bf
Author: Daniel Imberman <[email protected]>
AuthorDate: Fri Nov 13 12:06:29 2020 -0800

    K8s yaml templates not rendered by k8sexecutor (#12303)
    
    * K8s yaml templates not rendered by k8sexecutor
    
    There is a bug in the yaml template rendering caused by the logic that
    yaml templates are only generated when the current executor is the
    k8sexecutor. This is a problem as the templates are generated by the
    task pod, which is itself running a LocalExecutor. Also generates a
    "base" template if this taskInstance has not run yet.
    
    * fix tests
    
    * fix taskinstance test
    
    * fix taskinstance
    
    * fix pod generator tests
    
    * fix podgen
    
    * Update tests/kubernetes/test_pod_generator.py
    
    Co-authored-by: Ash Berlin-Taylor <[email protected]>
    
    * @ashb comment
    
    Co-authored-by: Ash Berlin-Taylor <[email protected]>
---
 airflow/kubernetes/pod_generator.py    | 1 +
 airflow/models/renderedtifields.py     | 5 +++--
 airflow/models/taskinstance.py         | 2 +-
 scripts/ci/libraries/_kind.sh          | 2 --
 tests/kubernetes/test_pod_generator.py | 9 +++++++++
 tests/models/test_renderedtifields.py  | 4 +++-
 tests/models/test_taskinstance.py      | 3 ++-
 7 files changed, 19 insertions(+), 7 deletions(-)

diff --git a/airflow/kubernetes/pod_generator.py 
b/airflow/kubernetes/pod_generator.py
index 0735948..7f0a52e 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -396,6 +396,7 @@ class PodGenerator:
                         name="base",
                         command=command,
                         image=image,
+                        env=[k8s.V1EnvVar(name="AIRFLOW_IS_K8S_EXECUTOR_POD", 
value="True")],
                     )
                 ]
             ),
diff --git a/airflow/models/renderedtifields.py 
b/airflow/models/renderedtifields.py
index 850badf..91ec027 100644
--- a/airflow/models/renderedtifields.py
+++ b/airflow/models/renderedtifields.py
@@ -16,6 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 """Save Rendered Template Fields"""
+import os
 from typing import Optional
 
 import sqlalchemy_jsonfield
@@ -26,7 +27,7 @@ from airflow.configuration import conf
 from airflow.models.base import ID_LEN, Base
 from airflow.models.taskinstance import TaskInstance
 from airflow.serialization.helpers import serialize_template_field
-from airflow.settings import IS_K8S_OR_K8SCELERY_EXECUTOR, json
+from airflow.settings import json
 from airflow.utils.session import provide_session
 from airflow.utils.sqlalchemy import UtcDateTime
 
@@ -50,7 +51,7 @@ class RenderedTaskInstanceFields(Base):
         self.ti = ti
         if render_templates:
             ti.render_templates()
-        if IS_K8S_OR_K8SCELERY_EXECUTOR:
+        if os.environ.get("AIRFLOW_IS_K8S_EXECUTOR_POD", None):
             self.k8s_pod_yaml = ti.render_k8s_pod_yaml()
         self.rendered_fields = {
             field: serialize_template_field(getattr(self.task, field)) for 
field in self.task.template_fields
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 3ae37c1..c6a1378 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1671,7 +1671,7 @@ class TaskInstance(Base, LoggingMixin):  # pylint: 
disable=R0902,R0904
         rendered_k8s_spec = RenderedTaskInstanceFields.get_k8s_pod_yaml(self)
         if not rendered_k8s_spec:
             try:
-                self.render_k8s_pod_yaml()
+                rendered_k8s_spec = self.render_k8s_pod_yaml()
             except (TemplateAssertionError, UndefinedError) as e:
                 raise AirflowException(f"Unable to render a k8s spec for this 
taskinstance: {e}") from e
         return rendered_k8s_spec
diff --git a/scripts/ci/libraries/_kind.sh b/scripts/ci/libraries/_kind.sh
index dcbf6a8..3540064 100644
--- a/scripts/ci/libraries/_kind.sh
+++ b/scripts/ci/libraries/_kind.sh
@@ -19,10 +19,8 @@
 function kind::get_kind_cluster_name() {
     # Name of the KinD cluster to connect to
     export 
KIND_CLUSTER_NAME=${KIND_CLUSTER_NAME:="airflow-python-${PYTHON_MAJOR_MINOR_VERSION}-${KUBERNETES_VERSION}"}
-    readonly KIND_CLUSTER_NAME
     # Name of the KinD cluster to connect to when referred to via kubectl
     export KUBECTL_CLUSTER_NAME=kind-${KIND_CLUSTER_NAME}
-    readonly KUBECTL_CLUSTER_NAME
     export KUBECONFIG="${BUILD_CACHE_DIR}/.kube/config"
     mkdir -pv "${BUILD_CACHE_DIR}/.kube/"
     touch "${KUBECONFIG}"
diff --git a/tests/kubernetes/test_pod_generator.py 
b/tests/kubernetes/test_pod_generator.py
index 2aa6363..32e5d06 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -440,6 +440,12 @@ class TestPodGenerator(unittest.TestCase):
         expected.spec.containers[0].command = ['command']
         expected.spec.containers[0].image = 'airflow_image'
         expected.spec.containers[0].resources = {'limits': {'cpu': '1m', 
'memory': '1G'}}
+        expected.spec.containers[0].env.append(
+            k8s.V1EnvVar(
+                name="AIRFLOW_IS_K8S_EXECUTOR_POD",
+                value='True',
+            )
+        )
         result_dict = self.k8s_client.sanitize_for_serialization(result)
         expected_dict = 
self.k8s_client.sanitize_for_serialization(self.expected)
 
@@ -473,6 +479,9 @@ class TestPodGenerator(unittest.TestCase):
         worker_config.metadata.labels['app'] = 'myapp'
         worker_config.metadata.name = 'pod_id-' + self.static_uuid.hex
         worker_config.metadata.namespace = 'namespace'
+        worker_config.spec.containers[0].env.append(
+            k8s.V1EnvVar(name="AIRFLOW_IS_K8S_EXECUTOR_POD", value='True')
+        )
         worker_config_result = 
self.k8s_client.sanitize_for_serialization(worker_config)
         self.assertEqual(worker_config_result, sanitized_result)
 
diff --git a/tests/models/test_renderedtifields.py 
b/tests/models/test_renderedtifields.py
index bbf1301..297b7c4 100644
--- a/tests/models/test_renderedtifields.py
+++ b/tests/models/test_renderedtifields.py
@@ -18,6 +18,7 @@
 
 """Unit tests for RenderedTaskInstanceFields."""
 
+import os
 import unittest
 from datetime import date, timedelta
 from unittest import mock
@@ -227,7 +228,7 @@ class TestRenderedTaskInstanceFields(unittest.TestCase):
             ('test_write', 'test', {'bash_command': 'echo test_val_updated', 
'env': None}), result_updated
         )
 
-    
@mock.patch("airflow.models.renderedtifields.IS_K8S_OR_K8SCELERY_EXECUTOR", 
new=True)
+    @mock.patch.dict(os.environ, {"AIRFLOW_IS_K8S_EXECUTOR_POD": "True"})
     @mock.patch("airflow.settings.pod_mutation_hook")
     def test_get_k8s_pod_yaml(self, mock_pod_mutation_hook):
         """
@@ -281,6 +282,7 @@ class TestRenderedTaskInstanceFields(unittest.TestCase):
                         ],
                         'image': ':',
                         'name': 'base',
+                        'env': [{'name': 'AIRFLOW_IS_K8S_EXECUTOR_POD', 
'value': 'True'}],
                     }
                 ]
             },
diff --git a/tests/models/test_taskinstance.py 
b/tests/models/test_taskinstance.py
index ac74d89..a3aa445 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1814,7 +1814,7 @@ class TestTaskInstance(unittest.TestCase):
         with create_session() as session:
             session.query(RenderedTaskInstanceFields).delete()
 
-    @patch("airflow.models.renderedtifields.IS_K8S_OR_K8SCELERY_EXECUTOR", 
new=True)
+    @mock.patch.dict(os.environ, {"AIRFLOW_IS_K8S_EXECUTOR_POD": "True"})
     def test_get_rendered_k8s_spec(self):
         with DAG('test_get_rendered_k8s_spec', start_date=DEFAULT_DATE):
             task = BashOperator(task_id='op1', bash_command="{{ task.task_id 
}}")
@@ -1854,6 +1854,7 @@ class TestTaskInstance(unittest.TestCase):
                         ],
                         'image': ':',
                         'name': 'base',
+                        'env': [{'name': 'AIRFLOW_IS_K8S_EXECUTOR_POD', 
'value': 'True'}],
                     }
                 ]
             },

Reply via email to