This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 221f809  Fix full_pod_spec for k8spodoperator (#12354)
221f809 is described below

commit 221f809c1b4e4b78d5a437d012aa7daffd8410a4
Author: Daniel Imberman <[email protected]>
AuthorDate: Sat Nov 14 11:32:34 2020 -0800

    Fix full_pod_spec for k8spodoperator (#12354)
    
    * Fix full_pod_spec for k8spodoperator
    
    Fixes a bug where the `full_pod_spec` argument was never taken into
    account by the KubernetesPodOperator. The new order of operations is
    as follows:
    
    1. Check whether there is a pod_template_file; if so, create the
       initial pod from it, otherwise start with an empty pod.
    2. If there is a full_pod_spec, reconcile the pod_template_file pod
       with the full_pod_spec pod.
    3. Reconcile the result with any of the argument overrides.
    
    * add tests
---
 .../cncf/kubernetes/operators/kubernetes_pod.py    |  4 ++
 kubernetes_tests/test_kubernetes_pod_operator.py   | 62 ++++++++++++++++++++++
 2 files changed, 66 insertions(+)

diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py 
b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index ef2a34e..d4fe704 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -375,6 +375,10 @@ class KubernetesPodOperator(BaseOperator):  # pylint: 
disable=too-many-instance-
         if self.pod_template_file:
             self.log.debug("Pod template file found, will parse for base pod")
             pod_template = 
pod_generator.PodGenerator.deserialize_model_file(self.pod_template_file)
+            if self.full_pod_spec:
+                pod_template = PodGenerator.reconcile_pods(pod_template, 
self.full_pod_spec)
+        elif self.full_pod_spec:
+            pod_template = self.full_pod_spec
         else:
             pod_template = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="name"))
 
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py 
b/kubernetes_tests/test_kubernetes_pod_operator.py
index bfc8dcf..6861737 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -696,6 +696,68 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
         self.assertEqual(k.pod.spec.containers[0].env, 
[k8s.V1EnvVar(name="env_name", value="value")])
         self.assertDictEqual(result, {"hello": "world"})
 
+    def test_pod_template_file_with_full_pod_spec(self):
+        fixture = sys.path[0] + '/tests/kubernetes/basic_pod.yaml'
+        pod_spec = k8s.V1Pod(
+            metadata=k8s.V1ObjectMeta(
+                labels={"foo": "bar", "fizz": "buzz"},
+            ),
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        env=[k8s.V1EnvVar(name="env_name", value="value")],
+                    )
+                ]
+            ),
+        )
+        k = KubernetesPodOperator(
+            task_id="task" + self.get_current_task_name(),
+            in_cluster=False,
+            pod_template_file=fixture,
+            full_pod_spec=pod_spec,
+            do_xcom_push=True,
+        )
+
+        context = create_context(k)
+        result = k.execute(context)
+        self.assertIsNotNone(result)
+        self.assertEqual(k.pod.metadata.labels, {'fizz': 'buzz', 'foo': 'bar'})
+        self.assertEqual(k.pod.spec.containers[0].env, 
[k8s.V1EnvVar(name="env_name", value="value")])
+        self.assertDictEqual(result, {"hello": "world"})
+
+    def test_full_pod_spec(self):
+        pod_spec = k8s.V1Pod(
+            metadata=k8s.V1ObjectMeta(
+                labels={"foo": "bar", "fizz": "buzz"}, namespace="default", 
name="test-pod"
+            ),
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        image="perl",
+                        command=["/bin/bash"],
+                        args=["-c", 'echo {\\"hello\\" : \\"world\\"} | cat > 
/airflow/xcom/return.json'],
+                        env=[k8s.V1EnvVar(name="env_name", value="value")],
+                    )
+                ],
+                restart_policy="Never",
+            ),
+        )
+        k = KubernetesPodOperator(
+            task_id="task" + self.get_current_task_name(),
+            in_cluster=False,
+            full_pod_spec=pod_spec,
+            do_xcom_push=True,
+        )
+
+        context = create_context(k)
+        result = k.execute(context)
+        self.assertIsNotNone(result)
+        self.assertEqual(k.pod.metadata.labels, {'fizz': 'buzz', 'foo': 'bar'})
+        self.assertEqual(k.pod.spec.containers[0].env, 
[k8s.V1EnvVar(name="env_name", value="value")])
+        self.assertDictEqual(result, {"hello": "world"})
+
     def test_init_container(self):
         # GIVEN
         volume_mounts = [

Reply via email to