This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-test by this push:
new 90e984f fixup! fixup! Allow overrides for pod_template_file (#11162)
90e984f is described below
commit 90e984f90b8384cc7a461d8340a48b95af272946
Author: Daniel Imberman <[email protected]>
AuthorDate: Mon Sep 28 09:22:27 2020 -0700
fixup! fixup! Allow overrides for pod_template_file (#11162)
---
.../contrib/operators/kubernetes_pod_operator.py | 89 +++++++++++-----------
airflow/kubernetes/pod_generator.py | 48 ------------
kubernetes_tests/test_kubernetes_pod_operator.py | 6 +-
tests/kubernetes/test_pod_generator.py | 16 +---
4 files changed, 52 insertions(+), 107 deletions(-)
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py
b/airflow/contrib/operators/kubernetes_pod_operator.py
index 5053299..7754fd7 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -20,6 +20,7 @@ import re
import yaml
from airflow.exceptions import AirflowException
+from airflow.kubernetes.k8s_model import append_to_pod
from airflow.kubernetes import kube_client, pod_generator, pod_launcher
from airflow.kubernetes.pod import Resources
from airflow.models import BaseOperator
@@ -219,8 +220,9 @@ class KubernetesPodOperator(BaseOperator): # pylint:
disable=too-many-instance-
self.annotations = annotations or {}
self.affinity = affinity or {}
self.resources = self._set_resources(resources) # noqa
+ self.k8s_resources = self.resources
self.config_file = config_file
- self.image_pull_secrets = image_pull_secrets
+ self.image_pull_secrets = image_pull_secrets or []
self.service_account_name = service_account_name
self.is_delete_operator_pod = is_delete_operator_pod
self.hostnetwork = hostnetwork
@@ -365,52 +367,53 @@ class KubernetesPodOperator(BaseOperator): # pylint:
disable=too-many-instance-
else:
pod_template = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="name"))
- pod = k8s.V1Pod(
- api_version="v1",
- kind="Pod",
- metadata=k8s.V1ObjectMeta(
- namespace=self.namespace,
- labels=self.labels,
- name=self.name,
- annotations=self.annotations,
-
- ),
- spec=k8s.V1PodSpec(
- node_selector=self.node_selectors,
- affinity=self.affinity,
- tolerations=self.tolerations,
- init_containers=self.init_containers,
- containers=[
- k8s.V1Container(
- image=self.image,
- name="base",
- command=self.cmds,
- ports=self.ports,
- resources=self.k8s_resources,
- volume_mounts=self.volume_mounts,
- args=self.arguments,
- env=self.env_vars,
- env_from=self.env_from,
- )
- ],
- image_pull_secrets=self.image_pull_secrets,
- service_account_name=self.service_account_name,
- host_network=self.hostnetwork,
- security_context=self.security_context,
- dns_policy=self.dnspolicy,
- scheduler_name=self.schedulername,
- restart_policy='Never',
- priority_class_name=self.priority_class_name,
- volumes=self.volumes,
- )
+ pod = pod_generator.PodGenerator(
+ image=self.image,
+ namespace=self.namespace,
+ cmds=self.cmds,
+ args=self.arguments,
+ labels=self.labels,
+ name=self.name,
+ envs=self.env_vars,
+ extract_xcom=self.do_xcom_push,
+ image_pull_policy=self.image_pull_policy,
+ node_selectors=self.node_selectors,
+ annotations=self.annotations,
+ affinity=self.affinity,
+ image_pull_secrets=self.image_pull_secrets,
+ service_account_name=self.service_account_name,
+ hostnetwork=self.hostnetwork,
+ tolerations=self.tolerations,
+ security_context=self.security_context,
+ dnspolicy=self.dnspolicy,
+ init_containers=self.init_containers,
+ restart_policy='Never',
+ schedulername=self.schedulername,
+ priority_class_name=self.priority_class_name,
+ ).gen_pod()
+
+ # noinspection PyTypeChecker
+ pod = append_to_pod(
+ pod,
+ self.pod_runtime_info_envs + # type: ignore
+ self.ports + # type: ignore
+ self.resources + # type: ignore
+ self.secrets + # type: ignore
+ self.volumes + # type: ignore
+ self.volume_mounts # type: ignore
)
+ env_from = pod.spec.containers[0].env_from or []
+ for configmap in self.configmaps:
+
env_from.append(k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name=configmap)))
+ pod.spec.containers[0].env_from = env_from
+
+ if self.full_pod_spec:
+ pod_template = PodGenerator.reconcile_pods(pod_template,
self.full_pod_spec)
pod = PodGenerator.reconcile_pods(pod_template, pod)
- for secret in self.secrets:
- pod = secret.attach_to_pod(pod)
- if self.do_xcom_push:
- pod = PodGenerator.add_xcom_sidecar(pod)
+ # if self.do_xcom_push:
+ # pod = PodGenerator.add_sidecar(pod)
return pod
def create_new_pod_for_operator(self, labels, launcher):
diff --git a/airflow/kubernetes/pod_generator.py
b/airflow/kubernetes/pod_generator.py
index ed518d1..4fbfec1 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -24,11 +24,6 @@ is supported and no serialization need be written.
import copy
import hashlib
import re
-try:
- from inspect import signature
-except ImportError:
- # Python 2.7
- from funcsigs import signature # type: ignore
import os
import uuid
from functools import reduce
@@ -203,7 +198,6 @@ class PodGenerator(object):
pod_template_file=None,
extract_xcom=False,
):
- self.validate_pod_generator_args(locals())
if pod_template_file:
self.ud_pod = self.deserialize_model_file(pod_template_file)
@@ -556,48 +550,6 @@ class PodGenerator(object):
# pylint: disable=protected-access
return api_client._ApiClient__deserialize_model(pod, k8s.V1Pod)
- @staticmethod
- def validate_pod_generator_args(given_args):
- """
- :param given_args: The arguments passed to the PodGenerator
constructor.
- :type given_args: dict
- :return: None
-
- Validate that if `pod` or `pod_template_file` are set that the user is
not attempting
- to configure the pod with the other arguments.
- """
- pod_args = list(signature(PodGenerator).parameters.items())
-
- def predicate(k, v):
- """
- :param k: an arg to PodGenerator
- :type k: string
- :param v: the parameter of the given arg
- :type v: inspect.Parameter
- :return: bool
-
- returns True if the PodGenerator argument has no default arguments
- or the default argument is None, and it is not one of the listed
field
- in `non_empty_fields`.
- """
- non_empty_fields = {
- 'pod', 'pod_template_file', 'extract_xcom',
'service_account_name', 'image_pull_policy',
- 'restart_policy'
- }
-
- return (v.default is None or v.default is v.empty) and k not in
non_empty_fields
-
- args_without_defaults = {k: given_args[k] for k, v in pod_args if
predicate(k, v) and given_args[k]}
-
- if given_args['pod'] and given_args['pod_template_file']:
- raise AirflowConfigException("Cannot pass both `pod` and
`pod_template_file` arguments")
- if args_without_defaults and (given_args['pod'] or
given_args['pod_template_file']):
- raise AirflowConfigException(
- "Cannot configure pod and pass either `pod` or
`pod_template_file`. Fields {} passed.".format(
- list(args_without_defaults.keys())
- )
- )
-
def merge_objects(base_obj, client_obj):
"""
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py
b/kubernetes_tests/test_kubernetes_pod_operator.py
index 8983f10..7a8674a 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -841,7 +841,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
k = KubernetesPodOperator(
task_id="task" + self.get_current_task_name(),
labels={"foo": "bar", "fizz": "buzz"},
- env_vars=[k8s.V1EnvVar(name="env_name", value="value")],
+ env_vars={"env_name": "value"},
in_cluster=False,
pod_template_file=fixture,
do_xcom_push=True
@@ -937,9 +937,10 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
@mock.patch("airflow.kubernetes.kube_client.get_kube_client")
def test_pod_template_file(self, mock_client, monitor_mock, start_mock):
from airflow.utils.state import State
+ fixture = sys.path[0] + '/tests/kubernetes/pod.yaml'
k = KubernetesPodOperator(
task_id='task',
- pod_template_file='tests/kubernetes/pod.yaml',
+ pod_template_file=fixture,
do_xcom_push=True
)
monitor_mock.return_value = (State.SUCCESS, None)
@@ -976,6 +977,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
'env': [],
'envFrom': [],
'image':
'apache/airflow:stress-2020.07.10-1.0.4',
+ 'imagePullPolicy':
'IfNotPresent',
'name': 'base',
'ports': [],
'resources': {'limits':
{'memory': '200Mi'},
diff --git a/tests/kubernetes/test_pod_generator.py
b/tests/kubernetes/test_pod_generator.py
index 5243673..0c9d722 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -16,12 +16,12 @@
# under the License.
import unittest
+import sys
from tests.compat import mock
import uuid
import kubernetes.client.models as k8s
from kubernetes.client import ApiClient
-from airflow.exceptions import AirflowConfigException
from airflow.kubernetes.k8s_model import append_to_pod
from airflow.kubernetes.pod import Resources
from airflow.kubernetes.pod_generator import PodDefaults, PodGenerator,
extend_object_field, merge_objects
@@ -1045,7 +1045,7 @@ class TestPodGenerator(unittest.TestCase):
self.assertEqual(client_spec, res)
def test_deserialize_model_file(self):
- fixture = 'tests/kubernetes/pod.yaml'
+ fixture = sys.path[0] + '/tests/kubernetes/pod.yaml'
result = PodGenerator.deserialize_model_file(fixture)
sanitized_res = self.k8s_client.sanitize_for_serialization(result)
self.assertEqual(sanitized_res, self.deserialize_result)
@@ -1073,18 +1073,6 @@ spec:
sanitized_res = self.k8s_client.sanitize_for_serialization(result)
self.assertEqual(sanitized_res, self.deserialize_result)
- def test_validate_pod_generator(self):
- with self.assertRaises(AirflowConfigException):
- PodGenerator(image='k', pod=k8s.V1Pod())
- with self.assertRaises(AirflowConfigException):
- PodGenerator(pod=k8s.V1Pod(), pod_template_file='k')
- with self.assertRaises(AirflowConfigException):
- PodGenerator(image='k', pod_template_file='k')
-
- PodGenerator(image='k')
- PodGenerator(pod_template_file='tests/kubernetes/pod.yaml')
- PodGenerator(pod=k8s.V1Pod())
-
def test_add_custom_label(self):
from kubernetes.client import models as k8s