This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
     new 90e984f  fixup! fixup! Allow overrides for pod_template_file (#11162)
90e984f is described below

commit 90e984f90b8384cc7a461d8340a48b95af272946
Author: Daniel Imberman <[email protected]>
AuthorDate: Mon Sep 28 09:22:27 2020 -0700

    fixup! fixup! Allow overrides for pod_template_file (#11162)
---
 .../contrib/operators/kubernetes_pod_operator.py   | 89 +++++++++++-----------
 airflow/kubernetes/pod_generator.py                | 48 ------------
 kubernetes_tests/test_kubernetes_pod_operator.py   |  6 +-
 tests/kubernetes/test_pod_generator.py             | 16 +---
 4 files changed, 52 insertions(+), 107 deletions(-)

diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py 
b/airflow/contrib/operators/kubernetes_pod_operator.py
index 5053299..7754fd7 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -20,6 +20,7 @@ import re
 import yaml
 
 from airflow.exceptions import AirflowException
+from airflow.kubernetes.k8s_model import append_to_pod
 from airflow.kubernetes import kube_client, pod_generator, pod_launcher
 from airflow.kubernetes.pod import Resources
 from airflow.models import BaseOperator
@@ -219,8 +220,9 @@ class KubernetesPodOperator(BaseOperator):  # pylint: 
disable=too-many-instance-
         self.annotations = annotations or {}
         self.affinity = affinity or {}
         self.resources = self._set_resources(resources)  # noqa
+        self.k8s_resources = self.resources
         self.config_file = config_file
-        self.image_pull_secrets = image_pull_secrets
+        self.image_pull_secrets = image_pull_secrets or []
         self.service_account_name = service_account_name
         self.is_delete_operator_pod = is_delete_operator_pod
         self.hostnetwork = hostnetwork
@@ -365,52 +367,53 @@ class KubernetesPodOperator(BaseOperator):  # pylint: 
disable=too-many-instance-
         else:
             pod_template = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="name"))
 
-        pod = k8s.V1Pod(
-            api_version="v1",
-            kind="Pod",
-            metadata=k8s.V1ObjectMeta(
-                namespace=self.namespace,
-                labels=self.labels,
-                name=self.name,
-                annotations=self.annotations,
-
-            ),
-            spec=k8s.V1PodSpec(
-                node_selector=self.node_selectors,
-                affinity=self.affinity,
-                tolerations=self.tolerations,
-                init_containers=self.init_containers,
-                containers=[
-                    k8s.V1Container(
-                        image=self.image,
-                        name="base",
-                        command=self.cmds,
-                        ports=self.ports,
-                        resources=self.k8s_resources,
-                        volume_mounts=self.volume_mounts,
-                        args=self.arguments,
-                        env=self.env_vars,
-                        env_from=self.env_from,
-                    )
-                ],
-                image_pull_secrets=self.image_pull_secrets,
-                service_account_name=self.service_account_name,
-                host_network=self.hostnetwork,
-                security_context=self.security_context,
-                dns_policy=self.dnspolicy,
-                scheduler_name=self.schedulername,
-                restart_policy='Never',
-                priority_class_name=self.priority_class_name,
-                volumes=self.volumes,
-            )
+        pod = pod_generator.PodGenerator(
+            image=self.image,
+            namespace=self.namespace,
+            cmds=self.cmds,
+            args=self.arguments,
+            labels=self.labels,
+            name=self.name,
+            envs=self.env_vars,
+            extract_xcom=self.do_xcom_push,
+            image_pull_policy=self.image_pull_policy,
+            node_selectors=self.node_selectors,
+            annotations=self.annotations,
+            affinity=self.affinity,
+            image_pull_secrets=self.image_pull_secrets,
+            service_account_name=self.service_account_name,
+            hostnetwork=self.hostnetwork,
+            tolerations=self.tolerations,
+            security_context=self.security_context,
+            dnspolicy=self.dnspolicy,
+            init_containers=self.init_containers,
+            restart_policy='Never',
+            schedulername=self.schedulername,
+            priority_class_name=self.priority_class_name,
+        ).gen_pod()
+
+        # noinspection PyTypeChecker
+        pod = append_to_pod(
+            pod,
+            self.pod_runtime_info_envs +  # type: ignore
+            self.ports +  # type: ignore
+            self.resources +  # type: ignore
+            self.secrets +  # type: ignore
+            self.volumes +  # type: ignore
+            self.volume_mounts  # type: ignore
         )
 
+        env_from = pod.spec.containers[0].env_from or []
+        for configmap in self.configmaps:
+            
env_from.append(k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name=configmap)))
+        pod.spec.containers[0].env_from = env_from
+
+        if self.full_pod_spec:
+            pod_template = PodGenerator.reconcile_pods(pod_template, 
self.full_pod_spec)
         pod = PodGenerator.reconcile_pods(pod_template, pod)
 
-        for secret in self.secrets:
-            pod = secret.attach_to_pod(pod)
-        if self.do_xcom_push:
-            pod = PodGenerator.add_xcom_sidecar(pod)
+        # if self.do_xcom_push:
+        #     pod = PodGenerator.add_sidecar(pod)
         return pod
 
     def create_new_pod_for_operator(self, labels, launcher):
diff --git a/airflow/kubernetes/pod_generator.py 
b/airflow/kubernetes/pod_generator.py
index ed518d1..4fbfec1 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -24,11 +24,6 @@ is supported and no serialization need be written.
 import copy
 import hashlib
 import re
-try:
-    from inspect import signature
-except ImportError:
-    # Python 2.7
-    from funcsigs import signature  # type: ignore
 import os
 import uuid
 from functools import reduce
