This is an automated email from the ASF dual-hosted git repository. dimberman pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 227007cbdd72d9497869d51a6d0e20970a01b122 Author: Daniel Imberman <[email protected]> AuthorDate: Sun Sep 27 14:39:35 2020 -0700 Allow overrides for pod_template_file (#11162) * Allow overrides for pod_template_file A pod_template_file should be treated as a *template*, not a steadfast rule. This PR ensures that users can override individual values set by the pod_template_file so that the same file can be used for multiple tasks. * fix podtemplatetest * fix name (cherry picked from commit a888198c27bcdbc4538c02360c308ffcaca182fa) --- .../contrib/operators/kubernetes_pod_operator.py | 94 ++++++++++------- kubernetes_tests/test_kubernetes_pod_operator.py | 112 ++++++++++++++------- 2 files changed, 132 insertions(+), 74 deletions(-) diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py index cdf5076..d9570d9 100644 --- a/airflow/contrib/operators/kubernetes_pod_operator.py +++ b/airflow/contrib/operators/kubernetes_pod_operator.py @@ -28,6 +28,7 @@ from airflow.utils.decorators import apply_defaults from airflow.utils.helpers import validate_key from airflow.utils.state import State from airflow.version import version as airflow_version +from airflow.kubernetes.pod_generator import PodGenerator class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-attributes @@ -272,6 +273,9 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- client = kube_client.get_kube_client(cluster_context=self.cluster_context, config_file=self.config_file) + self.pod = self.create_pod_request_obj() + self.namespace = self.pod.metadata.namespace + self.client = client # Add combination of labels to uniquely identify a running pod @@ -356,45 +360,57 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- Creates a V1Pod based on user parameters. Note that a `pod` or `pod_template_file` will supersede all other values. 
""" - pod = pod_generator.PodGenerator( - image=self.image, - namespace=self.namespace, - cmds=self.cmds, - args=self.arguments, - labels=self.labels, - name=self.name, - envs=self.env_vars, - extract_xcom=self.do_xcom_push, - image_pull_policy=self.image_pull_policy, - node_selectors=self.node_selectors, - annotations=self.annotations, - affinity=self.affinity, - image_pull_secrets=self.image_pull_secrets, - service_account_name=self.service_account_name, - hostnetwork=self.hostnetwork, - tolerations=self.tolerations, - configmaps=self.configmaps, - security_context=self.security_context, - dnspolicy=self.dnspolicy, - init_containers=self.init_containers, - restart_policy='Never', - schedulername=self.schedulername, - pod_template_file=self.pod_template_file, - priority_class_name=self.priority_class_name, - pod=self.full_pod_spec, - ).gen_pod() - - # noinspection PyTypeChecker - pod = append_to_pod( - pod, - self.pod_runtime_info_envs + # type: ignore - self.ports + # type: ignore - self.resources + # type: ignore - self.secrets + # type: ignore - self.volumes + # type: ignore - self.volume_mounts # type: ignore + if self.pod_template_file: + pod_template = pod_generator.PodGenerator.deserialize_model_file(self.pod_template_file) + else: + pod_template = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="name")) + + pod = k8s.V1Pod( + api_version="v1", + kind="Pod", + metadata=k8s.V1ObjectMeta( + namespace=self.namespace, + labels=self.labels, + name=self.name, + annotations=self.annotations, + + ), + spec=k8s.V1PodSpec( + node_selector=self.node_selectors, + affinity=self.affinity, + tolerations=self.tolerations, + init_containers=self.init_containers, + containers=[ + k8s.V1Container( + image=self.image, + name="base", + command=self.cmds, + ports=self.ports, + resources=self.k8s_resources, + volume_mounts=self.volume_mounts, + args=self.arguments, + env=self.env_vars, + env_from=self.env_from, + ) + ], + image_pull_secrets=self.image_pull_secrets, + 
service_account_name=self.service_account_name, + host_network=self.hostnetwork, + security_context=self.security_context, + dns_policy=self.dnspolicy, + scheduler_name=self.schedulername, + restart_policy='Never', + priority_class_name=self.priority_class_name, + volumes=self.volumes, + ) ) + pod = PodGenerator.reconcile_pods(pod_template, pod) + + for secret in self.secrets: + pod = secret.attach_to_pod(pod) + if self.do_xcom_push: + pod = PodGenerator.add_xcom_sidecar(pod) return pod def create_new_pod_for_operator(self, labels, launcher): @@ -435,9 +451,9 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- def monitor_launched_pod(self, launcher, pod): """ - Montitors a pod to completion that was created by a previous KubernetesPodOperator + Monitors a pod to completion that was created by a previous KubernetesPodOperator - @param launcher: pod launcher that will manage launching and monitoring pods + :param launcher: pod launcher that will manage launching and monitoring pods :param pod: podspec used to find pod using k8s API :return: """ diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py index 0335b58..8983f10 100644 --- a/kubernetes_tests/test_kubernetes_pod_operator.py +++ b/kubernetes_tests/test_kubernetes_pod_operator.py @@ -17,10 +17,12 @@ # under the License. 
import json +import logging import os import shutil import sys import unittest +import textwrap import kubernetes.client.models as k8s import pendulum @@ -834,6 +836,24 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): self.assertIsNotNone(result) self.assertDictEqual(result, {"hello": "world"}) + def test_pod_template_file_with_overrides_system(self): + fixture = sys.path[0] + '/tests/kubernetes/basic_pod.yaml' + k = KubernetesPodOperator( + task_id="task" + self.get_current_task_name(), + labels={"foo": "bar", "fizz": "buzz"}, + env_vars=[k8s.V1EnvVar(name="env_name", value="value")], + in_cluster=False, + pod_template_file=fixture, + do_xcom_push=True + ) + + context = create_context(k) + result = k.execute(context) + self.assertIsNotNone(result) + self.assertEqual(k.pod.metadata.labels, {'fizz': 'buzz', 'foo': 'bar'}) + self.assertEqual(k.pod.spec.containers[0].env, [k8s.V1EnvVar(name="env_name", value="value")]) + self.assertDictEqual(result, {"hello": "world"}) + def test_init_container(self): # GIVEN volume_mounts = [k8s.V1VolumeMount( @@ -923,42 +943,64 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): do_xcom_push=True ) monitor_mock.return_value = (State.SUCCESS, None) - context = self.create_context(k) - k.execute(context) + context = create_context(k) + with self.assertLogs(k.log, level=logging.DEBUG) as cm: + k.execute(context) + expected_line = textwrap.dedent("""\ + DEBUG:airflow.task.operators:Starting pod: + api_version: v1 + kind: Pod + metadata: + annotations: {} + cluster_name: null + creation_timestamp: null + deletion_grace_period_seconds: null\ + """).strip() + self.assertTrue(any(line.startswith(expected_line) for line in cm.output)) + actual_pod = self.api_client.sanitize_for_serialization(k.pod) - self.assertEqual({ - 'apiVersion': 'v1', - 'kind': 'Pod', - 'metadata': {'name': mock.ANY, 'namespace': 'mem-example'}, - 'spec': { - 'volumes': [{'name': 'xcom', 'emptyDir': {}}], - 'containers': [{ - 'args': ['--vm', '1', 
'--vm-bytes', '150M', '--vm-hang', '1'], - 'command': ['stress'], - 'image': 'apache/airflow:stress-2020.07.10-1.0.4', - 'name': 'memory-demo-ctr', - 'resources': { - 'limits': {'memory': '200Mi'}, - 'requests': {'memory': '100Mi'} - }, - 'volumeMounts': [{ - 'name': 'xcom', - 'mountPath': '/airflow/xcom' - }] - }, { - 'name': 'airflow-xcom-sidecar', - 'image': "alpine", - 'command': ['sh', '-c', PodDefaults.XCOM_CMD], - 'volumeMounts': [ - { - 'name': 'xcom', - 'mountPath': '/airflow/xcom' - } - ], - 'resources': {'requests': {'cpu': '1m'}}, - }], - } - }, actual_pod) + expected_dict = {'apiVersion': 'v1', + 'kind': 'Pod', + 'metadata': {'annotations': {}, + 'labels': {}, + 'name': 'memory-demo', + 'namespace': 'mem-example'}, + 'spec': {'affinity': {}, + 'containers': [{'args': ['--vm', + '1', + '--vm-bytes', + '150M', + '--vm-hang', + '1'], + 'command': ['stress'], + 'env': [], + 'envFrom': [], + 'image': 'apache/airflow:stress-2020.07.10-1.0.4', + 'name': 'base', + 'ports': [], + 'resources': {'limits': {'memory': '200Mi'}, + 'requests': {'memory': '100Mi'}}, + 'volumeMounts': [{'mountPath': '/airflow/xcom', + 'name': 'xcom'}]}, + {'command': ['sh', + '-c', + 'trap "exit 0" INT; while true; do sleep ' + '30; done;'], + 'image': 'alpine', + 'name': 'airflow-xcom-sidecar', + 'resources': {'requests': {'cpu': '1m'}}, + 'volumeMounts': [{'mountPath': '/airflow/xcom', + 'name': 'xcom'}]}], + 'hostNetwork': False, + 'imagePullSecrets': [], + 'initContainers': [], + 'nodeSelector': {}, + 'restartPolicy': 'Never', + 'securityContext': {}, + 'serviceAccountName': 'default', + 'tolerations': [], + 'volumes': [{'emptyDir': {}, 'name': 'xcom'}]}} + self.assertEqual(expected_dict, actual_pod) @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod") @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
