This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
     new d452544  Allow overrides for pod_template_file (#11162)
d452544 is described below

commit d4525440f101be82740110e492581404a8f9ac5c
Author: Daniel Imberman <[email protected]>
AuthorDate: Sun Sep 27 14:39:35 2020 -0700

    Allow overrides for pod_template_file (#11162)
    
    * Allow overrides for pod_template_file
    
    A pod_template_file should be treated as a *template* not a steadfast
    rule.
    
    This PR ensures that users can override individual values set by the
    pod_template_file s.t. the same file can be used for multiple tasks.
    
    * fix podtemplatetest
    
    * fix name
    
    (cherry picked from commit a888198c27bcdbc4538c02360c308ffcaca182fa)
---
 .../contrib/operators/kubernetes_pod_operator.py   |  94 ++++++++++--------
 kubernetes_tests/test_kubernetes_pod_operator.py   | 110 ++++++++++++++-------
 2 files changed, 130 insertions(+), 74 deletions(-)

diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py 
b/airflow/contrib/operators/kubernetes_pod_operator.py
index cdf5076..d9570d9 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -28,6 +28,7 @@ from airflow.utils.decorators import apply_defaults
 from airflow.utils.helpers import validate_key
 from airflow.utils.state import State
 from airflow.version import version as airflow_version
+from airflow.kubernetes.pod_generator import PodGenerator
 
 
 class KubernetesPodOperator(BaseOperator):  # pylint: 
disable=too-many-instance-attributes
@@ -272,6 +273,9 @@ class KubernetesPodOperator(BaseOperator):  # pylint: 
disable=too-many-instance-
                 client = 
kube_client.get_kube_client(cluster_context=self.cluster_context,
                                                      
config_file=self.config_file)
 
+            self.pod = self.create_pod_request_obj()
+            self.namespace = self.pod.metadata.namespace
+
             self.client = client
 
             # Add combination of labels to uniquely identify a running pod
@@ -356,45 +360,57 @@ class KubernetesPodOperator(BaseOperator):  # pylint: 
disable=too-many-instance-
         Creates a V1Pod based on user parameters. Note that a `pod` or 
`pod_template_file`
         will supersede all other values.
         """
-        pod = pod_generator.PodGenerator(
-            image=self.image,
-            namespace=self.namespace,
-            cmds=self.cmds,
-            args=self.arguments,
-            labels=self.labels,
-            name=self.name,
-            envs=self.env_vars,
-            extract_xcom=self.do_xcom_push,
-            image_pull_policy=self.image_pull_policy,
-            node_selectors=self.node_selectors,
-            annotations=self.annotations,
-            affinity=self.affinity,
-            image_pull_secrets=self.image_pull_secrets,
-            service_account_name=self.service_account_name,
-            hostnetwork=self.hostnetwork,
-            tolerations=self.tolerations,
-            configmaps=self.configmaps,
-            security_context=self.security_context,
-            dnspolicy=self.dnspolicy,
-            init_containers=self.init_containers,
-            restart_policy='Never',
-            schedulername=self.schedulername,
-            pod_template_file=self.pod_template_file,
-            priority_class_name=self.priority_class_name,
-            pod=self.full_pod_spec,
-        ).gen_pod()
-
-        # noinspection PyTypeChecker
-        pod = append_to_pod(
-            pod,
-            self.pod_runtime_info_envs +  # type: ignore
-            self.ports +  # type: ignore
-            self.resources +  # type: ignore
-            self.secrets +  # type: ignore
-            self.volumes +  # type: ignore
-            self.volume_mounts  # type: ignore
+        if self.pod_template_file:
+            pod_template = 
pod_generator.PodGenerator.deserialize_model_file(self.pod_template_file)
+        else:
+            pod_template = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="name"))
+
+        pod = k8s.V1Pod(
+            api_version="v1",
+            kind="Pod",
+            metadata=k8s.V1ObjectMeta(
+                namespace=self.namespace,
+                labels=self.labels,
+                name=self.name,
+                annotations=self.annotations,
+
+            ),
+            spec=k8s.V1PodSpec(
+                node_selector=self.node_selectors,
+                affinity=self.affinity,
+                tolerations=self.tolerations,
+                init_containers=self.init_containers,
+                containers=[
+                    k8s.V1Container(
+                        image=self.image,
+                        name="base",
+                        command=self.cmds,
+                        ports=self.ports,
+                        resources=self.k8s_resources,
+                        volume_mounts=self.volume_mounts,
+                        args=self.arguments,
+                        env=self.env_vars,
+                        env_from=self.env_from,
+                    )
+                ],
+                image_pull_secrets=self.image_pull_secrets,
+                service_account_name=self.service_account_name,
+                host_network=self.hostnetwork,
+                security_context=self.security_context,
+                dns_policy=self.dnspolicy,
+                scheduler_name=self.schedulername,
+                restart_policy='Never',
+                priority_class_name=self.priority_class_name,
+                volumes=self.volumes,
+            )
         )
 
+        pod = PodGenerator.reconcile_pods(pod_template, pod)
+
+        for secret in self.secrets:
+            pod = secret.attach_to_pod(pod)
+        if self.do_xcom_push:
+            pod = PodGenerator.add_xcom_sidecar(pod)
         return pod
 
     def create_new_pod_for_operator(self, labels, launcher):
@@ -435,9 +451,9 @@ class KubernetesPodOperator(BaseOperator):  # pylint: 
disable=too-many-instance-
 
     def monitor_launched_pod(self, launcher, pod):
         """
-        Montitors a pod to completion that was created by a previous 
KubernetesPodOperator
+        Monitors a pod to completion that was created by a previous 
KubernetesPodOperator
 
-        @param launcher: pod launcher that will manage launching and 
monitoring pods
+        :param launcher: pod launcher that will manage launching and 
monitoring pods
         :param pod: podspec used to find pod using k8s API
         :return:
         """
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py 
b/kubernetes_tests/test_kubernetes_pod_operator.py
index 0335b58..d0e5d5f 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -834,6 +834,24 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
         self.assertIsNotNone(result)
         self.assertDictEqual(result, {"hello": "world"})
 
+    def test_pod_template_file_with_overrides_system(self):
+        fixture = sys.path[0] + '/tests/kubernetes/basic_pod.yaml'
+        k = KubernetesPodOperator(
+            task_id="task" + self.get_current_task_name(),
+            labels={"foo": "bar", "fizz": "buzz"},
+            env_vars=[k8s.V1EnvVar(name="env_name", value="value")],
+            in_cluster=False,
+            pod_template_file=fixture,
+            do_xcom_push=True
+        )
+
+        context = create_context(k)
+        result = k.execute(context)
+        self.assertIsNotNone(result)
+        self.assertEqual(k.pod.metadata.labels, {'fizz': 'buzz', 'foo': 'bar'})
+        self.assertEqual(k.pod.spec.containers[0].env, 
[k8s.V1EnvVar(name="env_name", value="value")])
+        self.assertDictEqual(result, {"hello": "world"})
+
     def test_init_container(self):
         # GIVEN
         volume_mounts = [k8s.V1VolumeMount(
@@ -923,42 +941,64 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             do_xcom_push=True
         )
         monitor_mock.return_value = (State.SUCCESS, None)
-        context = self.create_context(k)
-        k.execute(context)
+        context = create_context(k)
+        with self.assertLogs(k.log, level=logging.DEBUG) as cm:
+            k.execute(context)
+            expected_line = textwrap.dedent("""\
+            DEBUG:airflow.task.operators:Starting pod:
+            api_version: v1
+            kind: Pod
+            metadata:
+              annotations: {}
+              cluster_name: null
+              creation_timestamp: null
+              deletion_grace_period_seconds: null\
+            """).strip()
+            self.assertTrue(any(line.startswith(expected_line) for line in 
cm.output))
+
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
-        self.assertEqual({
-            'apiVersion': 'v1',
-            'kind': 'Pod',
-            'metadata': {'name': mock.ANY, 'namespace': 'mem-example'},
-            'spec': {
-                'volumes': [{'name': 'xcom', 'emptyDir': {}}],
-                'containers': [{
-                    'args': ['--vm', '1', '--vm-bytes', '150M', '--vm-hang', 
'1'],
-                    'command': ['stress'],
-                    'image': 'apache/airflow:stress-2020.07.10-1.0.4',
-                    'name': 'memory-demo-ctr',
-                    'resources': {
-                        'limits': {'memory': '200Mi'},
-                        'requests': {'memory': '100Mi'}
-                    },
-                    'volumeMounts': [{
-                        'name': 'xcom',
-                        'mountPath': '/airflow/xcom'
-                    }]
-                }, {
-                    'name': 'airflow-xcom-sidecar',
-                    'image': "alpine",
-                    'command': ['sh', '-c', PodDefaults.XCOM_CMD],
-                    'volumeMounts': [
-                        {
-                            'name': 'xcom',
-                            'mountPath': '/airflow/xcom'
-                        }
-                    ],
-                    'resources': {'requests': {'cpu': '1m'}},
-                }],
-            }
-        }, actual_pod)
+        expected_dict = {'apiVersion': 'v1',
+                         'kind': 'Pod',
+                         'metadata': {'annotations': {},
+                                      'labels': {},
+                                      'name': 'memory-demo',
+                                      'namespace': 'mem-example'},
+                         'spec': {'affinity': {},
+                                  'containers': [{'args': ['--vm',
+                                                           '1',
+                                                           '--vm-bytes',
+                                                           '150M',
+                                                           '--vm-hang',
+                                                           '1'],
+                                                  'command': ['stress'],
+                                                  'env': [],
+                                                  'envFrom': [],
+                                                  'image': 
'apache/airflow:stress-2020.07.10-1.0.4',
+                                                  'name': 'base',
+                                                  'ports': [],
+                                                  'resources': {'limits': 
{'memory': '200Mi'},
+                                                                'requests': 
{'memory': '100Mi'}},
+                                                  'volumeMounts': 
[{'mountPath': '/airflow/xcom',
+                                                                    'name': 
'xcom'}]},
+                                                 {'command': ['sh',
+                                                              '-c',
+                                                              'trap "exit 0" 
INT; while true; do sleep '
+                                                              '30; done;'],
+                                                  'image': 'alpine',
+                                                  'name': 
'airflow-xcom-sidecar',
+                                                  'resources': {'requests': 
{'cpu': '1m'}},
+                                                  'volumeMounts': 
[{'mountPath': '/airflow/xcom',
+                                                                    'name': 
'xcom'}]}],
+                                  'hostNetwork': False,
+                                  'imagePullSecrets': [],
+                                  'initContainers': [],
+                                  'nodeSelector': {},
+                                  'restartPolicy': 'Never',
+                                  'securityContext': {},
+                                  'serviceAccountName': 'default',
+                                  'tolerations': [],
+                                  'volumes': [{'emptyDir': {}, 'name': 
'xcom'}]}}
+        self.assertEqual(expected_dict, actual_pod)
 
     @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
     @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")

Reply via email to