This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 4e362c1 K8s yaml templates not rendered by k8sexecutor (#12303)
4e362c1 is described below
commit 4e362c134702dca30d1bd2e3da38bc2ee39825bf
Author: Daniel Imberman <[email protected]>
AuthorDate: Fri Nov 13 12:06:29 2020 -0800
K8s yaml templates not rendered by k8sexecutor (#12303)
* K8s yaml templates not rendered by k8sexecutor
There is a bug in the yaml template rendering caused by the logic that
yaml templates are only generated when the current executor is the
k8sexecutor. This is a problem as the templates are generated by the
task pod, which is itself running a LocalExecutor. Also generates a
"base" template if this taskInstance has not run yet.
* fix tests
* fix taskinstance test
* fix taskinstance
* fix pod generator tests
* fix podgen
* Update tests/kubernetes/test_pod_generator.py
Co-authored-by: Ash Berlin-Taylor <[email protected]>
* @ashb comment
Co-authored-by: Ash Berlin-Taylor <[email protected]>
---
airflow/kubernetes/pod_generator.py | 1 +
airflow/models/renderedtifields.py | 5 +++--
airflow/models/taskinstance.py | 2 +-
scripts/ci/libraries/_kind.sh | 2 --
tests/kubernetes/test_pod_generator.py | 9 +++++++++
tests/models/test_renderedtifields.py | 4 +++-
tests/models/test_taskinstance.py | 3 ++-
7 files changed, 19 insertions(+), 7 deletions(-)
diff --git a/airflow/kubernetes/pod_generator.py
b/airflow/kubernetes/pod_generator.py
index 0735948..7f0a52e 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -396,6 +396,7 @@ class PodGenerator:
name="base",
command=command,
image=image,
+ env=[k8s.V1EnvVar(name="AIRFLOW_IS_K8S_EXECUTOR_POD",
value="True")],
)
]
),
diff --git a/airflow/models/renderedtifields.py
b/airflow/models/renderedtifields.py
index 850badf..91ec027 100644
--- a/airflow/models/renderedtifields.py
+++ b/airflow/models/renderedtifields.py
@@ -16,6 +16,7 @@
# specific language governing permissions and limitations
# under the License.
"""Save Rendered Template Fields"""
+import os
from typing import Optional
import sqlalchemy_jsonfield
@@ -26,7 +27,7 @@ from airflow.configuration import conf
from airflow.models.base import ID_LEN, Base
from airflow.models.taskinstance import TaskInstance
from airflow.serialization.helpers import serialize_template_field
-from airflow.settings import IS_K8S_OR_K8SCELERY_EXECUTOR, json
+from airflow.settings import json
from airflow.utils.session import provide_session
from airflow.utils.sqlalchemy import UtcDateTime
@@ -50,7 +51,7 @@ class RenderedTaskInstanceFields(Base):
self.ti = ti
if render_templates:
ti.render_templates()
- if IS_K8S_OR_K8SCELERY_EXECUTOR:
+ if os.environ.get("AIRFLOW_IS_K8S_EXECUTOR_POD", None):
self.k8s_pod_yaml = ti.render_k8s_pod_yaml()
self.rendered_fields = {
field: serialize_template_field(getattr(self.task, field)) for
field in self.task.template_fields
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 3ae37c1..c6a1378 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1671,7 +1671,7 @@ class TaskInstance(Base, LoggingMixin): # pylint:
disable=R0902,R0904
rendered_k8s_spec = RenderedTaskInstanceFields.get_k8s_pod_yaml(self)
if not rendered_k8s_spec:
try:
- self.render_k8s_pod_yaml()
+ rendered_k8s_spec = self.render_k8s_pod_yaml()
except (TemplateAssertionError, UndefinedError) as e:
raise AirflowException(f"Unable to render a k8s spec for this
taskinstance: {e}") from e
return rendered_k8s_spec
diff --git a/scripts/ci/libraries/_kind.sh b/scripts/ci/libraries/_kind.sh
index dcbf6a8..3540064 100644
--- a/scripts/ci/libraries/_kind.sh
+++ b/scripts/ci/libraries/_kind.sh
@@ -19,10 +19,8 @@
function kind::get_kind_cluster_name() {
# Name of the KinD cluster to connect to
export
KIND_CLUSTER_NAME=${KIND_CLUSTER_NAME:="airflow-python-${PYTHON_MAJOR_MINOR_VERSION}-${KUBERNETES_VERSION}"}
- readonly KIND_CLUSTER_NAME
# Name of the KinD cluster to connect to when referred to via kubectl
export KUBECTL_CLUSTER_NAME=kind-${KIND_CLUSTER_NAME}
- readonly KUBECTL_CLUSTER_NAME
export KUBECONFIG="${BUILD_CACHE_DIR}/.kube/config"
mkdir -pv "${BUILD_CACHE_DIR}/.kube/"
touch "${KUBECONFIG}"
diff --git a/tests/kubernetes/test_pod_generator.py
b/tests/kubernetes/test_pod_generator.py
index 2aa6363..32e5d06 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -440,6 +440,12 @@ class TestPodGenerator(unittest.TestCase):
expected.spec.containers[0].command = ['command']
expected.spec.containers[0].image = 'airflow_image'
expected.spec.containers[0].resources = {'limits': {'cpu': '1m',
'memory': '1G'}}
+ expected.spec.containers[0].env.append(
+ k8s.V1EnvVar(
+ name="AIRFLOW_IS_K8S_EXECUTOR_POD",
+ value='True',
+ )
+ )
result_dict = self.k8s_client.sanitize_for_serialization(result)
expected_dict =
self.k8s_client.sanitize_for_serialization(self.expected)
@@ -473,6 +479,9 @@ class TestPodGenerator(unittest.TestCase):
worker_config.metadata.labels['app'] = 'myapp'
worker_config.metadata.name = 'pod_id-' + self.static_uuid.hex
worker_config.metadata.namespace = 'namespace'
+ worker_config.spec.containers[0].env.append(
+ k8s.V1EnvVar(name="AIRFLOW_IS_K8S_EXECUTOR_POD", value='True')
+ )
worker_config_result =
self.k8s_client.sanitize_for_serialization(worker_config)
self.assertEqual(worker_config_result, sanitized_result)
diff --git a/tests/models/test_renderedtifields.py
b/tests/models/test_renderedtifields.py
index bbf1301..297b7c4 100644
--- a/tests/models/test_renderedtifields.py
+++ b/tests/models/test_renderedtifields.py
@@ -18,6 +18,7 @@
"""Unit tests for RenderedTaskInstanceFields."""
+import os
import unittest
from datetime import date, timedelta
from unittest import mock
@@ -227,7 +228,7 @@ class TestRenderedTaskInstanceFields(unittest.TestCase):
('test_write', 'test', {'bash_command': 'echo test_val_updated',
'env': None}), result_updated
)
-
@mock.patch("airflow.models.renderedtifields.IS_K8S_OR_K8SCELERY_EXECUTOR",
new=True)
+ @mock.patch.dict(os.environ, {"AIRFLOW_IS_K8S_EXECUTOR_POD": "True"})
@mock.patch("airflow.settings.pod_mutation_hook")
def test_get_k8s_pod_yaml(self, mock_pod_mutation_hook):
"""
@@ -281,6 +282,7 @@ class TestRenderedTaskInstanceFields(unittest.TestCase):
],
'image': ':',
'name': 'base',
+ 'env': [{'name': 'AIRFLOW_IS_K8S_EXECUTOR_POD',
'value': 'True'}],
}
]
},
diff --git a/tests/models/test_taskinstance.py
b/tests/models/test_taskinstance.py
index ac74d89..a3aa445 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1814,7 +1814,7 @@ class TestTaskInstance(unittest.TestCase):
with create_session() as session:
session.query(RenderedTaskInstanceFields).delete()
- @patch("airflow.models.renderedtifields.IS_K8S_OR_K8SCELERY_EXECUTOR",
new=True)
+ @mock.patch.dict(os.environ, {"AIRFLOW_IS_K8S_EXECUTOR_POD": "True"})
def test_get_rendered_k8s_spec(self):
with DAG('test_get_rendered_k8s_spec', start_date=DEFAULT_DATE):
task = BashOperator(task_id='op1', bash_command="{{ task.task_id
}}")
@@ -1854,6 +1854,7 @@ class TestTaskInstance(unittest.TestCase):
],
'image': ':',
'name': 'base',
+ 'env': [{'name': 'AIRFLOW_IS_K8S_EXECUTOR_POD',
'value': 'True'}],
}
]
},