This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 4236e404ee53a3dac91826b8cd8c0dbdeb6caed6 Author: Daniel Imberman <[email protected]> AuthorDate: Tue Jun 23 11:49:51 2020 -0700 Monitor pods by labels instead of names (#6377) * Monitor k8sPodOperator pods by labels To prevent situations where the scheduler starts a second k8sPodOperator pod after a restart, we now check for existing pods using kubernetes labels * Update airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py Co-authored-by: Kaxil Naik <[email protected]> * Update airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py Co-authored-by: Kaxil Naik <[email protected]> * add docs * Update airflow/kubernetes/pod_launcher.py Co-authored-by: Kaxil Naik <[email protected]> Co-authored-by: Daniel Imberman <[email protected]> Co-authored-by: Kaxil Naik <[email protected]> (cherry picked from commit 8985df0bfcb5f2b2cd69a21b9814021f9f8ce953) --- .github/workflows/ci.yml | 2 +- .../contrib/operators/kubernetes_pod_operator.py | 300 ++++++++++++++------- airflow/executors/kubernetes_executor.py | 58 ++-- airflow/kubernetes/k8s_model.py | 3 +- airflow/kubernetes/pod_generator.py | 50 +++- airflow/kubernetes/pod_launcher.py | 40 ++- chart/charts/postgresql-6.3.12.tgz | Bin 0 -> 22754 bytes kubernetes_tests/test_kubernetes_pod_operator.py | 185 ++++++++++--- tests/executors/test_kubernetes_executor.py | 20 +- tests/kubernetes/models/test_pod.py | 6 +- tests/kubernetes/models/test_secret.py | 18 +- tests/kubernetes/test_pod_generator.py | 18 +- .../kubernetes/operators/test_kubernetes_pod.py | 45 ++-- 13 files changed, 527 insertions(+), 218 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ee799dd..fb16aaf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -132,7 +132,7 @@ jobs: - uses: actions/checkout@master - uses: actions/setup-python@v1 with: - python-version: '3.x' + python-version: '3.6' - name: "Free space" run: ./scripts/ci/ci_free_space_on_ci.sh - name: "Build PROD image ${{ matrix.python-version }}" diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py index b89a37f..8adb131 100644 --- a/airflow/contrib/operators/kubernetes_pod_operator.py +++ b/airflow/contrib/operators/kubernetes_pod_operator.py @@ -15,7 +15,6 @@ # specific language governing permissions and limitations # under the License. """Executes task in a Kubernetes POD""" -import warnings import re @@ -80,6 +79,12 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- :param cluster_context: context that points to kubernetes cluster. Ignored when in_cluster is True. If None, current-context is used. :type cluster_context: str + :param reattach_on_restart: if the scheduler dies while the pod is running, reattach and monitor + :type reattach_on_restart: bool + :param labels: labels to apply to the Pod. + :type labels: dict + :param startup_timeout_seconds: timeout in seconds to startup the pod. + :type startup_timeout_seconds: int :param get_logs: get the stdout of the container as logs of the tasks. :type get_logs: bool :param annotations: non-identifying metadata you can attach to the Pod. @@ -126,90 +131,11 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- """ template_fields = ('cmds', 'arguments', 'env_vars', 'config_file') - def execute(self, context): - try: - client = kube_client.get_kube_client(in_cluster=self.in_cluster, - cluster_context=self.cluster_context, - config_file=self.config_file) - # Add Airflow Version to the label - # And a label to identify that pod is launched by KubernetesPodOperator - self.labels.update( - { - 'airflow_version': airflow_version.replace('+', '-'), - 'kubernetes_pod_operator': 'True', - } - ) - - pod = pod_generator.PodGenerator( - image=self.image, - namespace=self.namespace, - cmds=self.cmds, - args=self.arguments, - labels=self.labels, - name=self.name, - envs=self.env_vars, - extract_xcom=self.do_xcom_push, - image_pull_policy=self.image_pull_policy, - node_selectors=self.node_selectors, - priority_class_name=self.priority_class_name, - annotations=self.annotations, - affinity=self.affinity, - init_containers=self.init_containers, - image_pull_secrets=self.image_pull_secrets, - service_account_name=self.service_account_name, - hostnetwork=self.hostnetwork, - tolerations=self.tolerations, - configmaps=self.configmaps, - security_context=self.security_context, - dnspolicy=self.dnspolicy, - pod=self.full_pod_spec, - ).gen_pod() - - pod = append_to_pod( - pod, - self.pod_runtime_info_envs + - self.ports + - self.resources + - self.secrets + - self.volumes + - self.volume_mounts - ) - - self.pod = pod - - launcher = pod_launcher.PodLauncher(kube_client=client, - extract_xcom=self.do_xcom_push) - - try: - (final_state, result) = launcher.run_pod( - pod, - startup_timeout=self.startup_timeout_seconds, - get_logs=self.get_logs) - finally: - if self.is_delete_operator_pod: - launcher.delete_pod(pod) - - if final_state != State.SUCCESS: - raise AirflowException( - 'Pod returned a failure: {state}'.format(state=final_state) - ) - - return result - except AirflowException as ex: - raise AirflowException('Pod Launching failed: {error}'.format(error=ex)) - - def _set_resources(self, resources): - return [Resources(**resources) if resources else Resources()] - - def _set_name(self, name): - validate_key(name, max_length=220) - return re.sub(r'[^a-z0-9.-]+', '-', name.lower()) - @apply_defaults def __init__(self, # pylint: disable=too-many-arguments,too-many-locals - namespace, - image, - name, + namespace=None, + image=None, + name=None, cmds=None, arguments=None, ports=None, @@ -220,15 +146,14 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- in_cluster=None, cluster_context=None, labels=None, + reattach_on_restart=True, startup_timeout_seconds=120, get_logs=True, image_pull_policy='IfNotPresent', annotations=None, resources=None, affinity=None, - init_containers=None, config_file=None, - do_xcom_push=False, node_selectors=None, image_pull_secrets=None, service_account_name='default', @@ -239,18 +164,19 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- security_context=None, pod_runtime_info_envs=None, dnspolicy=None, + schedulername=None, full_pod_spec=None, + init_containers=None, + log_events_on_failure=False, + do_xcom_push=False, + pod_template_file=None, priority_class_name=None, *args, **kwargs): - # https://github.com/apache/airflow/blob/2d0eff4ee4fafcf8c7978ac287a8fb968e56605f/UPDATING.md#unification-of-do_xcom_push-flag if kwargs.get('xcom_push') is not None: - kwargs['do_xcom_push'] = kwargs.pop('xcom_push') - warnings.warn( - "`xcom_push` will be deprecated. Use `do_xcom_push` instead.", - DeprecationWarning, stacklevel=2 - ) + raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead") super(KubernetesPodOperator, self).__init__(*args, resources=None, **kwargs) + self.pod = None self.do_xcom_push = do_xcom_push self.image = image @@ -259,16 +185,14 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- self.arguments = arguments or [] self.labels = labels or {} self.startup_timeout_seconds = startup_timeout_seconds - self.name = self._set_name(name) self.env_vars = env_vars or {} self.ports = ports or [] - self.init_containers = init_containers or [] - self.priority_class_name = priority_class_name self.volume_mounts = volume_mounts or [] self.volumes = volumes or [] self.secrets = secrets or [] self.in_cluster = in_cluster self.cluster_context = cluster_context + self.reattach_on_restart = reattach_on_restart self.get_logs = get_logs self.image_pull_policy = image_pull_policy self.node_selectors = node_selectors or {} @@ -285,4 +209,192 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- self.security_context = security_context or {} self.pod_runtime_info_envs = pod_runtime_info_envs or [] self.dnspolicy = dnspolicy + self.schedulername = schedulername self.full_pod_spec = full_pod_spec + self.init_containers = init_containers or [] + self.log_events_on_failure = log_events_on_failure + self.priority_class_name = priority_class_name + self.pod_template_file = pod_template_file + self.name = self._set_name(name) + + @staticmethod + def create_labels_for_pod(context): + """ + Generate labels for the pod to track the pod in case of Operator crash + + :param context: task context provided by airflow DAG + :return: dict + """ + labels = { + 'dag_id': context['dag'].dag_id, + 'task_id': context['task'].task_id, + 'execution_date': context['ts'], + 'try_number': context['ti'].try_number, + } + # In the case of sub dags this is just useful + if context['dag'].is_subdag: + labels['parent_dag_id'] = context['dag'].parent_dag.dag_id + # Ensure that label is valid for Kube, + # and if not truncate/remove invalid chars and replace with short hash. + for label_id, label in labels.items(): + safe_label = pod_generator.make_safe_label_value(str(label)) + labels[label_id] = safe_label + return labels + + def execute(self, context): + try: + if self.in_cluster is not None: + client = kube_client.get_kube_client(in_cluster=self.in_cluster, + cluster_context=self.cluster_context, + config_file=self.config_file) + else: + client = kube_client.get_kube_client(cluster_context=self.cluster_context, + config_file=self.config_file) + + # Add combination of labels to uniquely identify a running pod + labels = self.create_labels_for_pod(context) + + label_selector = self._get_pod_identifying_label_string(labels) + + pod_list = client.list_namespaced_pod(self.namespace, label_selector=label_selector) + + if len(pod_list.items) > 1: + raise AirflowException( + 'More than one pod running with labels: ' + '{label_selector}'.format(label_selector=label_selector)) + + launcher = pod_launcher.PodLauncher(kube_client=client, extract_xcom=self.do_xcom_push) + + if len(pod_list.items) == 1 and \ + self._try_numbers_do_not_match(context, pod_list.items[0]) and \ + self.reattach_on_restart: + self.log.info("found a running pod with labels %s but a different try_number" + "Will attach to this pod and monitor instead of starting new one", labels) + final_state, _, result = self.create_new_pod_for_operator(labels, launcher) + elif len(pod_list.items) == 1: + self.log.info("found a running pod with labels %s." + "Will monitor this pod instead of starting new one", labels) + final_state, result = self.monitor_launched_pod(launcher, pod_list[0]) + else: + final_state, _, result = self.create_new_pod_for_operator(labels, launcher) + if final_state != State.SUCCESS: + raise AirflowException( + 'Pod returned a failure: {state}'.format(state=final_state)) + return result + except AirflowException as ex: + raise AirflowException('Pod Launching failed: {error}'.format(error=ex)) + + @staticmethod + def _get_pod_identifying_label_string(labels): + filtered_labels = {label_id: label for label_id, label in labels.items() if label_id != 'try_number'} + return ','.join([label_id + '=' + label for label_id, label in sorted(filtered_labels.items())]) + + @staticmethod + def _try_numbers_do_not_match(context, pod): + return pod.metadata.labels['try_number'] != context['ti'].try_number + + @staticmethod + def _set_resources(resources): + if not resources: + return [] + return [Resources(**resources)] + + def _set_name(self, name): + if self.pod_template_file or self.full_pod_spec: + return None + validate_key(name, max_length=220) + return re.sub(r'[^a-z0-9.-]+', '-', name.lower()) + + def create_new_pod_for_operator(self, labels, launcher): + """ + Creates a new pod and monitors for duration of task + + @param labels: labels used to track pod + @param launcher: pod launcher that will manage launching and monitoring pods + @return: + """ + if not (self.full_pod_spec or self.pod_template_file): + # Add Airflow Version to the label + # And a label to identify that pod is launched by KubernetesPodOperator + self.labels.update( + { + 'airflow_version': airflow_version.replace('+', '-'), + 'kubernetes_pod_operator': 'True', + } + ) + self.labels.update(labels) + pod = pod_generator.PodGenerator( + image=self.image, + namespace=self.namespace, + cmds=self.cmds, + args=self.arguments, + labels=self.labels, + name=self.name, + envs=self.env_vars, + extract_xcom=self.do_xcom_push, + image_pull_policy=self.image_pull_policy, + node_selectors=self.node_selectors, + annotations=self.annotations, + affinity=self.affinity, + image_pull_secrets=self.image_pull_secrets, + service_account_name=self.service_account_name, + hostnetwork=self.hostnetwork, + tolerations=self.tolerations, + configmaps=self.configmaps, + security_context=self.security_context, + dnspolicy=self.dnspolicy, + init_containers=self.init_containers, + restart_policy='Never', + priority_class_name=self.priority_class_name, + pod=self.full_pod_spec, + ).gen_pod() + + # noinspection PyTypeChecker + pod = append_to_pod( + pod, + self.pod_runtime_info_envs + # type: ignore + self.ports + # type: ignore + self.resources + # type: ignore + self.secrets + # type: ignore + self.volumes + # type: ignore + self.volume_mounts # type: ignore + ) + + self.pod = pod + + try: + launcher.start_pod( + pod, + startup_timeout=self.startup_timeout_seconds) + final_state, result = launcher.monitor_pod(pod=pod, get_logs=self.get_logs) + except AirflowException: + if self.log_events_on_failure: + for event in launcher.read_pod_events(pod).items: + self.log.error("Pod Event: %s - %s", event.reason, event.message) + raise + finally: + if self.is_delete_operator_pod: + launcher.delete_pod(pod) + return final_state, pod, result + + def monitor_launched_pod(self, launcher, pod): + """ + Montitors a pod to completion that was created by a previous KubernetesPodOperator + + @param launcher: pod launcher that will manage launching and monitoring pods + :param pod: podspec used to find pod using k8s API + :return: + """ + try: + (final_state, result) = launcher.monitor_pod(pod, get_logs=self.get_logs) + finally: + if self.is_delete_operator_pod: + launcher.delete_pod(pod) + if final_state != State.SUCCESS: + if self.log_events_on_failure: + for event in launcher.read_pod_events(pod).items: + self.log.error("Pod Event: %s - %s", event.reason, event.message) + raise AirflowException( + 'Pod returned a failure: {state}'.format(state=final_state) + ) + return final_state, result diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 8b5fdc1..d458d7a 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -16,37 +16,32 @@ # under the License. """Kubernetes executor""" import base64 -import hashlib -from queue import Empty - -import re import json import multiprocessing -from uuid import uuid4 import time - -from dateutil import parser +from queue import Empty +from uuid import uuid4 import kubernetes +from dateutil import parser from kubernetes import watch, client from kubernetes.client.rest import ApiException from urllib3.exceptions import HTTPError, ReadTimeoutError +from airflow import settings from airflow.configuration import conf -from airflow.kubernetes.pod_launcher import PodLauncher +from airflow.exceptions import AirflowConfigException, AirflowException +from airflow.executors.base_executor import BaseExecutor +from airflow.kubernetes import pod_generator from airflow.kubernetes.kube_client import get_kube_client -from airflow.kubernetes.worker_configuration import WorkerConfiguration +from airflow.kubernetes.pod_generator import MAX_POD_ID_LEN from airflow.kubernetes.pod_generator import PodGenerator -from airflow.executors.base_executor import BaseExecutor +from airflow.kubernetes.pod_launcher import PodLauncher +from airflow.kubernetes.worker_configuration import WorkerConfiguration from airflow.models import KubeResourceVersion, KubeWorkerIdentifier, TaskInstance -from airflow.utils.state import State from airflow.utils.db import provide_session, create_session -from airflow import settings -from airflow.exceptions import AirflowConfigException, AirflowException from airflow.utils.log.logging_mixin import LoggingMixin - -MAX_POD_ID_LEN = 253 -MAX_LABEL_LEN = 63 +from airflow.utils.state import State class KubeConfig: @@ -402,8 +397,8 @@ class AirflowKubernetesScheduler(LoggingMixin): namespace=self.namespace, worker_uuid=self.worker_uuid, pod_id=self._create_pod_id(dag_id, task_id), - dag_id=self._make_safe_label_value(dag_id), - task_id=self._make_safe_label_value(task_id), + dag_id=pod_generator.make_safe_label_value(dag_id), + task_id=pod_generator.make_safe_label_value(task_id), try_number=try_number, execution_date=self._datetime_to_label_safe_datestring(execution_date), airflow_command=command @@ -495,25 +490,6 @@ class AirflowKubernetesScheduler(LoggingMixin): return safe_pod_id @staticmethod - def _make_safe_label_value(string): - """ - Valid label values must be 63 characters or less and must be empty or begin and - end with an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_), - dots (.), and alphanumerics between. - - If the label value is then greater than 63 chars once made safe, or differs in any - way from the original value sent to this function, then we need to truncate to - 53chars, and append it with a unique hash. - """ - safe_label = re.sub(r'^[^a-z0-9A-Z]*|[^a-zA-Z0-9_\-\.]|[^a-z0-9A-Z]*$', '', string) - - if len(safe_label) > MAX_LABEL_LEN or string != safe_label: - safe_hash = hashlib.md5(string.encode()).hexdigest()[:9] - safe_label = safe_label[:MAX_LABEL_LEN - len(safe_hash) - 1] + "-" + safe_hash - - return safe_label - - @staticmethod def _create_pod_id(dag_id, task_id): safe_dag_id = AirflowKubernetesScheduler._strip_unsafe_kubernetes_special_chars( dag_id) @@ -599,8 +575,8 @@ class AirflowKubernetesScheduler(LoggingMixin): ) for task in tasks: if ( - self._make_safe_label_value(task.dag_id) == dag_id and - self._make_safe_label_value(task.task_id) == task_id and + pod_generator.make_safe_label_value(task.dag_id) == dag_id and + pod_generator.make_safe_label_value(task.task_id) == task_id and task.execution_date == ex_time ): self.log.info( @@ -683,8 +659,8 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin): # pylint: disable=protected-access dict_string = ( "dag_id={},task_id={},execution_date={},airflow-worker={}".format( - AirflowKubernetesScheduler._make_safe_label_value(task.dag_id), - AirflowKubernetesScheduler._make_safe_label_value(task.task_id), + pod_generator.make_safe_label_value(task.dag_id), + pod_generator.make_safe_label_value(task.task_id), AirflowKubernetesScheduler._datetime_to_label_safe_datestring( task.execution_date ), diff --git a/airflow/kubernetes/k8s_model.py b/airflow/kubernetes/k8s_model.py index 7049b1d..3fd2f9e 100644 --- a/airflow/kubernetes/k8s_model.py +++ b/airflow/kubernetes/k8s_model.py @@ -58,4 +58,5 @@ def append_to_pod(pod, k8s_objects): """ if not k8s_objects: return pod - return reduce(lambda p, o: o.attach_to_pod(p), k8s_objects, pod) + new_pod = reduce(lambda p, o: o.attach_to_pod(p), k8s_objects, pod) + return new_pod diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py index 2a5a0df..a614f41 100644 --- a/airflow/kubernetes/pod_generator.py +++ b/airflow/kubernetes/pod_generator.py @@ -22,9 +22,17 @@ is supported and no serialization need be written. """ import copy +import hashlib +import re +import uuid + import kubernetes.client.models as k8s + from airflow.executors import Executors -import uuid + +MAX_LABEL_LEN = 63 + +MAX_POD_ID_LEN = 253 class PodDefaults: @@ -55,6 +63,25 @@ class PodDefaults: ) +def make_safe_label_value(string): + """ + Valid label values must be 63 characters or less and must be empty or begin and + end with an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_), + dots (.), and alphanumerics between. + + If the label value is greater than 63 chars once made safe, or differs in any + way from the original value sent to this function, then we need to truncate to + 53 chars, and append it with a unique hash. + """ + safe_label = re.sub(r"^[^a-z0-9A-Z]*|[^a-zA-Z0-9_\-\.]|[^a-z0-9A-Z]*$", "", string) + + if len(safe_label) > MAX_LABEL_LEN or string != safe_label: + safe_hash = hashlib.md5(string.encode()).hexdigest()[:9] + safe_label = safe_label[:MAX_LABEL_LEN - len(safe_hash) - 1] + "-" + safe_hash + + return safe_label + + class PodGenerator: """ Contains Kubernetes Airflow Worker configuration logic @@ -130,7 +157,7 @@ class PodGenerator: # Pod Metadata self.metadata = k8s.V1ObjectMeta() self.metadata.labels = labels - self.metadata.name = name + "-" + str(uuid.uuid4())[:8] if name else None + self.metadata.name = name self.metadata.namespace = namespace self.metadata.annotations = annotations @@ -201,9 +228,28 @@ class PodGenerator: if self.extract_xcom: result = self.add_sidecar(result) + result.metadata.name = self.make_unique_pod_id(result.metadata.name) return result @staticmethod + def make_unique_pod_id(dag_id): + """ + Kubernetes pod names must be <= 253 chars and must pass the following regex for + validation + ``^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$`` + :param dag_id: a dag_id with only alphanumeric characters + :return: ``str`` valid Pod name of appropriate length + """ + if not dag_id: + return None + + safe_uuid = uuid.uuid4().hex + safe_pod_id = dag_id[:MAX_POD_ID_LEN - len(safe_uuid) - 1] + safe_pod_id = safe_pod_id + "-" + safe_uuid + + return safe_pod_id + + @staticmethod def add_sidecar(pod): pod_cp = copy.deepcopy(pod) diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py index 47d8ed5..6fb5989 100644 --- a/airflow/kubernetes/pod_launcher.py +++ b/airflow/kubernetes/pod_launcher.py @@ -89,13 +89,16 @@ class PodLauncher(LoggingMixin): if e.status != 404: raise - def run_pod(self, pod, startup_timeout=120, get_logs=True): + def start_pod( + self, + pod, + startup_timeout): """ Launches the pod synchronously and waits for completion. - Args: - pod (Pod): - startup_timeout (int): Timeout for startup of the pod (if pod is pending for - too long, considers task a failure + + :param pod: + :param startup_timeout: Timeout for startup of the pod (if pod is pending for too long, fails task) + :return: """ resp = self.run_pod_async(pod) curr_time = dt.now() @@ -107,9 +110,13 @@ class PodLauncher(LoggingMixin): time.sleep(1) self.log.debug('Pod not yet started') - return self._monitor_pod(pod, get_logs) - - def _monitor_pod(self, pod, get_logs): + def monitor_pod(self, pod, get_logs): + """ + :param pod: pod spec that will be monitored + :type pod : V1Pod + :param get_logs: whether to read the logs locally + :return: Tuple[State, Optional[str]] + """ if get_logs: logs = self.read_pod_logs(pod) @@ -180,6 +187,23 @@ class PodLauncher(LoggingMixin): wait=tenacity.wait_exponential(), reraise=True ) + def read_pod_events(self, pod): + """Reads events from the POD""" + try: + return self._client.list_namespaced_event( + namespace=pod.metadata.namespace, + field_selector="involvedObject.name={}".format(pod.metadata.name) + ) + except BaseHTTPError as e: + raise AirflowException( + 'There was an error reading the kubernetes API: {}'.format(e) + ) + + @tenacity.retry( + stop=tenacity.stop_after_attempt(3), + wait=tenacity.wait_exponential(), + reraise=True + ) def read_pod(self, pod): """Read POD information""" try: diff --git a/chart/charts/postgresql-6.3.12.tgz b/chart/charts/postgresql-6.3.12.tgz new file mode 100644 index 0000000..51751d7 Binary files /dev/null and b/chart/charts/postgresql-6.3.12.tgz differ diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py index 814318f..ddd6dce 100644 --- a/kubernetes_tests/test_kubernetes_pod_operator.py +++ b/kubernetes_tests/test_kubernetes_pod_operator.py @@ -21,14 +21,12 @@ import os import shutil import unittest -from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator -from tests.compat import mock, patch - import kubernetes.client.models as k8s import pendulum from kubernetes.client.api_client import ApiClient from kubernetes.client.rest import ApiException +from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator from airflow.exceptions import AirflowException from airflow.kubernetes import kube_client from airflow.kubernetes.pod import Port @@ -38,9 +36,9 @@ from airflow.kubernetes.secret import Secret from airflow.kubernetes.volume import Volume from airflow.kubernetes.volume_mount import VolumeMount from airflow.models import DAG, TaskInstance - from airflow.utils import timezone from airflow.version import version as airflow_version +from tests.compat import mock, patch # noinspection DuplicatedCode @@ -74,11 +72,10 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): 'labels': { 'foo': 'bar', 'kubernetes_pod_operator': 'True', 'airflow_version': airflow_version.replace('+', '-'), - # 'execution_date': '2016-01-01T0100000100-a2f50a31f', - # 'dag_id': 'dag', - # 'task_id': 'task', - # 'try_number': '1' - }, + 'execution_date': '2016-01-01T0100000100-a2f50a31f', + 'dag_id': 'dag', + 'task_id': 'task', + 'try_number': '1'}, }, 'spec': { 'affinity': {}, @@ -113,6 +110,19 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): client = kube_client.get_kube_client(in_cluster=False) client.delete_collection_namespaced_pod(namespace="default") + def create_context(self, task): + dag = DAG(dag_id="dag") + tzinfo = pendulum.timezone("Europe/Amsterdam") + execution_date = timezone.datetime(2016, 1, 1, 1, 0, 0, tzinfo=tzinfo) + task_instance = TaskInstance(task=task, + execution_date=execution_date) + return { + "dag": dag, + "ts": execution_date.isoformat(), + "task": task, + "ti": task_instance, + } + def test_do_xcom_push_defaults_false(self): new_config_path = '/tmp/kube_config' old_config_path = os.path.expanduser('~/.kube/config') @@ -149,11 +159,98 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): do_xcom_push=False, config_file=new_config_path, ) - context = create_context(k) + context = self.create_context(k) k.execute(context) actual_pod = self.api_client.sanitize_for_serialization(k.pod) self.assertEqual(self.expected_pod, actual_pod) + @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod") + @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod") + @mock.patch("airflow.kubernetes.kube_client.get_kube_client") + def test_config_path(self, client_mock, monitor_mock, start_mock): # pylint: disable=unused-argument + from airflow.utils.state import State + + file_path = "/tmp/fake_file" + k = KubernetesPodOperator( + namespace='default', + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["echo 10"], + labels={"foo": "bar"}, + name="test", + task_id="task", + in_cluster=False, + do_xcom_push=False, + config_file=file_path, + cluster_context='default', + ) + monitor_mock.return_value = (State.SUCCESS, None) + client_mock.list_namespaced_pod.return_value = [] + context = self.create_context(k) + k.execute(context=context) + client_mock.assert_called_once_with( + in_cluster=False, + cluster_context='default', + config_file=file_path, + ) + + @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod") + @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod") + @mock.patch("airflow.kubernetes.kube_client.get_kube_client") + def test_image_pull_secrets_correctly_set(self, mock_client, monitor_mock, start_mock): + from airflow.utils.state import State + + fake_pull_secrets = "fakeSecret" + k = KubernetesPodOperator( + namespace='default', + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["echo 10"], + labels={"foo": "bar"}, + name="test", + task_id="task", + in_cluster=False, + do_xcom_push=False, + image_pull_secrets=fake_pull_secrets, + cluster_context='default', + ) + monitor_mock.return_value = (State.SUCCESS, None) + context = self.create_context(k) + k.execute(context=context) + self.assertEqual( + start_mock.call_args[0][0].spec.image_pull_secrets, + [k8s.V1LocalObjectReference(name=fake_pull_secrets)] + ) + + @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod") + @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod") + @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.delete_pod") + @mock.patch("airflow.kubernetes.kube_client.get_kube_client") + def test_pod_delete_even_on_launcher_error( + self, + mock_client, + delete_pod_mock, + monitor_pod_mock, + start_pod_mock): # pylint: disable=unused-argument + k = KubernetesPodOperator( + namespace='default', + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["echo 10"], + labels={"foo": "bar"}, + name="test", + task_id="task", + in_cluster=False, + do_xcom_push=False, + cluster_context='default', + is_delete_operator_pod=True, + ) + monitor_pod_mock.side_effect = AirflowException('fake failure') + with self.assertRaises(AirflowException): + context = self.create_context(k) + k.execute(context=context) + assert delete_pod_mock.called + def test_working_pod(self): k = KubernetesPodOperator( namespace='default', @@ -185,7 +282,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): do_xcom_push=False, is_delete_operator_pod=True, ) - context = create_context(k) + context = self.create_context(k) k.execute(context) actual_pod = self.api_client.sanitize_for_serialization(k.pod) self.assertEqual(self.expected_pod['spec'], actual_pod['spec']) @@ -204,7 +301,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): do_xcom_push=False, hostnetwork=True, ) - context = create_context(k) + context = self.create_context(k) k.execute(context) actual_pod = self.api_client.sanitize_for_serialization(k.pod) self.expected_pod['spec']['hostNetwork'] = True @@ -226,7 +323,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): hostnetwork=True, dnspolicy=dns_policy ) - context = create_context(k) + context = self.create_context(k) k.execute(context) actual_pod = self.api_client.sanitize_for_serialization(k.pod) self.expected_pod['spec']['hostNetwork'] = True @@ -234,6 +331,28 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): self.assertEqual(self.expected_pod['spec'], actual_pod['spec']) self.assertEqual(self.expected_pod['metadata']['labels'], actual_pod['metadata']['labels']) + def test_pod_schedulername(self): + scheduler_name = "default-scheduler" + k = KubernetesPodOperator( + namespace="default", + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["echo 10"], + labels={"foo": "bar"}, + name="test", + task_id="task", + in_cluster=False, + do_xcom_push=False, + schedulername=scheduler_name + ) + context = self.create_context(k) + k.execute(context) + actual_pod = self.api_client.sanitize_for_serialization(k.pod) + self.expected_pod['spec']['schedulerName'] = scheduler_name + self.assertEqual(self.expected_pod, actual_pod) + self.assertEqual(self.expected_pod['spec'], actual_pod['spec']) + self.assertEqual(self.expected_pod['metadata']['labels'], actual_pod['metadata']['labels']) + def test_pod_node_selectors(self): node_selectors = { 'beta.kubernetes.io/os': 'linux' @@ -275,7 +394,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): do_xcom_push=False, resources=resources, ) - context = create_context(k) + context = self.create_context(k) k.execute(context) actual_pod = self.api_client.sanitize_for_serialization(k.pod) self.expected_pod['spec']['containers'][0]['resources'] = { @@ -342,7 +461,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): do_xcom_push=False, ports=[port], ) - context = create_context(k) + context = self.create_context(k) k.execute(context=context) actual_pod = self.api_client.sanitize_for_serialization(k.pod) self.expected_pod['spec']['containers'][0]['ports'] = [{ @@ -564,9 +683,10 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): self.expected_pod['spec']['containers'].append(container) self.assertEqual(self.expected_pod, actual_pod) - @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod") - @patch("airflow.kubernetes.kube_client.get_kube_client") - def test_envs_from_configmaps(self, mock_client, mock_run): + @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod") + @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod") + @mock.patch("airflow.kubernetes.kube_client.get_kube_client") + def test_envs_from_configmaps(self, mock_client, mock_monitor, mock_start): # GIVEN from airflow.utils.state import State @@ -585,19 +705,20 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): configmaps=[configmap], ) # THEN - mock_run.return_value = (State.SUCCESS, None) - context = create_context(k) + mock_monitor.return_value = (State.SUCCESS, None) + context = self.create_context(k) k.execute(context) self.assertEqual( - mock_run.call_args[0][0].spec.containers[0].env_from, + mock_start.call_args[0][0].spec.containers[0].env_from, [k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource( name=configmap ))] ) - @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod") - @patch("airflow.kubernetes.kube_client.get_kube_client") - def test_envs_from_secrets(self, mock_client, mock_run): + @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod") + @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod") + @mock.patch("airflow.kubernetes.kube_client.get_kube_client") + def test_envs_from_secrets(self, mock_client, monitor_mock, start_mock): # GIVEN from airflow.utils.state import State secret_ref = 'secret_name' @@ -616,11 +737,11 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): do_xcom_push=False, ) # THEN - mock_run.return_value = (State.SUCCESS, None) - context = create_context(k) + monitor_mock.return_value = (State.SUCCESS, None) + context = self.create_context(k) k.execute(context) self.assertEqual( - mock_run.call_args[0][0].spec.containers[0].env_from, + start_mock.call_args[0][0].spec.containers[0].env_from, [k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource( name=secret_ref ))] @@ -704,12 +825,14 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): }] self.assertEqual(self.expected_pod, actual_pod) - @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod") + @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod") + @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod") @patch("airflow.kubernetes.kube_client.get_kube_client") def test_pod_priority_class_name( self, mock_client, - run_mock): # pylint: disable=unused-argument + monitor_mock, + start_mock): # pylint: disable=unused-argument """Test ability to assign priorityClassName to pod """ @@ -729,8 +852,8 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): priority_class_name=priority_class_name, ) - run_mock.return_value = (State.SUCCESS, None) - context = create_context(k) + monitor_mock.return_value = (State.SUCCESS, None) + context = self.create_context(k) k.execute(context) actual_pod = self.api_client.sanitize_for_serialization(k.pod) self.expected_pod['spec']['priorityClassName'] = priority_class_name diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index 77299f6..993c47a 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -33,6 +33,8 @@ try: from airflow.configuration import conf # noqa: F401 from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler from airflow.executors.kubernetes_executor import KubernetesExecutor + from airflow.kubernetes import pod_generator + from airflow.kubernetes.pod_generator import PodGenerator from airflow.utils.state import State except ImportError: AirflowKubernetesScheduler = None # type: ignore @@ -87,24 +89,26 @@ class TestAirflowKubernetesScheduler(unittest.TestCase): 'kubernetes python package is not installed') def test_create_pod_id(self): for dag_id, task_id in self._cases(): - pod_name = AirflowKubernetesScheduler._create_pod_id(dag_id, task_id) + pod_name = PodGenerator.make_unique_pod_id( + AirflowKubernetesScheduler._create_pod_id(dag_id, task_id) + ) self.assertTrue(self._is_valid_pod_id(pod_name)) def test_make_safe_label_value(self): for dag_id, task_id in self._cases(): - safe_dag_id = AirflowKubernetesScheduler._make_safe_label_value(dag_id) + safe_dag_id = pod_generator.make_safe_label_value(dag_id) self.assertTrue(self._is_safe_label_value(safe_dag_id)) - safe_task_id = AirflowKubernetesScheduler._make_safe_label_value(task_id) + safe_task_id = pod_generator.make_safe_label_value(task_id) self.assertTrue(self._is_safe_label_value(safe_task_id)) - id = "my_dag_id" + dag_id = "my_dag_id" self.assertEqual( - id, - AirflowKubernetesScheduler._make_safe_label_value(id) + dag_id, + pod_generator.make_safe_label_value(dag_id) ) - id = "my_dag_id_" + "a" * 64 + dag_id = "my_dag_id_" + "a" * 64 self.assertEqual( "my_dag_id_" + "a" * 43 + "-0ce114c45", - AirflowKubernetesScheduler._make_safe_label_value(id) + pod_generator.make_safe_label_value(dag_id) ) @unittest.skipIf(AirflowKubernetesScheduler is None, diff --git a/tests/kubernetes/models/test_pod.py b/tests/kubernetes/models/test_pod.py index edcd364..b63af6d 100644 --- a/tests/kubernetes/models/test_pod.py +++ b/tests/kubernetes/models/test_pod.py @@ -37,7 +37,9 @@ class TestPod(unittest.TestCase): @mock.patch('uuid.uuid4') def test_port_attach_to_pod(self, mock_uuid): - mock_uuid.return_value = '0' + import uuid + static_uuid = uuid.UUID('cf4a56d2-8101-4217-b027-2af6216feb48') + mock_uuid.return_value = static_uuid pod = PodGenerator(image='airflow-worker:latest', name='base').gen_pod() ports = [ Port('https', 443), @@ -49,7 +51,7 @@ class TestPod(unittest.TestCase): self.assertEqual({ 'apiVersion': 'v1', 'kind': 'Pod', - 'metadata': {'name': 'base-0'}, + 'metadata': {'name': 'base-' + static_uuid.hex}, 'spec': { 'containers': [{ 'args': [], diff --git a/tests/kubernetes/models/test_secret.py b/tests/kubernetes/models/test_secret.py index 843bd79..44ab8b3 100644 --- a/tests/kubernetes/models/test_secret.py +++ b/tests/kubernetes/models/test_secret.py @@ -15,6 +15,8 @@ # specific language governing permissions and limitations # under the License. import unittest +import uuid + from tests.compat import mock from kubernetes.client import ApiClient import kubernetes.client.models as k8s @@ -45,25 +47,27 @@ class TestSecret(unittest.TestCase): @mock.patch('uuid.uuid4') def test_to_volume_secret(self, mock_uuid): - mock_uuid.return_value = '0' + static_uuid = uuid.UUID('cf4a56d2-8101-4217-b027-2af6216feb48') + mock_uuid.return_value = static_uuid secret = Secret('volume', '/etc/foo', 'secret_b') self.assertEqual(secret.to_volume_secret(), ( k8s.V1Volume( - name='secretvol0', + name='secretvol' + str(static_uuid), secret=k8s.V1SecretVolumeSource( secret_name='secret_b' ) ), k8s.V1VolumeMount( mount_path='/etc/foo', - name='secretvol0', + name='secretvol' + str(static_uuid), read_only=True ) )) @mock.patch('uuid.uuid4') def test_attach_to_pod(self, mock_uuid): - mock_uuid.return_value = '0' + static_uuid = uuid.UUID('cf4a56d2-8101-4217-b027-2af6216feb48') + mock_uuid.return_value = static_uuid pod = PodGenerator(image='airflow-worker:latest', name='base').gen_pod() secrets = [ @@ -80,7 +84,7 @@ class TestSecret(unittest.TestCase): self.assertEqual(result, { 'apiVersion': 'v1', 'kind': 'Pod', - 'metadata': {'name': 'base-0'}, + 'metadata': {'name': 'base-' + static_uuid.hex}, 'spec': { 'containers': [{ 'args': [], @@ -101,14 +105,14 @@ class TestSecret(unittest.TestCase): 'ports': [], 'volumeMounts': [{ 'mountPath': '/etc/foo', - 'name': 'secretvol0', + 'name': 'secretvol' + str(static_uuid), 'readOnly': True}] }], 'hostNetwork': False, 'imagePullSecrets': [], 'restartPolicy': 'Never', 'volumes': [{ - 'name': 'secretvol0', + 'name': 'secretvol' + str(static_uuid), 'secret': {'secretName': 'secret_b'} }] } diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py index 4caf24b..30839e7 100644 --- a/tests/kubernetes/test_pod_generator.py +++ b/tests/kubernetes/test_pod_generator.py @@ -17,6 +17,7 @@ import unittest from tests.compat import mock +import uuid import kubernetes.client.models as k8s from kubernetes.client import ApiClient from airflow.kubernetes.secret import Secret @@ -42,11 +43,12 @@ class TestPodGenerator(unittest.TestCase): ] self.resources = Resources('1Gi', 1, '2Gi', 2, 1) self.k8s_client = ApiClient() + self.static_uuid = uuid.UUID('cf4a56d2-8101-4217-b027-2af6216feb48') self.expected = { 'apiVersion': 'v1', 'kind': 'Pod', 'metadata': { - 'name': 'myapp-pod-0', + 'name': 'myapp-pod-' + self.static_uuid.hex, 'labels': {'app': 'myapp'}, 'namespace': 'default' }, @@ -101,13 +103,13 @@ class TestPodGenerator(unittest.TestCase): 'ports': [{'name': 'foo', 'containerPort': 1234}], 'volumeMounts': [{ 'mountPath': '/etc/foo', - 'name': 'secretvol0', + 'name': 'secretvol' + str(self.static_uuid), 'readOnly': True }] }], 'restartPolicy': 'Never', 'volumes': [{ - 'name': 'secretvol0', + 'name': 'secretvol' + str(self.static_uuid), 'secret': { 'secretName': 'secret_b' } @@ -126,7 +128,7 @@ class TestPodGenerator(unittest.TestCase): @mock.patch('uuid.uuid4') def test_gen_pod(self, mock_uuid): - mock_uuid.return_value = '0' + mock_uuid.return_value = self.static_uuid pod_generator = PodGenerator( labels={'app': 'myapp'}, name='myapp-pod', @@ -155,7 +157,7 @@ class TestPodGenerator(unittest.TestCase): @mock.patch('uuid.uuid4') def test_gen_pod_extract_xcom(self, mock_uuid): - mock_uuid.return_value = '0' + mock_uuid.return_value = self.static_uuid pod_generator = PodGenerator( labels={'app': 'myapp'}, name='myapp-pod', @@ -201,7 +203,7 @@ class TestPodGenerator(unittest.TestCase): @mock.patch('uuid.uuid4') def test_from_obj(self, mock_uuid): - mock_uuid.return_value = '0' + mock_uuid.return_value = self.static_uuid result = PodGenerator.from_obj({ "KubernetesExecutor": { "annotations": {"test": "annotation"}, @@ -253,7 +255,7 @@ class TestPodGenerator(unittest.TestCase): def test_reconcile_pods(self): with mock.patch('uuid.uuid4') as mock_uuid: - mock_uuid.return_value = '0' + mock_uuid.return_value = self.static_uuid base_pod = PodGenerator( image='image1', name='name1', @@ -290,7 +292,7 @@ class TestPodGenerator(unittest.TestCase): self.assertEqual(result, { 'apiVersion': 'v1', 'kind': 'Pod', - 'metadata': {'name': 'name2-0'}, + 'metadata': {'name': 'name2-' + self.static_uuid.hex}, 'spec': { 'containers': [{ 'args': [], diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py index 0531dc0..f20ac2d 100644 --- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py @@ -46,9 +46,15 @@ class TestKubernetesPodOperator(unittest.TestCase): "ti": task_instance, } - @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod") + @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod") + @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod") @mock.patch("airflow.kubernetes.kube_client.get_kube_client") - def test_config_path(self, client_mock, run_mock): # pylint: disable=unused-argument + def test_config_path( + self, + mock_client, + start_pod_mock, + monitor_pod_mock + ): from airflow.utils.state import State file_path = "/tmp/fake_file" @@ -65,19 +71,25 @@ class TestKubernetesPodOperator(unittest.TestCase): config_file=file_path, cluster_context='default', ) - run_mock.return_value = (State.SUCCESS, None) - client_mock.list_namespaced_pod.return_value = [] + monitor_pod_mock.return_value = (State.SUCCESS, None) + mock_client.list_namespaced_pod.return_value = [] context = self.create_context(k) k.execute(context=context) - client_mock.assert_called_once_with( + mock_client.assert_called_once_with( in_cluster=False, cluster_context='default', config_file=file_path, ) - @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod") + @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod") + @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod") @mock.patch("airflow.kubernetes.kube_client.get_kube_client") - def test_image_pull_secrets_correctly_set(self, mock_client, run_mock): + def test_image_pull_secrets_correctly_set( + self, + mock_client, + start_pod_mock, + monitor_pod_mock + ): from airflow.utils.state import State fake_pull_secrets = "fakeSecret" @@ -94,22 +106,25 @@ class TestKubernetesPodOperator(unittest.TestCase): image_pull_secrets=fake_pull_secrets, cluster_context='default', ) - run_mock.return_value = (State.SUCCESS, None) + monitor_pod_mock.return_value = (State.SUCCESS, None) context = self.create_context(k) k.execute(context=context) self.assertEqual( - run_mock.call_args[0][0].spec.image_pull_secrets, + start_pod_mock.call_args[0][0].spec.image_pull_secrets, [k8s.V1LocalObjectReference(name=fake_pull_secrets)] ) - @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod") + @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod") + @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod") @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.delete_pod") @mock.patch("airflow.kubernetes.kube_client.get_kube_client") def test_pod_delete_even_on_launcher_error( - self, - mock_client, - delete_pod_mock, - run_pod_mock): + self, + mock_client, + delete_pod_mock, + start_pod_mock, + monitor_pod_mock + ): k = KubernetesPodOperator( namespace='default', image="ubuntu:16.04", @@ -123,7 +138,7 @@ class TestKubernetesPodOperator(unittest.TestCase): cluster_context='default', is_delete_operator_pod=True, ) - run_pod_mock.side_effect = AirflowException('fake failure') + start_pod_mock.side_effect = AirflowException('fake failure') with self.assertRaises(AirflowException): context = self.create_context(k) k.execute(context=context)
