This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 221f809 Fix full_pod_spec for k8spodoperator (#12354)
221f809 is described below
commit 221f809c1b4e4b78d5a437d012aa7daffd8410a4
Author: Daniel Imberman <[email protected]>
AuthorDate: Sat Nov 14 11:32:34 2020 -0800
Fix full_pod_spec for k8spodoperator (#12354)
* Fix full_pod_spec for k8spodoperator
Fixes a bug where the `full_pod_spec` argument was never factored
into the pod built by the KubernetesPodOperator. The new order of
operations is as follows:
1. Check whether there is a pod_template_file; if so, create the
initial pod from it, otherwise start with an empty pod.
2. If there is a full_pod_spec, reconcile the pod_template_file pod with
the full_pod_spec pod (or, when there is no pod_template_file, use the
full_pod_spec as the initial pod).
3. Reconcile the result with any of the argument overrides.
* add tests
---
.../cncf/kubernetes/operators/kubernetes_pod.py | 4 ++
kubernetes_tests/test_kubernetes_pod_operator.py | 62 ++++++++++++++++++++++
2 files changed, 66 insertions(+)
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index ef2a34e..d4fe704 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -375,6 +375,10 @@ class KubernetesPodOperator(BaseOperator): # pylint:
disable=too-many-instance-
if self.pod_template_file:
self.log.debug("Pod template file found, will parse for base pod")
pod_template =
pod_generator.PodGenerator.deserialize_model_file(self.pod_template_file)
+ if self.full_pod_spec:
+ pod_template = PodGenerator.reconcile_pods(pod_template,
self.full_pod_spec)
+ elif self.full_pod_spec:
+ pod_template = self.full_pod_spec
else:
pod_template = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="name"))
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py
b/kubernetes_tests/test_kubernetes_pod_operator.py
index bfc8dcf..6861737 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -696,6 +696,68 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
self.assertEqual(k.pod.spec.containers[0].env,
[k8s.V1EnvVar(name="env_name", value="value")])
self.assertDictEqual(result, {"hello": "world"})
+ def test_pod_template_file_with_full_pod_spec(self):
+ fixture = sys.path[0] + '/tests/kubernetes/basic_pod.yaml'
+ pod_spec = k8s.V1Pod(
+ metadata=k8s.V1ObjectMeta(
+ labels={"foo": "bar", "fizz": "buzz"},
+ ),
+ spec=k8s.V1PodSpec(
+ containers=[
+ k8s.V1Container(
+ name="base",
+ env=[k8s.V1EnvVar(name="env_name", value="value")],
+ )
+ ]
+ ),
+ )
+ k = KubernetesPodOperator(
+ task_id="task" + self.get_current_task_name(),
+ in_cluster=False,
+ pod_template_file=fixture,
+ full_pod_spec=pod_spec,
+ do_xcom_push=True,
+ )
+
+ context = create_context(k)
+ result = k.execute(context)
+ self.assertIsNotNone(result)
+ self.assertEqual(k.pod.metadata.labels, {'fizz': 'buzz', 'foo': 'bar'})
+ self.assertEqual(k.pod.spec.containers[0].env,
[k8s.V1EnvVar(name="env_name", value="value")])
+ self.assertDictEqual(result, {"hello": "world"})
+
+ def test_full_pod_spec(self):
+ pod_spec = k8s.V1Pod(
+ metadata=k8s.V1ObjectMeta(
+ labels={"foo": "bar", "fizz": "buzz"}, namespace="default",
name="test-pod"
+ ),
+ spec=k8s.V1PodSpec(
+ containers=[
+ k8s.V1Container(
+ name="base",
+ image="perl",
+ command=["/bin/bash"],
+ args=["-c", 'echo {\\"hello\\" : \\"world\\"} | cat >
/airflow/xcom/return.json'],
+ env=[k8s.V1EnvVar(name="env_name", value="value")],
+ )
+ ],
+ restart_policy="Never",
+ ),
+ )
+ k = KubernetesPodOperator(
+ task_id="task" + self.get_current_task_name(),
+ in_cluster=False,
+ full_pod_spec=pod_spec,
+ do_xcom_push=True,
+ )
+
+ context = create_context(k)
+ result = k.execute(context)
+ self.assertIsNotNone(result)
+ self.assertEqual(k.pod.metadata.labels, {'fizz': 'buzz', 'foo': 'bar'})
+ self.assertEqual(k.pod.spec.containers[0].env,
[k8s.V1EnvVar(name="env_name", value="value")])
+ self.assertDictEqual(result, {"hello": "world"})
+
def test_init_container(self):
# GIVEN
volume_mounts = [