@@ -203,7 +198,6 @@ class PodGenerator(object):
         pod_template_file=None,
         extract_xcom=False,
     ):
-        self.validate_pod_generator_args(locals())
 
         if pod_template_file:
             self.ud_pod = self.deserialize_model_file(pod_template_file)
@@ -556,48 +550,6 @@ class PodGenerator(object):
         # pylint: disable=protected-access
         return api_client._ApiClient__deserialize_model(pod, k8s.V1Pod)
 
-    @staticmethod
-    def validate_pod_generator_args(given_args):
-        """
-        :param given_args: The arguments passed to the PodGenerator 
constructor.
-        :type given_args: dict
-        :return: None
-
-        Validate that if `pod` or `pod_template_file` are set that the user is 
not attempting
-        to configure the pod with the other arguments.
-        """
-        pod_args = list(signature(PodGenerator).parameters.items())
-
-        def predicate(k, v):
-            """
-            :param k: an arg to PodGenerator
-            :type k: string
-            :param v: the parameter of the given arg
-            :type v: inspect.Parameter
-            :return: bool
-
-            returns True if the PodGenerator argument has no default arguments
-            or the default argument is None, and it is not one of the listed 
field
-            in `non_empty_fields`.
-            """
-            non_empty_fields = {
-                'pod', 'pod_template_file', 'extract_xcom', 
'service_account_name', 'image_pull_policy',
-                'restart_policy'
-            }
-
-            return (v.default is None or v.default is v.empty) and k not in 
non_empty_fields
-
-        args_without_defaults = {k: given_args[k] for k, v in pod_args if 
predicate(k, v) and given_args[k]}
-
-        if given_args['pod'] and given_args['pod_template_file']:
-            raise AirflowConfigException("Cannot pass both `pod` and 
`pod_template_file` arguments")
-        if args_without_defaults and (given_args['pod'] or 
given_args['pod_template_file']):
-            raise AirflowConfigException(
-                "Cannot configure pod and pass either `pod` or 
`pod_template_file`. Fields {} passed.".format(
-                    list(args_without_defaults.keys())
-                )
-            )
-
 
 def merge_objects(base_obj, client_obj):
     """
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py 
b/kubernetes_tests/test_kubernetes_pod_operator.py
index 8983f10..7a8674a 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -841,7 +841,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
         k = KubernetesPodOperator(
             task_id="task" + self.get_current_task_name(),
             labels={"foo": "bar", "fizz": "buzz"},
-            env_vars=[k8s.V1EnvVar(name="env_name", value="value")],
+            env_vars={"env_name": "value"},
             in_cluster=False,
             pod_template_file=fixture,
             do_xcom_push=True
@@ -937,9 +937,10 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
     @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
     def test_pod_template_file(self, mock_client, monitor_mock, start_mock):
         from airflow.utils.state import State
+        fixture = sys.path[0] + '/tests/kubernetes/pod.yaml'
         k = KubernetesPodOperator(
             task_id='task',
-            pod_template_file='tests/kubernetes/pod.yaml',
+            pod_template_file=fixture,
             do_xcom_push=True
         )
         monitor_mock.return_value = (State.SUCCESS, None)
@@ -976,6 +977,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
                                                   'env': [],
                                                   'envFrom': [],
                                                   'image': 
'apache/airflow:stress-2020.07.10-1.0.4',
+                                                  'imagePullPolicy': 
'IfNotPresent',
                                                   'name': 'base',
                                                   'ports': [],
                                                   'resources': {'limits': 
{'memory': '200Mi'},
diff --git a/tests/kubernetes/test_pod_generator.py 
b/tests/kubernetes/test_pod_generator.py
index 5243673..0c9d722 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -16,12 +16,12 @@
 # under the License.
 
 import unittest
+import sys
 from tests.compat import mock
 import uuid
 import kubernetes.client.models as k8s
 from kubernetes.client import ApiClient
 
-from airflow.exceptions import AirflowConfigException
 from airflow.kubernetes.k8s_model import append_to_pod
 from airflow.kubernetes.pod import Resources
 from airflow.kubernetes.pod_generator import PodDefaults, PodGenerator, 
extend_object_field, merge_objects
@@ -1045,7 +1045,7 @@ class TestPodGenerator(unittest.TestCase):
         self.assertEqual(client_spec, res)
 
     def test_deserialize_model_file(self):
-        fixture = 'tests/kubernetes/pod.yaml'
+        fixture = sys.path[0] + '/tests/kubernetes/pod.yaml'
         result = PodGenerator.deserialize_model_file(fixture)
         sanitized_res = self.k8s_client.sanitize_for_serialization(result)
         self.assertEqual(sanitized_res, self.deserialize_result)
@@ -1073,18 +1073,6 @@ spec:
         sanitized_res = self.k8s_client.sanitize_for_serialization(result)
         self.assertEqual(sanitized_res, self.deserialize_result)
 
-    def test_validate_pod_generator(self):
-        with self.assertRaises(AirflowConfigException):
-            PodGenerator(image='k', pod=k8s.V1Pod())
-        with self.assertRaises(AirflowConfigException):
-            PodGenerator(pod=k8s.V1Pod(), pod_template_file='k')
-        with self.assertRaises(AirflowConfigException):
-            PodGenerator(image='k', pod_template_file='k')
-
-        PodGenerator(image='k')
-        PodGenerator(pod_template_file='tests/kubernetes/pod.yaml')
-        PodGenerator(pod=k8s.V1Pod())
-
     def test_add_custom_label(self):
         from kubernetes.client import models as k8s
 

Reply via email to