This is an automated email from the ASF dual-hosted git repository. dimberman pushed a commit to branch fix-env-from-1-10 in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 3a5663e43931903cee02abbea5193a6a8cb9e9e1 Author: Daniel Imberman <[email protected]> AuthorDate: Mon Dec 14 21:22:17 2020 -0800 KubernetesExecutor overrides should only add to lists This PR makes 1.10 interaction more similar to that of Airflow 2.0. Essentially users are able to override values that are in maps, but when it comes to lists in k8s objects, it is too complicated to consistently override. --- airflow/kubernetes/pod_generator.py | 225 +++-- tests/kubernetes/test_pod_generator.py | 1453 ++++++++++++++++---------------- 2 files changed, 848 insertions(+), 830 deletions(-) diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py index 4df3198..8f0d141 100644 --- a/airflow/kubernetes/pod_generator.py +++ b/airflow/kubernetes/pod_generator.py @@ -34,7 +34,6 @@ from dateutil import parser from kubernetes.client.api_client import ApiClient from airflow.contrib.kubernetes.pod import _extract_volume_mounts -from airflow.exceptions import AirflowConfigException from airflow.version import version as airflow_version MAX_LABEL_LEN = 63 @@ -50,21 +49,15 @@ class PodDefaults(object): def __init__(self): pass - XCOM_MOUNT_PATH = '/airflow/xcom' - SIDECAR_CONTAINER_NAME = 'airflow-xcom-sidecar' + XCOM_MOUNT_PATH = "/airflow/xcom" + SIDECAR_CONTAINER_NAME = "airflow-xcom-sidecar" XCOM_CMD = 'trap "exit 0" INT; while true; do sleep 30; done;' - VOLUME_MOUNT = k8s.V1VolumeMount( - name='xcom', - mount_path=XCOM_MOUNT_PATH - ) - VOLUME = k8s.V1Volume( - name='xcom', - empty_dir=k8s.V1EmptyDirVolumeSource() - ) + VOLUME_MOUNT = k8s.V1VolumeMount(name="xcom", mount_path=XCOM_MOUNT_PATH) + VOLUME = k8s.V1Volume(name="xcom", empty_dir=k8s.V1EmptyDirVolumeSource()) SIDECAR_CONTAINER = k8s.V1Container( name=SIDECAR_CONTAINER_NAME, - command=['sh', '-c', XCOM_CMD], - image='alpine', + command=["sh", "-c", XCOM_CMD], + image="alpine", volume_mounts=[VOLUME_MOUNT], resources=k8s.V1ResourceRequirements( requests={ @@ -88,7 +81,7 @@ def make_safe_label_value(string): if len(safe_label) > MAX_LABEL_LEN or string != safe_label: safe_hash = hashlib.md5(string.encode()).hexdigest()[:9] - safe_label = safe_label[:MAX_LABEL_LEN - len(safe_hash) - 1] + "-" + safe_hash + safe_label = safe_label[: MAX_LABEL_LEN - len(safe_hash) - 1] + "-" + safe_hash return safe_label @@ -102,7 +95,7 @@ def datetime_to_label_safe_datestring(datetime_obj): :param datetime_obj: datetime.datetime object :return: ISO-like string representing the datetime """ - return datetime_obj.isoformat().replace(":", "_").replace('+', '_plus_') + return datetime_obj.isoformat().replace(":", "_").replace("+", "_plus_") def label_safe_datestring_to_datetime(string): @@ -114,7 +107,7 @@ def label_safe_datestring_to_datetime(string): :param string: str :return: datetime.datetime object """ - return parser.parse(string.replace('_plus_', '+').replace("_", ":")) + return parser.parse(string.replace("_plus_", "+").replace("_", ":")) class PodGenerator(object): @@ -230,8 +223,8 @@ class PodGenerator(object): self.ud_pod = pod self.pod = k8s.V1Pod() - self.pod.api_version = 'v1' - self.pod.kind = 'Pod' + self.pod.api_version = "v1" + self.pod.kind = "Pod" # Pod Metadata self.metadata = k8s.V1ObjectMeta() @@ -241,35 +234,34 @@ class PodGenerator(object): self.metadata.annotations = annotations # Pod Container - self.container = k8s.V1Container(name='base') + self.container = k8s.V1Container(name="base") self.container.image = image self.container.env = [] if envs: if isinstance(envs, dict): for key, val in envs.items(): - self.container.env.append(k8s.V1EnvVar( - name=key, - value=val - )) + self.container.env.append(k8s.V1EnvVar(name=key, value=val)) elif isinstance(envs, list): self.container.env.extend(envs) configmaps = configmaps or [] self.container.env_from = [] for configmap in configmaps: - self.container.env_from.append(k8s.V1EnvFromSource( - config_map_ref=k8s.V1ConfigMapEnvSource( - name=configmap + self.container.env_from.append( + k8s.V1EnvFromSource( + config_map_ref=k8s.V1ConfigMapEnvSource(name=configmap) ) - )) + ) self.container.command = cmds or [] self.container.args = args or [] self.container.image_pull_policy = image_pull_policy self.container.ports = ports or [] self.container.resources = resources - self.container.volume_mounts = [v.to_k8s_client_obj() for v in _extract_volume_mounts(volume_mounts)] + self.container.volume_mounts = [ + v.to_k8s_client_obj() for v in _extract_volume_mounts(volume_mounts) + ] # Pod Spec self.spec = k8s.V1PodSpec(containers=[]) @@ -288,10 +280,10 @@ class PodGenerator(object): self.spec.image_pull_secrets = [] if image_pull_secrets: - for image_pull_secret in image_pull_secrets.split(','): - self.spec.image_pull_secrets.append(k8s.V1LocalObjectReference( - name=image_pull_secret - )) + for image_pull_secret in image_pull_secrets.split(","): + self.spec.image_pull_secrets.append( + k8s.V1LocalObjectReference(name=image_pull_secret) + ) # Attach sidecar self.extract_xcom = extract_xcom @@ -325,7 +317,7 @@ class PodGenerator(object): return None safe_uuid = uuid.uuid4().hex - safe_pod_id = dag_id[:MAX_POD_ID_LEN - len(safe_uuid) - 1] + safe_pod_id = dag_id[: MAX_POD_ID_LEN - len(safe_uuid) - 1] safe_pod_id = safe_pod_id + "-" + safe_uuid return safe_pod_id @@ -335,7 +327,9 @@ class PodGenerator(object): pod_cp = copy.deepcopy(pod) pod_cp.spec.volumes = pod.spec.volumes or [] pod_cp.spec.volumes.insert(0, PodDefaults.VOLUME) - pod_cp.spec.containers[0].volume_mounts = pod_cp.spec.containers[0].volume_mounts or [] + pod_cp.spec.containers[0].volume_mounts = ( + pod_cp.spec.containers[0].volume_mounts or [] + ) pod_cp.spec.containers[0].volume_mounts.insert(0, PodDefaults.VOLUME_MOUNT) pod_cp.spec.containers.append(PodDefaults.SIDECAR_CONTAINER) @@ -351,8 +345,9 @@ class PodGenerator(object): if not isinstance(obj, dict): raise TypeError( - 'Cannot convert a non-dictionary or non-PodGenerator ' - 'object into a KubernetesExecutorConfig') + "Cannot convert a non-dictionary or non-PodGenerator " + "object into a KubernetesExecutorConfig" + ) # We do not want to extract constant here from ExecutorLoader because it is just # A name in dictionary rather than executor selection mechanism and it causes cyclic import @@ -361,50 +356,55 @@ class PodGenerator(object): if not namespaced: return None - resources = namespaced.get('resources') + resources = namespaced.get("resources") if resources is None: + def extract(cpu, memory, ephemeral_storage, limit_gpu=None): resources_obj = { - 'cpu': namespaced.pop(cpu, None), - 'memory': namespaced.pop(memory, None), - 'ephemeral-storage': namespaced.pop(ephemeral_storage, None), + "cpu": namespaced.pop(cpu, None), + "memory": namespaced.pop(memory, None), + "ephemeral-storage": namespaced.pop(ephemeral_storage, None), } if limit_gpu is not None: - resources_obj['nvidia.com/gpu'] = namespaced.pop(limit_gpu, None) + resources_obj["nvidia.com/gpu"] = namespaced.pop(limit_gpu, None) - resources_obj = {k: v for k, v in resources_obj.items() if v is not None} + resources_obj = { + k: v for k, v in resources_obj.items() if v is not None + } if all(r is None for r in resources_obj): resources_obj = None return namespaced, resources_obj - namespaced, requests = extract('request_cpu', 'request_memory', 'request_ephemeral_storage') - namespaced, limits = extract('limit_cpu', 'limit_memory', 'limit_ephemeral_storage', - limit_gpu='limit_gpu') + namespaced, requests = extract( + "request_cpu", "request_memory", "request_ephemeral_storage" + ) + namespaced, limits = extract( + "limit_cpu", + "limit_memory", + "limit_ephemeral_storage", + limit_gpu="limit_gpu", + ) if requests is None and limits is None: resources = None else: - resources = k8s.V1ResourceRequirements( - requests=requests, - limits=limits - ) + resources = k8s.V1ResourceRequirements(requests=requests, limits=limits) elif isinstance(resources, dict): resources = k8s.V1ResourceRequirements( - requests=resources['requests'], - limits=resources['limits'] + requests=resources["requests"], limits=resources["limits"] ) - annotations = namespaced.get('annotations', {}) - gcp_service_account_key = namespaced.get('gcp_service_account_key', None) + annotations = namespaced.get("annotations", {}) + gcp_service_account_key = namespaced.get("gcp_service_account_key", None) if annotations is not None and gcp_service_account_key is not None: - annotations.update({ - 'iam.cloud.google.com/service-account': gcp_service_account_key - }) + annotations.update( + {"iam.cloud.google.com/service-account": gcp_service_account_key} + ) - namespaced['resources'] = resources + namespaced["resources"] = resources return PodGenerator(**namespaced).gen_pod() @staticmethod @@ -426,8 +426,12 @@ class PodGenerator(object): return base_pod client_pod_cp = copy.deepcopy(client_pod) - client_pod_cp.spec = PodGenerator.reconcile_specs(base_pod.spec, client_pod_cp.spec) - client_pod_cp.metadata = PodGenerator.reconcile_metadata(base_pod.metadata, client_pod_cp.metadata) + client_pod_cp.spec = PodGenerator.reconcile_specs( + base_pod.spec, client_pod_cp.spec + ) + client_pod_cp.metadata = PodGenerator.reconcile_metadata( + base_pod.metadata, client_pod_cp.metadata + ) client_pod_cp = merge_objects(base_pod, client_pod_cp) return client_pod_cp @@ -448,17 +452,18 @@ class PodGenerator(object): return client_meta elif client_meta and base_meta: client_meta.labels = merge_objects(base_meta.labels, client_meta.labels) - client_meta.annotations = merge_objects(base_meta.annotations, client_meta.annotations) - extend_object_field(base_meta, client_meta, 'managed_fields') - extend_object_field(base_meta, client_meta, 'finalizers') - extend_object_field(base_meta, client_meta, 'owner_references') + client_meta.annotations = merge_objects( + base_meta.annotations, client_meta.annotations + ) + extend_object_field(base_meta, client_meta, "managed_fields") + extend_object_field(base_meta, client_meta, "finalizers") + extend_object_field(base_meta, client_meta, "owner_references") return merge_objects(base_meta, client_meta) return None @staticmethod - def reconcile_specs(base_spec, - client_spec): + def reconcile_specs(base_spec, client_spec): """ :param base_spec: has the base attributes which are overwritten if they exist in the client_spec and remain if they do not exist in the client_spec @@ -475,14 +480,13 @@ class PodGenerator(object): client_spec.containers = PodGenerator.reconcile_containers( base_spec.containers, client_spec.containers ) - merged_spec = extend_object_field(base_spec, client_spec, 'volumes') + merged_spec = extend_object_field(base_spec, client_spec, "volumes") return merge_objects(base_spec, merged_spec) return None @staticmethod - def reconcile_containers(base_containers, - client_containers): + def reconcile_containers(base_containers, client_containers): """ :param base_containers: has the base attributes which are overwritten if they exist in the client_containers and remain if they do not exist in the client_containers @@ -501,14 +505,18 @@ class PodGenerator(object): client_container = client_containers[0] base_container = base_containers[0] client_container = extend_object_field( - base_container, - client_container, - 'volume_mounts', - 'mount_path') - client_container = extend_object_field(base_container, client_container, 'env') - client_container = extend_object_field(base_container, client_container, 'env_from') - client_container = extend_object_field(base_container, client_container, 'ports') - client_container = extend_object_field(base_container, client_container, 'volume_devices') + base_container, client_container, "volume_mounts" + ) + client_container = extend_object_field(base_container, client_container, "env") + client_container = extend_object_field( + base_container, client_container, "env_from" + ) + client_container = extend_object_field( + base_container, client_container, "ports" + ) + client_container = extend_object_field( + base_container, client_container, "volume_devices" + ) client_container = merge_objects(base_container, client_container) return [client_container] + PodGenerator.reconcile_containers( @@ -527,7 +535,7 @@ class PodGenerator(object): pod_override_object, base_worker_pod, namespace, - worker_uuid + worker_uuid, ): """ Construct a pod by gathering and consolidating the configuration from 3 places: @@ -545,22 +553,22 @@ class PodGenerator(object): namespace=namespace, image=image, labels={ - 'airflow-worker': worker_uuid, - 'dag_id': make_safe_label_value(dag_id), - 'task_id': make_safe_label_value(task_id), - 'execution_date': datetime_to_label_safe_datestring(date), - 'try_number': str(try_number), - 'airflow_version': airflow_version.replace('+', '-'), - 'kubernetes_executor': 'True', + "airflow-worker": worker_uuid, + "dag_id": make_safe_label_value(dag_id), + "task_id": make_safe_label_value(task_id), + "execution_date": datetime_to_label_safe_datestring(date), + "try_number": str(try_number), + "airflow_version": airflow_version.replace("+", "-"), + "kubernetes_executor": "True", }, annotations={ - 'dag_id': dag_id, - 'task_id': task_id, - 'execution_date': date.isoformat(), - 'try_number': str(try_number), + "dag_id": dag_id, + "task_id": task_id, + "execution_date": date.isoformat(), + "try_number": str(try_number), }, cmds=command, - name=pod_id + name=pod_id, ).gen_pod() # Reconcile the pods starting with the first chronologically, @@ -627,7 +635,7 @@ def merge_objects(base_obj, client_obj): return client_obj_cp -def extend_object_field(base_obj, client_obj, field_name, field_to_merge="name"): +def extend_object_field(base_obj, client_obj, field_name): """ :param base_obj: an object which has a property `field_name` that is a list :param client_obj: an object which has a property `field_name` that is a list. @@ -640,8 +648,9 @@ def extend_object_field(base_obj, client_obj, field_name, field_to_merge="name") base_obj_field = getattr(base_obj, field_name, None) client_obj_field = getattr(client_obj, field_name, None) - if (not isinstance(base_obj_field, list) and base_obj_field is not None) or \ - (not isinstance(client_obj_field, list) and client_obj_field is not None): + if (not isinstance(base_obj_field, list) and base_obj_field is not None) or ( + not isinstance(client_obj_field, list) and client_obj_field is not None + ): raise ValueError("The chosen field must be a list.") if not base_obj_field: @@ -650,36 +659,6 @@ def extend_object_field(base_obj, client_obj, field_name, field_to_merge="name") setattr(client_obj_cp, field_name, base_obj_field) return client_obj_cp - base_obj_set = _get_dict_from_list(base_obj_field, field_to_merge) - client_obj_set = _get_dict_from_list(client_obj_field, field_to_merge) - - appended_fields = _merge_list_of_objects(base_obj_set, client_obj_set) - + appended_fields = base_obj_field + client_obj_field setattr(client_obj_cp, field_name, appended_fields) return client_obj_cp - - -def _merge_list_of_objects(base_obj_set, client_obj_set): - for k, v in base_obj_set.items(): - if k not in client_obj_set: - client_obj_set[k] = v - else: - client_obj_set[k] = merge_objects(v, client_obj_set[k]) - appended_field_keys = sorted(client_obj_set.keys()) - appended_fields = [client_obj_set[k] for k in appended_field_keys] - return appended_fields - - -def _get_dict_from_list(base_list, field_to_merge="name"): - """ - :type base_list: list(Optional[dict, *to_dict]) - """ - result = {} - for obj in base_list: - if isinstance(obj, dict): - result[obj[field_to_merge]] = obj - elif hasattr(obj, "to_dict"): - result[getattr(obj, field_to_merge)] = obj - else: - raise AirflowConfigException("Trying to merge invalid object {}".format(obj)) - return result diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py index fed7c97..df7afdf 100644 --- a/tests/kubernetes/test_pod_generator.py +++ b/tests/kubernetes/test_pod_generator.py @@ -25,532 +25,574 @@ from kubernetes.client import ApiClient, models as k8s from airflow.kubernetes.k8s_model import append_to_pod from airflow.kubernetes.pod import Resources -from airflow.kubernetes.pod_generator import PodDefaults, PodGenerator, extend_object_field, merge_objects, \ - datetime_to_label_safe_datestring +from airflow.kubernetes.pod_generator import ( + PodDefaults, + PodGenerator, + extend_object_field, + merge_objects, + datetime_to_label_safe_datestring, +) from airflow.kubernetes.secret import Secret class TestPodGenerator(unittest.TestCase): - def setUp(self): - self.static_uuid = uuid.UUID('cf4a56d2-8101-4217-b027-2af6216feb48') + self.static_uuid = uuid.UUID("cf4a56d2-8101-4217-b027-2af6216feb48") self.deserialize_result = { - 'apiVersion': 'v1', - 'kind': 'Pod', - 'metadata': {'name': 'memory-demo', 'namespace': 'mem-example'}, - 'spec': { - 'containers': [{ - 'args': ['--vm', '1', '--vm-bytes', '150M', '--vm-hang', '1'], - 'command': ['stress'], - 'image': 'apache/airflow:stress-2020.07.10-1.0.4', - 'name': 'memory-demo-ctr', - 'resources': { - 'limits': {'memory': '200Mi'}, - 'requests': {'memory': '100Mi'} + "apiVersion": "v1", + "kind": "Pod", + "metadata": {"name": "memory-demo", "namespace": "mem-example"}, + "spec": { + "containers": [ + { + "args": ["--vm", "1", "--vm-bytes", "150M", "--vm-hang", "1"], + "command": ["stress"], + "image": "apache/airflow:stress-2020.07.10-1.0.4", + "name": "memory-demo-ctr", + "resources": { + "limits": {"memory": "200Mi"}, + "requests": {"memory": "100Mi"}, + }, } - }] - } + ] + }, } - self.envs = { - 'ENVIRONMENT': 'prod', - 'LOG_LEVEL': 'warning' - } + self.envs = {"ENVIRONMENT": "prod", "LOG_LEVEL": "warning"} self.secrets = [ # This should be a secretRef - Secret('env', None, 'secret_a'), + Secret("env", None, "secret_a"), # This should be a single secret mounted in volumeMounts - Secret('volume', '/etc/foo', 'secret_b'), + Secret("volume", "/etc/foo", "secret_b"), # This should produce a single secret mounted in env - Secret('env', 'TARGET', 'secret_b', 'source_b'), + Secret("env", "TARGET", "secret_b", "source_b"), ] - self.execution_date = parser.parse('2020-08-24 00:00:00.000000') - self.execution_date_label = datetime_to_label_safe_datestring(self.execution_date) - self.dag_id = 'dag_id' - self.task_id = 'task_id' + self.execution_date = parser.parse("2020-08-24 00:00:00.000000") + self.execution_date_label = datetime_to_label_safe_datestring( + self.execution_date + ) + self.dag_id = "dag_id" + self.task_id = "task_id" self.try_number = 3 self.labels = { - 'airflow-worker': 'uuid', - 'dag_id': 'dag_id', - 'execution_date': mock.ANY, - 'task_id': 'task_id', - 'try_number': '3', - 'airflow_version': mock.ANY, - 'kubernetes_executor': 'True' + "airflow-worker": "uuid", + "dag_id": "dag_id", + "execution_date": mock.ANY, + "task_id": "task_id", + "try_number": "3", + "airflow_version": mock.ANY, + "kubernetes_executor": "True", } self.metadata = { - 'annotations': {'dag_id': 'dag_id', - 'execution_date': '2020-08-24T00:00:00', - 'task_id': 'task_id', - 'try_number': '3'}, - 'labels': self.labels, - 'name': 'pod_id-' + self.static_uuid.hex, - 'namespace': 'namespace' + "annotations": { + "dag_id": "dag_id", + "execution_date": "2020-08-24T00:00:00", + "task_id": "task_id", + "try_number": "3", + }, + "labels": self.labels, + "name": "pod_id-" + self.static_uuid.hex, + "namespace": "namespace", } - self.resources = Resources('1Gi', 1, '2Gi', '2Gi', 2, 1, '4Gi') + self.resources = Resources("1Gi", 1, "2Gi", "2Gi", 2, 1, "4Gi") self.k8s_client = ApiClient() self.expected = { - 'apiVersion': 'v1', - 'kind': 'Pod', - 'metadata': { - 'name': 'myapp-pod-' + self.static_uuid.hex, - 'labels': {'app': 'myapp'}, - 'namespace': 'default' + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "name": "myapp-pod-" + self.static_uuid.hex, + "labels": {"app": "myapp"}, + "namespace": "default", }, - 'spec': { - 'containers': [{ - 'name': 'base', - 'image': 'busybox', - 'args': [], - 'command': [ - 'sh', '-c', 'echo Hello Kubernetes!' - ], - 'env': [{ - 'name': 'ENVIRONMENT', - 'value': 'prod' - }, { - 'name': 'LOG_LEVEL', - 'value': 'warning' - }, { - 'name': 'TARGET', - 'valueFrom': { - 'secretKeyRef': { - 'name': 'secret_b', - 'key': 'source_b' - } - } - }], - 'envFrom': [{ - 'configMapRef': { - 'name': 'configmap_a' - } - }, { - 'configMapRef': { - 'name': 'configmap_b' - } - }, { - 'secretRef': { - 'name': 'secret_a' - } - }], - 'resources': { - 'requests': { - 'memory': '1Gi', - 'cpu': 1, - 'ephemeral-storage': '2Gi' - }, - 'limits': { - 'memory': '2Gi', - 'cpu': 2, - 'nvidia.com/gpu': 1, - 'ephemeral-storage': '4Gi' + "spec": { + "containers": [ + { + "name": "base", + "image": "busybox", + "args": [], + "command": ["sh", "-c", "echo Hello Kubernetes!"], + "env": [ + {"name": "ENVIRONMENT", "value": "prod"}, + {"name": "LOG_LEVEL", "value": "warning"}, + { + "name": "TARGET", + "valueFrom": { + "secretKeyRef": { + "name": "secret_b", + "key": "source_b", + } + }, + }, + ], + "envFrom": [ + {"configMapRef": {"name": "configmap_a"}}, + {"configMapRef": {"name": "configmap_b"}}, + {"secretRef": {"name": "secret_a"}}, + ], + "resources": { + "requests": { + "memory": "1Gi", + "cpu": 1, + "ephemeral-storage": "2Gi", + }, + "limits": { + "memory": "2Gi", + "cpu": 2, + "nvidia.com/gpu": 1, + "ephemeral-storage": "4Gi", + }, }, - }, - 'ports': [{'name': 'foo', 'containerPort': 1234}], - 'volumeMounts': [{ - 'mountPath': '/etc/foo', - 'name': 'secretvol' + str(self.static_uuid), - 'readOnly': True - }] - }], - 'volumes': [{ - 'name': 'secretvol' + str(self.static_uuid), - 'secret': { - 'secretName': 'secret_b' + "ports": [{"name": "foo", "containerPort": 1234}], + "volumeMounts": [ + { + "mountPath": "/etc/foo", + "name": "secretvol" + str(self.static_uuid), + "readOnly": True, + } + ], } - }], - 'hostNetwork': False, - 'imagePullSecrets': [ - {'name': 'pull_secret_a'}, - {'name': 'pull_secret_b'} ], - 'securityContext': { - 'runAsUser': 1000, - 'fsGroup': 2000, + "volumes": [ + { + "name": "secretvol" + str(self.static_uuid), + "secret": {"secretName": "secret_b"}, + } + ], + "hostNetwork": False, + "imagePullSecrets": [ + {"name": "pull_secret_a"}, + {"name": "pull_secret_b"}, + ], + "securityContext": { + "runAsUser": 1000, + "fsGroup": 2000, }, - } + }, } - @mock.patch('uuid.uuid4') + @mock.patch("uuid.uuid4") def test_gen_pod(self, mock_uuid): mock_uuid.return_value = self.static_uuid pod_generator = PodGenerator( - labels={'app': 'myapp'}, - name='myapp-pod', - image_pull_secrets='pull_secret_a,pull_secret_b', - image='busybox', + labels={"app": "myapp"}, + name="myapp-pod", + image_pull_secrets="pull_secret_a,pull_secret_b", + image="busybox", envs=self.envs, - cmds=['sh', '-c', 'echo Hello Kubernetes!'], + cmds=["sh", "-c", "echo Hello Kubernetes!"], security_context=k8s.V1PodSecurityContext( run_as_user=1000, fs_group=2000, ), - namespace='default', - ports=[k8s.V1ContainerPort(name='foo', container_port=1234)], - configmaps=['configmap_a', 'configmap_b'] + namespace="default", + ports=[k8s.V1ContainerPort(name="foo", container_port=1234)], + configmaps=["configmap_a", "configmap_b"], ) result = pod_generator.gen_pod() result = append_to_pod(result, self.secrets) result = self.resources.attach_to_pod(result) result_dict = self.k8s_client.sanitize_for_serialization(result) # sort - result_dict['spec']['containers'][0]['env'].sort(key=lambda x: x['name']) - result_dict['spec']['containers'][0]['envFrom'].sort( - key=lambda x: list(x.values())[0]['name'] + result_dict["spec"]["containers"][0]["env"].sort(key=lambda x: x["name"]) + result_dict["spec"]["containers"][0]["envFrom"].sort( + key=lambda x: list(x.values())[0]["name"] ) self.assertDictEqual(self.expected, result_dict) - @mock.patch('uuid.uuid4') + @mock.patch("uuid.uuid4") def test_gen_pod_extract_xcom(self, mock_uuid): mock_uuid.return_value = self.static_uuid pod_generator = PodGenerator( - labels={'app': 'myapp'}, - name='myapp-pod', - image_pull_secrets='pull_secret_a,pull_secret_b', - image='busybox', + labels={"app": "myapp"}, + name="myapp-pod", + image_pull_secrets="pull_secret_a,pull_secret_b", + image="busybox", envs=self.envs, - cmds=['sh', '-c', 'echo Hello Kubernetes!'], - namespace='default', + cmds=["sh", "-c", "echo Hello Kubernetes!"], + namespace="default", security_context=k8s.V1PodSecurityContext( run_as_user=1000, fs_group=2000, ), - ports=[k8s.V1ContainerPort(name='foo', container_port=1234)], - configmaps=['configmap_a', 'configmap_b'], - extract_xcom=True + ports=[k8s.V1ContainerPort(name="foo", container_port=1234)], + configmaps=["configmap_a", "configmap_b"], + extract_xcom=True, ) result = pod_generator.gen_pod() result = append_to_pod(result, self.secrets) result = self.resources.attach_to_pod(result) result_dict = self.k8s_client.sanitize_for_serialization(result) container_two = { - 'name': 'airflow-xcom-sidecar', - 'image': "alpine", - 'command': ['sh', '-c', PodDefaults.XCOM_CMD], - 'volumeMounts': [ - { - 'name': 'xcom', - 'mountPath': '/airflow/xcom' - } - ], - 'resources': {'requests': {'cpu': '1m'}}, + "name": "airflow-xcom-sidecar", + "image": "alpine", + "command": ["sh", "-c", PodDefaults.XCOM_CMD], + "volumeMounts": [{"name": "xcom", "mountPath": "/airflow/xcom"}], + "resources": {"requests": {"cpu": "1m"}}, } - self.expected['spec']['containers'].append(container_two) - self.expected['spec']['containers'][0]['volumeMounts'].insert(0, { - 'name': 'xcom', - 'mountPath': '/airflow/xcom' - }) - self.expected['spec']['volumes'].insert(0, { - 'name': 'xcom', 'emptyDir': {} - }) - result_dict['spec']['containers'][0]['env'].sort(key=lambda x: x['name']) + self.expected["spec"]["containers"].append(container_two) + self.expected["spec"]["containers"][0]["volumeMounts"].insert( + 0, {"name": "xcom", "mountPath": "/airflow/xcom"} + ) + self.expected["spec"]["volumes"].insert(0, {"name": "xcom", "emptyDir": {}}) + result_dict["spec"]["containers"][0]["env"].sort(key=lambda x: x["name"]) self.assertEqual(result_dict, self.expected) - @mock.patch('uuid.uuid4') + @mock.patch("uuid.uuid4") def test_from_obj(self, mock_uuid): mock_uuid.return_value = self.static_uuid - result = PodGenerator.from_obj({ - "KubernetesExecutor": { - "annotations": {"test": "annotation"}, - "volumes": [ - { - "name": "example-kubernetes-test-volume", - "hostPath": {"path": "/tmp/"}, - }, - ], - "volume_mounts": [ - { - "mountPath": "/foo/", - "name": "example-kubernetes-test-volume", - }, - ], - "resources": { - "requests": { - "memory": "256Mi", - "cpu": "500m", - "ephemeral-storage": "2G", - "nvidia.com/gpu": "0" + result = PodGenerator.from_obj( + { + "KubernetesExecutor": { + "annotations": {"test": "annotation"}, + "volumes": [ + { + "name": "example-kubernetes-test-volume", + "hostPath": {"path": "/tmp/"}, + }, + ], + "volume_mounts": [ + { + "mountPath": "/foo/", + "name": "example-kubernetes-test-volume", + }, + ], + "resources": { + "requests": { + "memory": "256Mi", + "cpu": "500m", + "ephemeral-storage": "2G", + "nvidia.com/gpu": "0", + }, + "limits": { + "memory": "512Mi", + "cpu": "1000m", + "ephemeral-storage": "2G", + "nvidia.com/gpu": "0", + }, }, - "limits": { - "memory": "512Mi", - "cpu": "1000m", - "ephemeral-storage": "2G", - "nvidia.com/gpu": "0" - } } } - }) + ) result = self.k8s_client.sanitize_for_serialization(result) - self.assertEqual({ - 'apiVersion': 'v1', - 'kind': 'Pod', - 'metadata': { - 'annotations': {'test': 'annotation'}, + self.assertEqual( + { + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "annotations": {"test": "annotation"}, + }, + "spec": { + "containers": [ + { + "args": [], + "command": [], + "env": [], + "envFrom": [], + "name": "base", + "ports": [], + "volumeMounts": [ + { + "mountPath": "/foo/", + "name": "example-kubernetes-test-volume", + } + ], + "resources": { + "requests": { + "memory": "256Mi", + "cpu": "500m", + "ephemeral-storage": "2G", + "nvidia.com/gpu": "0", + }, + "limits": { + "memory": "512Mi", + "cpu": "1000m", + "ephemeral-storage": "2G", + "nvidia.com/gpu": "0", + }, + }, + } + ], + "hostNetwork": False, + "imagePullSecrets": [], + "volumes": [ + { + "hostPath": {"path": "/tmp/"}, + "name": "example-kubernetes-test-volume", + } + ], + }, }, - 'spec': { - 'containers': [{ - 'args': [], - 'command': [], - 'env': [], - 'envFrom': [], - 'name': 'base', - 'ports': [], - 'volumeMounts': [{ - 'mountPath': '/foo/', - 'name': 'example-kubernetes-test-volume' - }], + result, + ) + + @mock.patch("uuid.uuid4") + def test_from_obj_with_resources_object(self, mock_uuid): + mock_uuid.return_value = self.static_uuid + result = PodGenerator.from_obj( + { + "KubernetesExecutor": { + "annotations": {"test": "annotation"}, + "volumes": [ + { + "name": "example-kubernetes-test-volume", + "hostPath": {"path": "/tmp/"}, + }, + ], + "volume_mounts": [ + { + "mountPath": "/foo/", + "name": "example-kubernetes-test-volume", + }, + ], "resources": { "requests": { "memory": "256Mi", "cpu": "500m", "ephemeral-storage": "2G", - "nvidia.com/gpu": "0" + "nvidia.com/gpu": "0", }, "limits": { "memory": "512Mi", "cpu": "1000m", "ephemeral-storage": "2G", - "nvidia.com/gpu": "0" - } - } - }], - 'hostNetwork': False, - 'imagePullSecrets': [], - 'volumes': [{ - 'hostPath': {'path': '/tmp/'}, - 'name': 'example-kubernetes-test-volume' - }], - } - }, result) - - @mock.patch('uuid.uuid4') - def test_from_obj_with_resources_object(self, mock_uuid): - mock_uuid.return_value = self.static_uuid - result = PodGenerator.from_obj({ - "KubernetesExecutor": { - "annotations": {"test": "annotation"}, - "volumes": [ - { - "name": "example-kubernetes-test-volume", - "hostPath": {"path": "/tmp/"}, - }, - ], - "volume_mounts": [ - { - "mountPath": "/foo/", - "name": "example-kubernetes-test-volume", - }, - ], - "resources": { - "requests": { - "memory": "256Mi", - "cpu": "500m", - "ephemeral-storage": "2G", - "nvidia.com/gpu": "0" + "nvidia.com/gpu": "0", + }, }, - "limits": { - "memory": "512Mi", - "cpu": "1000m", - "ephemeral-storage": "2G", - "nvidia.com/gpu": "0" - } } } - }) + ) result = self.k8s_client.sanitize_for_serialization(result) - self.assertEqual({ - 'apiVersion': 'v1', - 'kind': 'Pod', - 'metadata': { - 'annotations': {'test': 'annotation'}, + self.assertEqual( + { + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "annotations": {"test": "annotation"}, + }, + "spec": { + "containers": [ + { + "args": [], + "command": [], + "env": [], + "envFrom": [], + "name": "base", + "ports": [], + "volumeMounts": [ + { + "mountPath": "/foo/", + "name": "example-kubernetes-test-volume", + } + ], + "resources": { + "limits": { + "cpu": "1000m", + "ephemeral-storage": "2G", + "memory": "512Mi", + "nvidia.com/gpu": "0", + }, + "requests": { + "cpu": "500m", + "ephemeral-storage": "2G", + "memory": "256Mi", + "nvidia.com/gpu": "0", + }, + }, + } + ], + "hostNetwork": False, + "imagePullSecrets": [], + "volumes": [ + { + "hostPath": {"path": "/tmp/"}, + "name": "example-kubernetes-test-volume", + } + ], + }, }, - 'spec': { - 'containers': [{ - 'args': [], - 'command': [], - 'env': [], - 'envFrom': [], - 'name': 'base', - 'ports': [], - 'volumeMounts': [{ - 'mountPath': '/foo/', - 'name': 'example-kubernetes-test-volume' - }], - 'resources': {'limits': {'cpu': '1000m', - 'ephemeral-storage': '2G', - 'memory': '512Mi', - 'nvidia.com/gpu': '0'}, - 'requests': {'cpu': '500m', - 'ephemeral-storage': '2G', - 'memory': '256Mi', - 'nvidia.com/gpu': '0'}}, - }], - 'hostNetwork': False, - 'imagePullSecrets': [], - 'volumes': [{ - 'hostPath': {'path': '/tmp/'}, - 'name': 'example-kubernetes-test-volume' - }], - } - }, result) + result, + ) - @mock.patch('uuid.uuid4') + @mock.patch("uuid.uuid4") def test_from_obj_with_resources(self, mock_uuid): self.maxDiff = None mock_uuid.return_value = self.static_uuid - result = PodGenerator.from_obj({ - "KubernetesExecutor": { - "annotations": {"test": "annotation"}, - "volumes": [ - { - "name": "example-kubernetes-test-volume", - "hostPath": {"path": "/tmp/"}, - }, - ], - "volume_mounts": [ - { - "mountPath": "/foo/", - "name": "example-kubernetes-test-volume", - }, - ], - 'request_cpu': "200m", - 'limit_cpu': "400m", - 'request_memory': "500Mi", - 'limit_memory': "1000Mi", - 'limit_gpu': "2", - 'request_ephemeral_storage': '2Gi', - 'limit_ephemeral_storage': '4Gi', + result = PodGenerator.from_obj( + { + "KubernetesExecutor": { + "annotations": {"test": "annotation"}, + "volumes": [ + { + "name": "example-kubernetes-test-volume", + "hostPath": {"path": "/tmp/"}, + }, + ], + "volume_mounts": [ + { + "mountPath": "/foo/", + "name": "example-kubernetes-test-volume", + }, + ], + "request_cpu": "200m", + "limit_cpu": "400m", + "request_memory": "500Mi", + "limit_memory": "1000Mi", + "limit_gpu": "2", + "request_ephemeral_storage": "2Gi", + "limit_ephemeral_storage": "4Gi", + } } - }) + ) result = self.k8s_client.sanitize_for_serialization(result) - self.assertEqual({ - 'apiVersion': 'v1', - 'kind': 'Pod', - 'metadata': { - 'annotations': {'test': 'annotation'}, + self.assertEqual( + { + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "annotations": {"test": "annotation"}, + }, + "spec": { + "containers": [ + { + "args": [], + "command": [], + "env": [], + "envFrom": [], + "name": "base", + "ports": [], + "resources": { + "limits": { + "cpu": "400m", + "ephemeral-storage": "4Gi", + "memory": "1000Mi", + "nvidia.com/gpu": "2", + }, + "requests": { + "cpu": "200m", + "ephemeral-storage": "2Gi", + "memory": "500Mi", + }, + }, + "volumeMounts": [ + { + "mountPath": "/foo/", + "name": "example-kubernetes-test-volume", + } + ], + } + ], + "hostNetwork": False, + "imagePullSecrets": [], + "volumes": [ + { + "hostPath": {"path": "/tmp/"}, + "name": "example-kubernetes-test-volume", + } + ], + }, }, - 'spec': { - 'containers': [{ - 'args': [], - 'command': [], - 'env': [], - 'envFrom': [], - 'name': 'base', - 'ports': [], - 'resources': { - 'limits': { - 'cpu': '400m', - 'ephemeral-storage': '4Gi', - 'memory': '1000Mi', - 'nvidia.com/gpu': "2", - }, - 'requests': { - 'cpu': '200m', - 'ephemeral-storage': '2Gi', - 'memory': '500Mi', - }, - }, - 'volumeMounts': [{ - 'mountPath': '/foo/', - 'name': 'example-kubernetes-test-volume' - }], - }], - 'hostNetwork': False, - 'imagePullSecrets': [], - 'volumes': [{ - 'hostPath': {'path': '/tmp/'}, - 'name': 'example-kubernetes-test-volume' - }], - } - }, result) + result, + ) - @mock.patch('uuid.uuid4') + @mock.patch("uuid.uuid4") def test_from_obj_with_only_request_resources(self, mock_uuid): self.maxDiff = None mock_uuid.return_value = self.static_uuid - result = PodGenerator.from_obj({ - "KubernetesExecutor": { - "annotations": {"test": "annotation"}, - "volumes": [ - { - "name": "example-kubernetes-test-volume", - "hostPath": {"path": "/tmp/"}, - }, - ], - "volume_mounts": [ - { - "mountPath": "/foo/", - "name": "example-kubernetes-test-volume", - }, - ], - 'request_cpu': "200m", - 'request_memory': "500Mi", + result = PodGenerator.from_obj( + { + "KubernetesExecutor": { + "annotations": {"test": "annotation"}, + "volumes": [ + { + "name": "example-kubernetes-test-volume", + "hostPath": {"path": "/tmp/"}, + }, + ], + "volume_mounts": [ + { + "mountPath": "/foo/", + "name": "example-kubernetes-test-volume", + }, + ], + "request_cpu": "200m", + "request_memory": "500Mi", + } } - }) + ) result = self.k8s_client.sanitize_for_serialization(result) - self.assertEqual({ - 'apiVersion': 'v1', - 'kind': 'Pod', - 'metadata': { - 'annotations': {'test': 'annotation'}, + self.assertEqual( + { + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "annotations": {"test": "annotation"}, + }, + "spec": { + "containers": [ + { + "args": [], + "command": [], + "env": [], + "envFrom": [], + "name": "base", + "ports": [], + "resources": { + "requests": { + "cpu": "200m", + "memory": "500Mi", + }, + }, + "volumeMounts": [ + { + "mountPath": "/foo/", + "name": "example-kubernetes-test-volume", + } + ], + } + ], + "hostNetwork": False, + "imagePullSecrets": [], + "volumes": [ + { + "hostPath": {"path": "/tmp/"}, + "name": "example-kubernetes-test-volume", + } + ], + }, }, - 'spec': { - 'containers': [{ - 'args': [], - 'command': [], - 'env': [], - 'envFrom': [], - 'name': 'base', - 'ports': [], - 'resources': { - 'requests': { - 'cpu': '200m', - 'memory': '500Mi', - }, - }, - 'volumeMounts': [{ - 'mountPath': '/foo/', - 'name': 'example-kubernetes-test-volume' - }], - }], - 'hostNetwork': False, - 'imagePullSecrets': [], - 'volumes': [{ - 'hostPath': {'path': '/tmp/'}, - 'name': 'example-kubernetes-test-volume' - }], - } - }, result) + result, + ) - @mock.patch('uuid.uuid4') + @mock.patch("uuid.uuid4") def test_reconcile_pods_empty_mutator_pod(self, mock_uuid): mock_uuid.return_value = self.static_uuid base_pod = PodGenerator( - image='image1', - name='name1', - envs={'key1': 'val1'}, - cmds=['/bin/command1.sh', 'arg1'], - ports=[k8s.V1ContainerPort(name='port', container_port=2118)], - volumes=[{ - 'hostPath': {'path': '/tmp/'}, - 'name': 'example-kubernetes-test-volume1' - }], - volume_mounts=[{ - 'mountPath': '/foo/', - 'name': 'example-kubernetes-test-volume1' - }], + image="image1", + name="name1", + envs={"key1": "val1"}, + cmds=["/bin/command1.sh", "arg1"], + ports=[k8s.V1ContainerPort(name="port", container_port=2118)], + volumes=[ + { + "hostPath": {"path": "/tmp/"}, + "name": "example-kubernetes-test-volume1", + } + ], + volume_mounts=[ + {"mountPath": "/foo/", "name": "example-kubernetes-test-volume1"} + ], ).gen_pod() mutator_pod = None - name = 'name1-' + self.static_uuid.hex + name = "name1-" + self.static_uuid.hex base_pod.metadata.name = name @@ -561,92 +603,107 @@ class TestPodGenerator(unittest.TestCase): result = PodGenerator.reconcile_pods(base_pod, mutator_pod) self.assertEqual(base_pod, result) - @mock.patch('uuid.uuid4') + @mock.patch("uuid.uuid4") def test_reconcile_pods(self, mock_uuid): mock_uuid.return_value = self.static_uuid base_pod = PodGenerator( - image='image1', - name='name1', - envs={'key1': 'val1'}, - cmds=['/bin/command1.sh', 'arg1'], - ports=[k8s.V1ContainerPort(name='port', container_port=2118)], - volumes=[{ - 'hostPath': {'path': '/tmp/'}, - 'name': 'example-kubernetes-test-volume1' - }], - volume_mounts=[{ - 'mountPath': '/foo/', - 'name': 'example-kubernetes-test-volume1' - }], + image="image1", + name="name1", + envs={"key1": "val1"}, + cmds=["/bin/command1.sh", "arg1"], + ports=[k8s.V1ContainerPort(name="port", container_port=2118)], + volumes=[ + { + "hostPath": {"path": "/tmp/"}, + "name": "example-kubernetes-test-volume1", + } + ], + volume_mounts=[ + {"mountPath": "/foo/", "name": "example-kubernetes-test-volume1"} + ], ).gen_pod() mutator_pod = PodGenerator( - envs={'key2': 'val2'}, - image='', - name='name2', - cmds=['/bin/command2.sh', 'arg2'], - volumes=[{ - 'hostPath': {'path': '/tmp/'}, - 'name': 'example-kubernetes-test-volume2' - }], - volume_mounts=[{ - 'mountPath': '/foo/', - 'name': 'example-kubernetes-test-volume2' - }] + envs={"key2": "val2"}, + image="", + name="name2", + cmds=["/bin/command2.sh", "arg2"], + volumes=[ + { + "hostPath": {"path": "/tmp/"}, + "name": "example-kubernetes-test-volume2", + } + ], + volume_mounts=[ + {"mountPath": "/foo/", "name": "example-kubernetes-test-volume2"} + ], ).gen_pod() result = PodGenerator.reconcile_pods(base_pod, mutator_pod) result = self.k8s_client.sanitize_for_serialization(result) - self.assertEqual({ - 'apiVersion': 'v1', - 'kind': 'Pod', - 'metadata': {'name': 'name2-' + self.static_uuid.hex}, - 'spec': { - 'containers': [{ - 'args': [], - 'command': ['/bin/command2.sh', 'arg2'], - 'env': [ - {'name': 'key1', 'value': 'val1'}, - {'name': 'key2', 'value': 'val2'} + self.assertEqual( + { + "apiVersion": "v1", + "kind": "Pod", + "metadata": {"name": "name2-" + self.static_uuid.hex}, + "spec": { + "containers": [ + { + "args": [], + "command": ["/bin/command2.sh", "arg2"], + "env": [ + {"name": "key1", "value": "val1"}, + {"name": "key2", "value": "val2"}, + ], + "envFrom": [], + "image": "image1", + "name": "base", + "ports": [ + { + "containerPort": 2118, + "name": "port", + } + ], + "volumeMounts": [ + { + "mountPath": "/foo/", + "name": "example-kubernetes-test-volume1", + }, + { + "mountPath": "/foo/", + "name": "example-kubernetes-test-volume2", + }, + ], + } ], - 'envFrom': [], - 'image': 'image1', - 'name': 'base', - 'ports': [{ - 'containerPort': 2118, - 'name': 'port', - }], - 'volumeMounts': [{ - 'mountPath': '/foo/', - 'name': 'example-kubernetes-test-volume2' - }] - }], - 'hostNetwork': False, - 'imagePullSecrets': [], - 'volumes': [{ - 'hostPath': {'path': '/tmp/'}, - 'name': 'example-kubernetes-test-volume1' - }, { - 'hostPath': {'path': '/tmp/'}, - 'name': 'example-kubernetes-test-volume2' - }] - } - }, result) + "hostNetwork": False, + "imagePullSecrets": [], + "volumes": [ + { + "hostPath": {"path": "/tmp/"}, + "name": "example-kubernetes-test-volume1", + }, + { + "hostPath": {"path": "/tmp/"}, + "name": "example-kubernetes-test-volume2", + }, + ], + }, + }, + result, + ) - @mock.patch('uuid.uuid4') + @mock.patch("uuid.uuid4") def test_construct_pod_empty_worker_config(self, mock_uuid): mock_uuid.return_value = self.static_uuid executor_config = k8s.V1Pod( spec=k8s.V1PodSpec( containers=[ k8s.V1Container( - name='', + name="", resources=k8s.V1ResourceRequirements( - limits={ - 'cpu': '1m', - 'memory': '1G' - } - ) + limits={"cpu": "1m", "memory": "1G"} + ), ) ] ) @@ -654,61 +711,58 @@ class TestPodGenerator(unittest.TestCase): worker_config = k8s.V1Pod() result = PodGenerator.construct_pod( - 'dag_id', - 'task_id', - 'pod_id', + "dag_id", + "task_id", + "pod_id", self.try_number, "kube_image", self.execution_date, - ['command'], + ["command"], executor_config, worker_config, - 'namespace', - 'uuid', + "namespace", + "uuid", ) sanitized_result = self.k8s_client.sanitize_for_serialization(result) - self.assertEqual({ - 'apiVersion': 'v1', - 'kind': 'Pod', - 'metadata': self.metadata, - 'spec': { - 'containers': [{ - 'args': [], - 'command': ['command'], - 'env': [], - 'envFrom': [], - 'name': 'base', - 'image': 'kube_image', - 'ports': [], - 'resources': { - 'limits': { - 'cpu': '1m', - 'memory': '1G' + self.assertEqual( + { + "apiVersion": "v1", + "kind": "Pod", + "metadata": self.metadata, + "spec": { + "containers": [ + { + "args": [], + "command": ["command"], + "env": [], + "envFrom": [], + "name": "base", + "image": "kube_image", + "ports": [], + "resources": {"limits": {"cpu": "1m", "memory": "1G"}}, + "volumeMounts": [], } - }, - 'volumeMounts': [] - }], - 'hostNetwork': False, - 'imagePullSecrets': [], - 'volumes': [] - } - }, sanitized_result) + ], + "hostNetwork": False, + "imagePullSecrets": [], + "volumes": [], + }, + }, + sanitized_result, + ) - @mock.patch('uuid.uuid4') + @mock.patch("uuid.uuid4") def test_construct_pod_empty_executor_config(self, mock_uuid): mock_uuid.return_value = self.static_uuid worker_config = k8s.V1Pod( spec=k8s.V1PodSpec( containers=[ k8s.V1Container( - name='', + name="", resources=k8s.V1ResourceRequirements( - limits={ - 'cpu': '1m', - 'memory': '1G' - } - ) + limits={"cpu": "1m", "memory": "1G"} + ), ) ] ) @@ -716,225 +770,203 @@ class TestPodGenerator(unittest.TestCase): executor_config = None result = PodGenerator.construct_pod( - 'dag_id', - 'task_id', - 'pod_id', + "dag_id", + "task_id", + "pod_id", self.try_number, "kube_image", self.execution_date, - ['command'], + ["command"], executor_config, worker_config, - 'namespace', - 'uuid', + "namespace", + "uuid", ) sanitized_result = self.k8s_client.sanitize_for_serialization(result) - self.assertEqual({ - 'apiVersion': 'v1', - 'kind': 'Pod', - 'metadata': self.metadata, - 'spec': { - 'containers': [{ - 'args': [], - 'command': ['command'], - 'env': [], - 'envFrom': [], - 'name': 'base', - 'image': 'kube_image', - 'ports': [], - 'resources': { - 'limits': { - 'cpu': '1m', - 'memory': '1G' + self.assertEqual( + { + "apiVersion": "v1", + "kind": "Pod", + "metadata": self.metadata, + "spec": { + "containers": [ + { + "args": [], + "command": ["command"], + "env": [], + "envFrom": [], + "name": "base", + "image": "kube_image", + "ports": [], + "resources": {"limits": {"cpu": "1m", "memory": "1G"}}, + "volumeMounts": [], } - }, - 'volumeMounts': [] - }], - 'hostNetwork': False, - 'imagePullSecrets': [], - 'volumes': [] - } - }, sanitized_result) + ], + "hostNetwork": False, + "imagePullSecrets": [], + "volumes": [], + }, + }, + sanitized_result, + ) - @mock.patch('uuid.uuid4') + @mock.patch("uuid.uuid4") def test_construct_pod(self, mock_uuid): mock_uuid.return_value = self.static_uuid worker_config = k8s.V1Pod( metadata=k8s.V1ObjectMeta( - name='gets-overridden-by-dynamic-args', - annotations={ - 'should': 'stay' - } + name="gets-overridden-by-dynamic-args", annotations={"should": "stay"} ), spec=k8s.V1PodSpec( containers=[ k8s.V1Container( - name='doesnt-override', + name="doesnt-override", resources=k8s.V1ResourceRequirements( - limits={ - 'cpu': '1m', - 'memory': '1G' - } + limits={"cpu": "1m", "memory": "1G"} ), - security_context=k8s.V1SecurityContext( - run_as_user=1 - ) + security_context=k8s.V1SecurityContext(run_as_user=1), ) ] - ) + ), ) executor_config = k8s.V1Pod( spec=k8s.V1PodSpec( containers=[ k8s.V1Container( - name='doesnt-override-either', + name="doesnt-override-either", resources=k8s.V1ResourceRequirements( - limits={ - 'cpu': '2m', - 'memory': '2G' - } - ) + limits={"cpu": "2m", "memory": "2G"} + ), ) ] ) ) result = PodGenerator.construct_pod( - 'dag_id', - 'task_id', - 'pod_id', + "dag_id", + "task_id", + "pod_id", self.try_number, "kube_image", self.execution_date, - ['command'], + ["command"], executor_config, worker_config, - 'namespace', - 'uuid', + "namespace", + "uuid", ) sanitized_result = self.k8s_client.sanitize_for_serialization(result) - self.metadata['annotations']['should'] = 'stay' - - self.assertEqual({ - 'apiVersion': 'v1', - 'kind': 'Pod', - 'metadata': self.metadata, - 'spec': { - 'containers': [{ - 'args': [], - 'command': ['command'], - 'env': [], - 'envFrom': [], - 'image': 'kube_image', - 'name': 'base', - 'ports': [], - 'resources': { - 'limits': { - 'cpu': '2m', - 'memory': '2G' + self.metadata["annotations"]["should"] = "stay" + + self.assertEqual( + { + "apiVersion": "v1", + "kind": "Pod", + "metadata": self.metadata, + "spec": { + "containers": [ + { + "args": [], + "command": ["command"], + "env": [], + "envFrom": [], + "image": "kube_image", + "name": "base", + "ports": [], + "resources": {"limits": {"cpu": "2m", "memory": "2G"}}, + "volumeMounts": [], + "securityContext": {"runAsUser": 1}, } - }, - 'volumeMounts': [], - 'securityContext': {'runAsUser': 1} - }], - 'hostNetwork': False, - 'imagePullSecrets': [], - 'volumes': [] - } - }, sanitized_result) + ], + "hostNetwork": False, + "imagePullSecrets": [], + "volumes": [], + }, + }, + sanitized_result, + ) - @mock.patch('uuid.uuid4') + @mock.patch("uuid.uuid4") def test_construct_pod_with_mutation(self, mock_uuid): mock_uuid.return_value = self.static_uuid worker_config = k8s.V1Pod( metadata=k8s.V1ObjectMeta( - name='gets-overridden-by-dynamic-args', - annotations={ - 'should': 'stay' - } + name="gets-overridden-by-dynamic-args", annotations={"should": "stay"} ), spec=k8s.V1PodSpec( containers=[ k8s.V1Container( - name='doesnt-override', + name="doesnt-override", resources=k8s.V1ResourceRequirements( - limits={ - 'cpu': '1m', - 'memory': '1G' - } + limits={"cpu": "1m", "memory": "1G"} ), - security_context=k8s.V1SecurityContext( - run_as_user=1 - ) + security_context=k8s.V1SecurityContext(run_as_user=1), ) ] - ) + ), ) executor_config = k8s.V1Pod( spec=k8s.V1PodSpec( containers=[ k8s.V1Container( - name='doesnt-override-either', + name="doesnt-override-either", resources=k8s.V1ResourceRequirements( - limits={ - 'cpu': '2m', - 'memory': '2G' - } - ) + limits={"cpu": "2m", "memory": "2G"} + ), ) ] ) ) result = PodGenerator.construct_pod( - dag_id='dag_id', - task_id='task_id', - pod_id='pod_id', + dag_id="dag_id", + task_id="task_id", + pod_id="pod_id", try_number=3, - kube_image='kube_image', + kube_image="kube_image", date=self.execution_date, - command=['command'], + command=["command"], pod_override_object=executor_config, base_worker_pod=worker_config, - namespace='namespace', - worker_uuid='uuid', + namespace="namespace", + worker_uuid="uuid", ) sanitized_result = self.k8s_client.sanitize_for_serialization(result) - self.metadata['annotations']['should'] = 'stay' - - self.assertEqual({ - 'apiVersion': 'v1', - 'kind': 'Pod', - 'metadata': self.metadata, - 'spec': { - 'containers': [{ - 'args': [], - 'command': ['command'], - 'env': [], - 'envFrom': [], - 'name': 'base', - 'image': 'kube_image', - 'ports': [], - 'resources': { - 'limits': { - 'cpu': '2m', - 'memory': '2G' + self.metadata["annotations"]["should"] = "stay" + + self.assertEqual( + { + "apiVersion": "v1", + "kind": "Pod", + "metadata": self.metadata, + "spec": { + "containers": [ + { + "args": [], + "command": ["command"], + "env": [], + "envFrom": [], + "name": "base", + "image": "kube_image", + "ports": [], + "resources": {"limits": {"cpu": "2m", "memory": "2G"}}, + "volumeMounts": [], + "securityContext": {"runAsUser": 1}, } - }, - 'volumeMounts': [], - 'securityContext': {'runAsUser': 1} - }], - 'hostNetwork': False, - 'imagePullSecrets': [], - 'volumes': [] - } - }, sanitized_result) + ], + "hostNetwork": False, + "imagePullSecrets": [], + "volumes": [], + }, + }, + sanitized_result, + ) def test_merge_objects_empty(self): - annotations = {'foo1': 'bar1'} + annotations = {"foo1": "bar1"} base_obj = k8s.V1ObjectMeta(annotations=annotations) client_obj = None res = merge_objects(base_obj, client_obj) @@ -954,57 +986,54 @@ class TestPodGenerator(unittest.TestCase): self.assertEqual(client_obj, res) def test_merge_objects(self): - base_annotations = {'foo1': 'bar1'} - base_labels = {'foo1': 'bar1'} - client_annotations = {'foo2': 'bar2'} - base_obj = k8s.V1ObjectMeta( - annotations=base_annotations, - labels=base_labels - ) + base_annotations = {"foo1": "bar1"} + base_labels = {"foo1": "bar1"} + client_annotations = {"foo2": "bar2"} + base_obj = k8s.V1ObjectMeta(annotations=base_annotations, labels=base_labels) client_obj = k8s.V1ObjectMeta(annotations=client_annotations) res = merge_objects(base_obj, client_obj) client_obj.labels = base_labels self.assertEqual(client_obj, res) def test_extend_object_field_empty(self): - ports = [k8s.V1ContainerPort(container_port=1, name='port')] - base_obj = k8s.V1Container(name='base_container', ports=ports) - client_obj = k8s.V1Container(name='client_container') - res = extend_object_field(base_obj, client_obj, 'ports') + ports = [k8s.V1ContainerPort(container_port=1, name="port")] + base_obj = k8s.V1Container(name="base_container", ports=ports) + client_obj = k8s.V1Container(name="client_container") + res = extend_object_field(base_obj, client_obj, "ports") client_obj.ports = ports self.assertEqual(client_obj, res) - base_obj = k8s.V1Container(name='base_container') - client_obj = k8s.V1Container(name='base_container', ports=ports) - res = extend_object_field(base_obj, client_obj, 'ports') + base_obj = k8s.V1Container(name="base_container") + client_obj = k8s.V1Container(name="base_container", ports=ports) + res = extend_object_field(base_obj, client_obj, "ports") self.assertEqual(client_obj, res) def test_extend_object_field_not_list(self): - base_obj = k8s.V1Container(name='base_container', image='image') - client_obj = k8s.V1Container(name='client_container') + base_obj = k8s.V1Container(name="base_container", image="image") + client_obj = k8s.V1Container(name="client_container") with self.assertRaises(ValueError): - extend_object_field(base_obj, client_obj, 'image') - base_obj = k8s.V1Container(name='base_container') - client_obj = k8s.V1Container(name='client_container', image='image') + extend_object_field(base_obj, client_obj, "image") + base_obj = k8s.V1Container(name="base_container") + client_obj = k8s.V1Container(name="client_container", image="image") with self.assertRaises(ValueError): - extend_object_field(base_obj, client_obj, 'image') + extend_object_field(base_obj, client_obj, "image") def test_extend_object_field(self): - base_ports = [k8s.V1ContainerPort(container_port=1, name='base_port')] - base_obj = k8s.V1Container(name='base_container', ports=base_ports) - client_ports = [k8s.V1ContainerPort(container_port=1, name='client_port')] - client_obj = k8s.V1Container(name='client_container', ports=client_ports) - res = extend_object_field(base_obj, client_obj, 'ports') + base_ports = [k8s.V1ContainerPort(container_port=1, name="base_port")] + base_obj = k8s.V1Container(name="base_container", ports=base_ports) + client_ports = [k8s.V1ContainerPort(container_port=1, name="client_port")] + client_obj = k8s.V1Container(name="client_container", ports=client_ports) + res = extend_object_field(base_obj, client_obj, "ports") client_obj.ports = base_ports + client_ports self.assertEqual(client_obj, res) def test_reconcile_containers_empty(self): - base_objs = [k8s.V1Container(name='base_container')] + base_objs = [k8s.V1Container(name="base_container")] client_objs = [] res = PodGenerator.reconcile_containers(base_objs, client_objs) self.assertEqual(base_objs, res) - client_objs = [k8s.V1Container(name='client_container')] + client_objs = [k8s.V1Container(name="client_container")] base_objs = [] res = PodGenerator.reconcile_containers(base_objs, client_objs) self.assertEqual(client_objs, res) @@ -1013,33 +1042,33 @@ class TestPodGenerator(unittest.TestCase): self.assertEqual(res, []) def test_reconcile_containers(self): - base_ports = [k8s.V1ContainerPort(container_port=1, name='base_port')] + base_ports = [k8s.V1ContainerPort(container_port=1, name="base_port")] base_objs = [ - k8s.V1Container(name='base_container1', ports=base_ports), - k8s.V1Container(name='base_container2', image='base_image'), + k8s.V1Container(name="base_container1", ports=base_ports), + k8s.V1Container(name="base_container2", image="base_image"), ] - client_ports = [k8s.V1ContainerPort(container_port=2, name='client_port')] + client_ports = [k8s.V1ContainerPort(container_port=2, name="client_port")] client_objs = [ - k8s.V1Container(name='client_container1', ports=client_ports), - k8s.V1Container(name='client_container2', image='client_image'), + k8s.V1Container(name="client_container1", ports=client_ports), + k8s.V1Container(name="client_container2", image="client_image"), ] res = PodGenerator.reconcile_containers(base_objs, client_objs) client_objs[0].ports = base_ports + client_ports self.assertEqual(client_objs, res) - base_ports = [k8s.V1ContainerPort(container_port=1, name='base_port')] + base_ports = [k8s.V1ContainerPort(container_port=1, name="base_port")] base_objs = [ - k8s.V1Container(name='base_container1', ports=base_ports), - k8s.V1Container(name='base_container2', image='base_image'), + k8s.V1Container(name="base_container1", ports=base_ports), + k8s.V1Container(name="base_container2", image="base_image"), ] - client_ports = [k8s.V1ContainerPort(container_port=2, name='client_port')] + client_ports = [k8s.V1ContainerPort(container_port=2, name="client_port")] client_objs = [ - k8s.V1Container(name='client_container1', ports=client_ports), - k8s.V1Container(name='client_container2', stdin=True), + k8s.V1Container(name="client_container1", ports=client_ports), + k8s.V1Container(name="client_container2", stdin=True), ] res = PodGenerator.reconcile_containers(base_objs, client_objs) client_objs[0].ports = base_ports + client_ports - client_objs[1].image = 'base_image' + client_objs[1].image = "base_image" self.assertEqual(client_objs, res) def test_reconcile_specs_empty(self): @@ -1054,17 +1083,23 @@ class TestPodGenerator(unittest.TestCase): self.assertEqual(client_spec, res) def test_reconcile_specs(self): - base_objs = [k8s.V1Container(name='base_container1', image='base_image')] - client_objs = [k8s.V1Container(name='client_container1')] - base_spec = k8s.V1PodSpec(priority=1, active_deadline_seconds=100, containers=base_objs) - client_spec = k8s.V1PodSpec(priority=2, hostname='local', containers=client_objs) + base_objs = [k8s.V1Container(name="base_container1", image="base_image")] + client_objs = [k8s.V1Container(name="client_container1")] + base_spec = k8s.V1PodSpec( + priority=1, active_deadline_seconds=100, containers=base_objs + ) + client_spec = k8s.V1PodSpec( + priority=2, hostname="local", containers=client_objs + ) res = PodGenerator.reconcile_specs(base_spec, client_spec) - client_spec.containers = [k8s.V1Container(name='client_container1', image='base_image')] + client_spec.containers = [ + k8s.V1Container(name="client_container1", image="base_image") + ] client_spec.active_deadline_seconds = 100 self.assertEqual(client_spec, res) def test_deserialize_model_file(self): - fixture = sys.path[0] + '/tests/kubernetes/pod.yaml' + fixture = sys.path[0] + "/tests/kubernetes/pod.yaml" result = PodGenerator.deserialize_model_file(fixture) sanitized_res = self.k8s_client.sanitize_for_serialization(result) self.assertEqual(sanitized_res, self.deserialize_result) @@ -1107,7 +1142,11 @@ spec: command="test", pod_override_object=None, base_worker_pod=k8s.V1Pod( - metadata=k8s.V1ObjectMeta(labels={"airflow-test": "airflow-task-pod"}, - annotations={"my.annotation": "foo"}))) + metadata=k8s.V1ObjectMeta( + labels={"airflow-test": "airflow-task-pod"}, + annotations={"my.annotation": "foo"}, + ) + ), + ) self.assertIn("airflow-test", pod.metadata.labels) self.assertIn("my.annotation", pod.metadata.annotations)
