kaxil commented on a change in pull request #12384: URL: https://github.com/apache/airflow/pull/12384#discussion_r525225682
########## File path: kubernetes_tests/test_kubernetes_pod_operator_backcompat.py ########## @@ -0,0 +1,948 @@ +# pylint: disable=unused-argument +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import json +import logging +import sys +import textwrap +import unittest +from unittest import mock +from unittest.mock import patch + +import kubernetes.client.models as k8s +import pendulum +from kubernetes.client.api_client import ApiClient +from kubernetes.client.rest import ApiException + +from airflow.exceptions import AirflowException +from airflow.kubernetes import kube_client +from airflow.kubernetes.pod import Port +from airflow.kubernetes.pod_generator import PodDefaults +from airflow.kubernetes.pod_launcher import PodLauncher +from airflow.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv +from airflow.kubernetes.secret import Secret +from airflow.kubernetes.volume import Volume +from airflow.kubernetes.volume_mount import VolumeMount +from airflow.models import DAG, TaskInstance +from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator +from airflow.utils import timezone +from airflow.version import version as airflow_version + + +# noinspection DuplicatedCode +def 
create_context(task): + dag = DAG(dag_id="dag") + tzinfo = pendulum.timezone("Europe/Amsterdam") + execution_date = timezone.datetime(2016, 1, 1, 1, 0, 0, tzinfo=tzinfo) + task_instance = TaskInstance(task=task, execution_date=execution_date) + return { + "dag": dag, + "ts": execution_date.isoformat(), + "task": task, + "ti": task_instance, + } + + +# noinspection DuplicatedCode,PyUnusedLocal +class TestKubernetesPodOperatorSystem(unittest.TestCase): + def get_current_task_name(self): + # reverse test name to make pod name unique (it has limited length) + return "_" + unittest.TestCase.id(self).replace(".", "_")[::-1] + + def setUp(self): + self.maxDiff = None # pylint: disable=invalid-name + self.api_client = ApiClient() + self.expected_pod = { + 'apiVersion': 'v1', + 'kind': 'Pod', + 'metadata': { + 'namespace': 'default', + 'name': mock.ANY, + 'annotations': {}, + 'labels': { + 'foo': 'bar', + 'kubernetes_pod_operator': 'True', + 'airflow_version': airflow_version.replace('+', '-'), + 'execution_date': '2016-01-01T0100000100-a2f50a31f', + 'dag_id': 'dag', + 'task_id': 'task', + 'try_number': '1', + }, + }, + 'spec': { + 'affinity': {}, + 'containers': [ + { + 'image': 'ubuntu:16.04', + 'args': ["echo 10"], + 'command': ["bash", "-cx"], + 'env': [], + 'envFrom': [], + 'resources': {}, + 'name': 'base', + 'ports': [], + 'volumeMounts': [], + } + ], + 'hostNetwork': False, + 'imagePullSecrets': [], + 'initContainers': [], + 'nodeSelector': {}, + 'restartPolicy': 'Never', + 'securityContext': {}, + 'serviceAccountName': 'default', + 'tolerations': [], + 'volumes': [], + }, + } + + def tearDown(self): + client = kube_client.get_kube_client(in_cluster=False) + client.delete_collection_namespaced_pod(namespace="default") + + def create_context(self, task): + dag = DAG(dag_id="dag") + tzinfo = pendulum.timezone("Europe/Amsterdam") + execution_date = timezone.datetime(2016, 1, 1, 1, 0, 0, tzinfo=tzinfo) + task_instance = TaskInstance(task=task, 
execution_date=execution_date) + return { + "dag": dag, + "ts": execution_date.isoformat(), + "task": task, + "ti": task_instance, + } + + def test_do_xcom_push_defaults_false(self): + k = KubernetesPodOperator( + namespace='default', + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["echo 10"], + labels={"foo": "bar"}, + name="test", + task_id="task", + in_cluster=False, + do_xcom_push=False, + ) + self.assertFalse(k.do_xcom_push) + + @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod") + @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod") + @mock.patch("airflow.kubernetes.kube_client.get_kube_client") + def test_config_path(self, client_mock, monitor_mock, start_mock): # pylint: disable=unused-argument Review comment: This test does not test any old behaviour, does it? i.e. it is just a dupe of the same test (`test_config_path`) in tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py. Can we also remove all such tests that are not strictly checking the old behaviour, please? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
