This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new dd937e51fe Add `on_finish_action` to `KubernetesPodOperator` (#30718)
dd937e51fe is described below
commit dd937e51fe1ae3cd36a6993bd42e425960644e1d
Author: Hussein Awala <[email protected]>
AuthorDate: Fri Jun 30 10:01:13 2023 +0200
Add `on_finish_action` to `KubernetesPodOperator` (#30718)
* Add a new arg for KPO to only delete the pod when it doesn't fail
* deprecate is_delete_operator_pod and add on_finish_action
* Add deprecated properties and fix unit tests
* add missing attribute
* Apply suggestions from code review
Co-authored-by: Jed Cunningham <[email protected]>
* update GKEStartPodOperator to be consistent with KPO
* update EksPodOperator to be consistent with KPO
* update unit tests and the method used to check the kpo compatibility
* Fix a bug and add a new unit test for each provider
* warn with AirflowProviderDeprecationWarning instead of DeprecationWarning
* Bump KPO min version in GCP provider and add a new one to AWS provider
* Add the new param to the GKE trigger
* Apply suggestions from code review
Co-authored-by: Jarek Potiuk <[email protected]>
---------
Co-authored-by: Jed Cunningham <[email protected]>
Co-authored-by: Jarek Potiuk <[email protected]>
---
airflow/providers/amazon/aws/operators/eks.py | 34 ++++++---
airflow/providers/amazon/provider.yaml | 3 +
airflow/providers/cncf/kubernetes/operators/pod.py | 43 ++++++++---
airflow/providers/cncf/kubernetes/triggers/pod.py | 34 +++++++--
.../providers/cncf/kubernetes/utils/pod_manager.py | 9 +++
.../google/cloud/operators/kubernetes_engine.py | 38 +++++++---
.../google/cloud/triggers/kubernetes_engine.py | 34 +++++++--
airflow/providers/google/provider.yaml | 2 +-
tests/providers/amazon/aws/operators/test_eks.py | 58 ++++++++++++++-
.../cncf/kubernetes/operators/test_pod.py | 84 ++++++++++++++++++----
.../providers/cncf/kubernetes/triggers/test_pod.py | 74 +++++++++++++++++--
.../cloud/operators/test_kubernetes_engine.py | 61 ++++++++++++++++
.../cloud/triggers/test_kubernetes_engine.py | 4 +-
.../providers/amazon/aws/example_eks_templated.py | 2 +-
.../aws/example_eks_with_fargate_in_one_step.py | 2 +-
.../amazon/aws/example_eks_with_fargate_profile.py | 2 +-
.../aws/example_eks_with_nodegroup_in_one_step.py | 2 +-
.../cncf/kubernetes/example_kubernetes.py | 6 +-
.../cncf/kubernetes/example_kubernetes_async.py | 6 +-
.../kubernetes_engine/example_kubernetes_engine.py | 4 +-
.../example_kubernetes_engine_async.py | 4 +-
21 files changed, 430 insertions(+), 76 deletions(-)
diff --git a/airflow/providers/amazon/aws/operators/eks.py
b/airflow/providers/amazon/aws/operators/eks.py
index 9c27e88350..bea4223987 100644
--- a/airflow/providers/amazon/aws/operators/eks.py
+++ b/airflow/providers/amazon/aws/operators/eks.py
@@ -34,6 +34,7 @@ from airflow.providers.amazon.aws.triggers.eks import (
EksNodegroupTrigger,
)
from airflow.providers.amazon.aws.utils.waiter_with_logging import wait
+from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction
try:
from airflow.providers.cncf.kubernetes.operators.pod import
KubernetesPodOperator
@@ -854,10 +855,15 @@ class EksPodOperator(KubernetesPodOperator):
running Airflow in a distributed manner and aws_conn_id is None or
empty, then the default boto3 configuration would be used (and must be
maintained on each worker node).
+ :param on_finish_action: What to do when the pod reaches its final state,
or the execution is interrupted.
+ If "delete_pod", the pod will be deleted regardless of its state; if
"delete_succeeded_pod",
+ only succeeded pod will be deleted. You can set to "keep_pod" to keep
the pod.
+ Current default is `keep_pod`, but this will be changed in the next
major release of this provider.
:param is_delete_operator_pod: What to do when the pod reaches its final
state, or the execution is interrupted. If True, delete the
- pod; if False, leave the pod. Current default is False, but this will
be
+ pod; if False, leave the pod. Current default is False, but this will
be
changed in the next major release of this provider.
+ Deprecated - use `on_finish_action` instead.
"""
@@ -885,19 +891,32 @@ class EksPodOperator(KubernetesPodOperator):
pod_username: str | None = None,
aws_conn_id: str = DEFAULT_CONN_ID,
region: str | None = None,
+ on_finish_action: str | None = None,
is_delete_operator_pod: bool | None = None,
**kwargs,
) -> None:
- if is_delete_operator_pod is None:
+ if is_delete_operator_pod is not None:
warnings.warn(
- f"You have not set parameter `is_delete_operator_pod` in class
{self.__class__.__name__}. "
- "Currently the default for this parameter is `False` but in a
future release the default "
- "will be changed to `True`. To ensure pods are not deleted in
the future you will need to "
- "set `is_delete_operator_pod=False` explicitly.",
+ "`is_delete_operator_pod` parameter is deprecated, please use
`on_finish_action`",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
- is_delete_operator_pod = False
+ kwargs["on_finish_action"] = (
+ OnFinishAction.DELETE_POD if is_delete_operator_pod else
OnFinishAction.KEEP_POD
+ )
+ else:
+ if on_finish_action is not None:
+ kwargs["on_finish_action"] = OnFinishAction(on_finish_action)
+ else:
+ warnings.warn(
+ f"You have not set parameter `on_finish_action` in class
{self.__class__.__name__}. "
+ "Currently the default for this parameter is `keep_pod`
but in a future release"
+ " the default will be changed to `delete_pod`. To ensure
pods are not deleted in"
+ " the future you will need to set
`on_finish_action=keep_pod` explicitly.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=2,
+ )
+ kwargs["on_finish_action"] = OnFinishAction.KEEP_POD
self.cluster_name = cluster_name
self.in_cluster = in_cluster
@@ -909,7 +928,6 @@ class EksPodOperator(KubernetesPodOperator):
in_cluster=self.in_cluster,
namespace=self.namespace,
name=self.pod_name,
- is_delete_operator_pod=is_delete_operator_pod,
**kwargs,
)
# There is no need to manage the kube_config file, as it will be
generated automatically.
diff --git a/airflow/providers/amazon/provider.yaml
b/airflow/providers/amazon/provider.yaml
index 0c74094e81..a7dac9fad4 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -667,3 +667,6 @@ additional-extras:
- name: aiobotocore
dependencies:
- aiobotocore[boto3]>=2.2.0
+ - name: cncf.kubernetes
+ dependencies:
+ - apache-airflow-providers-cncf-kubernetes>=7.2.0
diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py
b/airflow/providers/cncf/kubernetes/operators/pod.py
index 87bfc41301..696611c6c2 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -23,6 +23,7 @@ import logging
import re
import secrets
import string
+import warnings
from collections.abc import Container
from contextlib import AbstractContextManager
from functools import cached_property
@@ -32,7 +33,7 @@ from kubernetes.client import CoreV1Api, models as k8s
from slugify import slugify
from urllib3.exceptions import HTTPError
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning, AirflowSkipException
from airflow.kubernetes import pod_generator
from airflow.kubernetes.pod_generator import PodGenerator
from airflow.kubernetes.secret import Secret
@@ -52,6 +53,7 @@ from airflow.providers.cncf.kubernetes.hooks.kubernetes
import KubernetesHook
from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger
from airflow.providers.cncf.kubernetes.utils import xcom_sidecar # type:
ignore[attr-defined]
from airflow.providers.cncf.kubernetes.utils.pod_manager import (
+ OnFinishAction,
PodLaunchFailedException,
PodManager,
PodOperatorHookProtocol,
@@ -188,9 +190,6 @@ class KubernetesPodOperator(BaseOperator):
If more than one secret is required, provide a
comma separated list: secret_a,secret_b
:param service_account_name: Name of the service account
- :param is_delete_operator_pod: What to do when the pod reaches its final
- state, or the execution is interrupted. If True (default), delete the
- pod; if False, leave the pod.
:param hostnetwork: If True enable host networking on the pod.
:param tolerations: A list of kubernetes tolerations.
:param security_context: security options the pod should run with
(PodSecurityContext).
@@ -226,6 +225,13 @@ class KubernetesPodOperator(BaseOperator):
:param deferrable: Run operator in the deferrable mode.
:param poll_interval: Polling period in seconds to check for the status.
Used only in deferrable mode.
:param log_pod_spec_on_failure: Log the pod's specification if a failure
occurs
+ :param on_finish_action: What to do when the pod reaches its final state,
or the execution is interrupted.
+ If "delete_pod", the pod will be deleted regardless of its state; if
"delete_succeeded_pod",
+ only succeeded pod will be deleted. You can set to "keep_pod" to keep
the pod.
+ :param is_delete_operator_pod: What to do when the pod reaches its final
+ state, or the execution is interrupted. If True (default), delete the
+ pod; if False, leave the pod.
+ Deprecated - use `on_finish_action` instead.
"""
# This field can be overloaded at the instance level via
base_container_name
@@ -279,7 +285,6 @@ class KubernetesPodOperator(BaseOperator):
node_selector: dict | None = None,
image_pull_secrets: list[k8s.V1LocalObjectReference] | None = None,
service_account_name: str | None = None,
- is_delete_operator_pod: bool = True,
hostnetwork: bool = False,
tolerations: list[k8s.V1Toleration] | None = None,
security_context: dict | None = None,
@@ -303,6 +308,8 @@ class KubernetesPodOperator(BaseOperator):
deferrable: bool = False,
poll_interval: float = 2,
log_pod_spec_on_failure: bool = True,
+ on_finish_action: str = "delete_pod",
+ is_delete_operator_pod: None | bool = None,
**kwargs,
) -> None:
# TODO: remove in provider 6.0.0 release. This is a mitigate step to
advise users to switch to the
@@ -350,7 +357,6 @@ class KubernetesPodOperator(BaseOperator):
self.config_file = config_file
self.image_pull_secrets =
convert_image_pull_secrets(image_pull_secrets) if image_pull_secrets else []
self.service_account_name = service_account_name
- self.is_delete_operator_pod = is_delete_operator_pod
self.hostnetwork = hostnetwork
self.tolerations = (
[convert_toleration(toleration) for toleration in tolerations] if
tolerations else []
@@ -384,6 +390,20 @@ class KubernetesPodOperator(BaseOperator):
self.poll_interval = poll_interval
self.remote_pod: k8s.V1Pod | None = None
self.log_pod_spec_on_failure = log_pod_spec_on_failure
+ if is_delete_operator_pod is not None:
+ warnings.warn(
+ "`is_delete_operator_pod` parameter is deprecated, please use
`on_finish_action`",
+ AirflowProviderDeprecationWarning,
+ stacklevel=2,
+ )
+ self.on_finish_action = (
+ OnFinishAction.DELETE_POD if is_delete_operator_pod else
OnFinishAction.KEEP_POD
+ )
+ self.is_delete_operator_pod = is_delete_operator_pod
+ else:
+ self.on_finish_action = OnFinishAction(on_finish_action)
+ self.is_delete_operator_pod = self.on_finish_action ==
OnFinishAction.DELETE_POD
+
self._config_dict: dict | None = None # TODO: remove it when removing
convert_config_file_to_dict
@cached_property
@@ -595,10 +615,10 @@ class KubernetesPodOperator(BaseOperator):
config_file=self.config_file,
in_cluster=self.in_cluster,
poll_interval=self.poll_interval,
- should_delete_pod=self.is_delete_operator_pod,
get_logs=self.get_logs,
startup_timeout=self.startup_timeout_seconds,
base_container_name=self.base_container_name,
+ on_finish_action=self.on_finish_action.value,
),
method_name="execute_complete",
)
@@ -669,7 +689,8 @@ class KubernetesPodOperator(BaseOperator):
def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
pod_phase = remote_pod.status.phase if hasattr(remote_pod, "status")
else None
- if pod_phase != PodPhase.SUCCEEDED or not self.is_delete_operator_pod:
+ # if the pod failed, or it succeeded but we don't want to delete it
+ if pod_phase != PodPhase.SUCCEEDED or self.on_finish_action ==
OnFinishAction.KEEP_POD:
self.patch_already_checked(remote_pod, reraise=False)
if pod_phase != PodPhase.SUCCEEDED:
@@ -722,7 +743,11 @@ class KubernetesPodOperator(BaseOperator):
def process_pod_deletion(self, pod: k8s.V1Pod, *, reraise=True):
with _optionally_suppress(reraise=reraise):
if pod is not None:
- if self.is_delete_operator_pod:
+ should_delete_pod = (self.on_finish_action ==
OnFinishAction.DELETE_POD) or (
+ self.on_finish_action ==
OnFinishAction.DELETE_SUCCEEDED_POD
+ and pod.status.phase == PodPhase.SUCCEEDED
+ )
+ if should_delete_pod:
self.log.info("Deleting pod: %s", pod.metadata.name)
self.pod_manager.delete_pod(pod)
else:
diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py
b/airflow/providers/cncf/kubernetes/triggers/pod.py
index 6cfb6c523a..6fdf763ece 100644
--- a/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -17,6 +17,7 @@
from __future__ import annotations
import asyncio
+import warnings
from asyncio import CancelledError
from datetime import datetime
from enum import Enum
@@ -25,8 +26,9 @@ from typing import Any, AsyncIterator
import pytz
from kubernetes_asyncio.client.models import V1Pod
+from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.cncf.kubernetes.hooks.kubernetes import
AsyncKubernetesHook
-from airflow.providers.cncf.kubernetes.utils.pod_manager import PodPhase
+from airflow.providers.cncf.kubernetes.utils.pod_manager import
OnFinishAction, PodPhase
from airflow.triggers.base import BaseTrigger, TriggerEvent
@@ -57,11 +59,15 @@ class KubernetesPodTrigger(BaseTrigger):
:param poll_interval: Polling period in seconds to check for the status.
:param trigger_start_time: time in Datetime format when the trigger was
started
:param in_cluster: run kubernetes client with in_cluster configuration.
+ :param get_logs: get the stdout of the container as logs of the tasks.
+ :param startup_timeout: timeout in seconds to start up the pod.
+ :param on_finish_action: What to do when the pod reaches its final state,
or the execution is interrupted.
+ If "delete_pod", the pod will be deleted regardless of its state; if
"delete_succeeded_pod",
+ only succeeded pod will be deleted. You can set to "keep_pod" to keep
the pod.
:param should_delete_pod: What to do when the pod reaches its final
state, or the execution is interrupted. If True (default), delete the
pod; if False, leave the pod.
- :param get_logs: get the stdout of the container as logs of the tasks.
- :param startup_timeout: timeout in seconds to start up the pod.
+ Deprecated - use `on_finish_action` instead.
"""
def __init__(
@@ -75,9 +81,10 @@ class KubernetesPodTrigger(BaseTrigger):
cluster_context: str | None = None,
config_file: str | None = None,
in_cluster: bool | None = None,
- should_delete_pod: bool = True,
get_logs: bool = True,
startup_timeout: int = 120,
+ on_finish_action: str = "delete_pod",
+ should_delete_pod: bool | None = None,
):
super().__init__()
self.pod_name = pod_name
@@ -89,10 +96,22 @@ class KubernetesPodTrigger(BaseTrigger):
self.cluster_context = cluster_context
self.config_file = config_file
self.in_cluster = in_cluster
- self.should_delete_pod = should_delete_pod
self.get_logs = get_logs
self.startup_timeout = startup_timeout
+ if should_delete_pod is not None:
+ warnings.warn(
+ "`should_delete_pod` parameter is deprecated, please use
`on_finish_action`",
+ AirflowProviderDeprecationWarning,
+ )
+ self.on_finish_action = (
+ OnFinishAction.DELETE_POD if should_delete_pod else
OnFinishAction.KEEP_POD
+ )
+ self.should_delete_pod = should_delete_pod
+ else:
+ self.on_finish_action = OnFinishAction(on_finish_action)
+ self.should_delete_pod = self.on_finish_action ==
OnFinishAction.DELETE_POD
+
self._hook: AsyncKubernetesHook | None = None
self._since_time = None
@@ -109,10 +128,11 @@ class KubernetesPodTrigger(BaseTrigger):
"cluster_context": self.cluster_context,
"config_file": self.config_file,
"in_cluster": self.in_cluster,
- "should_delete_pod": self.should_delete_pod,
"get_logs": self.get_logs,
"startup_timeout": self.startup_timeout,
"trigger_start_time": self.trigger_start_time,
+ "should_delete_pod": self.should_delete_pod,
+ "on_finish_action": self.on_finish_action.value,
},
)
@@ -191,7 +211,7 @@ class KubernetesPodTrigger(BaseTrigger):
name=self.pod_name,
namespace=self.pod_namespace,
)
- if self.should_delete_pod:
+ if self.on_finish_action == OnFinishAction.DELETE_POD:
self.log.info("Deleting pod...")
await self._get_async_hook().delete_pod(
name=self.pod_name,
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 3bacb95f4f..71b5e17127 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -17,6 +17,7 @@
"""Launches PODs."""
from __future__ import annotations
+import enum
import json
import logging
import math
@@ -585,3 +586,11 @@ class PodManager(LoggingMixin):
if res:
return res
return res
+
+
+class OnFinishAction(enum.Enum):
+ """Action to take when the pod finishes."""
+
+ KEEP_POD = "keep_pod"
+ DELETE_POD = "delete_pod"
+ DELETE_SUCCEEDED_POD = "delete_succeeded_pod"
diff --git a/airflow/providers/google/cloud/operators/kubernetes_engine.py
b/airflow/providers/google/cloud/operators/kubernetes_engine.py
index 55971f52b7..bf14828d87 100644
--- a/airflow/providers/google/cloud/operators/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/operators/kubernetes_engine.py
@@ -27,6 +27,7 @@ from google.cloud.container_v1.types import Cluster
from kubernetes.client.models import V1Pod
from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
+from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction
try:
from airflow.providers.cncf.kubernetes.operators.pod import
KubernetesPodOperator
@@ -45,7 +46,6 @@ from airflow.utils.timezone import utcnow
if TYPE_CHECKING:
from airflow.utils.context import Context
-
KUBE_CONFIG_ENV_VAR = "KUBECONFIG"
@@ -427,11 +427,16 @@ class GKEStartPodOperator(KubernetesPodOperator):
Service Account Token Creator IAM role to the directly preceding
identity, with first
account from the list granting this role to the originating account
(templated).
:param regional: The location param is region name.
+ :param deferrable: Run operator in the deferrable mode.
+ :param on_finish_action: What to do when the pod reaches its final state,
or the execution is interrupted.
+ If "delete_pod", the pod will be deleted regardless of its state; if
"delete_succeeded_pod",
+ only succeeded pod will be deleted. You can set to "keep_pod" to keep
the pod.
+ Current default is `keep_pod`, but this will be changed in the next
major release of this provider.
:param is_delete_operator_pod: What to do when the pod reaches its final
state, or the execution is interrupted. If True, delete the
pod; if False, leave the pod. Current default is False, but this will
be
changed in the next major release of this provider.
- :param deferrable: Run operator in the deferrable mode.
+ Deprecated - use `on_finish_action` instead.
"""
template_fields: Sequence[str] = tuple(
@@ -449,19 +454,32 @@ class GKEStartPodOperator(KubernetesPodOperator):
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
regional: bool | None = None,
+ on_finish_action: str | None = None,
is_delete_operator_pod: bool | None = None,
**kwargs,
) -> None:
- if is_delete_operator_pod is None:
+ if is_delete_operator_pod is not None:
warnings.warn(
- f"You have not set parameter `is_delete_operator_pod` in class
{self.__class__.__name__}. "
- "Currently the default for this parameter is `False` but in a
future release the default "
- "will be changed to `True`. To ensure pods are not deleted in
the future you will need to "
- "set `is_delete_operator_pod=False` explicitly.",
+ "`is_delete_operator_pod` parameter is deprecated, please use
`on_finish_action`",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
- is_delete_operator_pod = False
+ kwargs["on_finish_action"] = (
+ OnFinishAction.DELETE_POD if is_delete_operator_pod else
OnFinishAction.KEEP_POD
+ )
+ else:
+ if on_finish_action is not None:
+ kwargs["on_finish_action"] = OnFinishAction(on_finish_action)
+ else:
+ warnings.warn(
+ f"You have not set parameter `on_finish_action` in class
{self.__class__.__name__}. "
+ "Currently the default for this parameter is `keep_pod`
but in a future release"
+ " the default will be changed to `delete_pod`. To ensure
pods are not deleted in"
+ " the future you will need to set
`on_finish_action=keep_pod` explicitly.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=2,
+ )
+ kwargs["on_finish_action"] = OnFinishAction.KEEP_POD
if regional is not None:
warnings.warn(
@@ -472,7 +490,7 @@ class GKEStartPodOperator(KubernetesPodOperator):
stacklevel=2,
)
- super().__init__(is_delete_operator_pod=is_delete_operator_pod,
**kwargs)
+ super().__init__(**kwargs)
self.project_id = project_id
self.location = location
self.cluster_name = cluster_name
@@ -560,8 +578,8 @@ class GKEStartPodOperator(KubernetesPodOperator):
cluster_context=self.cluster_context,
poll_interval=self.poll_interval,
in_cluster=self.in_cluster,
- should_delete_pod=self.is_delete_operator_pod,
base_container_name=self.base_container_name,
+ on_finish_action=self.on_finish_action,
),
method_name="execute_complete",
kwargs={"cluster_url": self._cluster_url, "ssl_ca_cert":
self._ssl_ca_cert},
diff --git a/airflow/providers/google/cloud/triggers/kubernetes_engine.py
b/airflow/providers/google/cloud/triggers/kubernetes_engine.py
index 1ec0e420f9..ba0df0fc15 100644
--- a/airflow/providers/google/cloud/triggers/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/triggers/kubernetes_engine.py
@@ -18,11 +18,15 @@
from __future__ import annotations
import asyncio
+import warnings
from datetime import datetime
from typing import Any, AsyncIterator, Sequence
from google.cloud.container_v1.types import Operation
+from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction
+
try:
from airflow.providers.cncf.kubernetes.triggers.pod import
KubernetesPodTrigger
except ImportError:
@@ -44,15 +48,19 @@ class GKEStartPodTrigger(KubernetesPodTrigger):
:param poll_interval: Polling period in seconds to check for the status.
:param trigger_start_time: time in Datetime format when the trigger was
started
:param in_cluster: run kubernetes client with in_cluster configuration.
- :param should_delete_pod: What to do when the pod reaches its final
- state, or the execution is interrupted. If True (default), delete the
- pod; if False, leave the pod.
:param get_logs: get the stdout of the container as logs of the tasks.
:param startup_timeout: timeout in seconds to start up the pod.
:param base_container_name: The name of the base container in the pod.
This container's logs
will appear as part of this task's logs if get_logs is True. Defaults
to None. If None,
will consult the class variable BASE_CONTAINER_NAME (which defaults to
"base") for the base
container name to use.
+ :param on_finish_action: What to do when the pod reaches its final state,
or the execution is interrupted.
+ If "delete_pod", the pod will be deleted regardless of its state; if
"delete_succeeded_pod",
+ only succeeded pod will be deleted. You can set to "keep_pod" to keep
the pod.
+ :param should_delete_pod: What to do when the pod reaches its final
+ state, or the execution is interrupted. If True (default), delete the
+ pod; if False, leave the pod.
+ Deprecated - use `on_finish_action` instead.
"""
def __init__(
@@ -66,9 +74,10 @@ class GKEStartPodTrigger(KubernetesPodTrigger):
cluster_context: str | None = None,
poll_interval: float = 2,
in_cluster: bool | None = None,
- should_delete_pod: bool = True,
get_logs: bool = True,
startup_timeout: int = 120,
+ on_finish_action: str = "delete_pod",
+ should_delete_pod: bool | None = None,
*args,
**kwargs,
):
@@ -87,10 +96,22 @@ class GKEStartPodTrigger(KubernetesPodTrigger):
self.poll_interval = poll_interval
self.cluster_context = cluster_context
self.in_cluster = in_cluster
- self.should_delete_pod = should_delete_pod
self.get_logs = get_logs
self.startup_timeout = startup_timeout
+ if should_delete_pod is not None:
+ warnings.warn(
+ "`should_delete_pod` parameter is deprecated, please use
`on_finish_action`",
+ AirflowProviderDeprecationWarning,
+ )
+ self.on_finish_action = (
+ OnFinishAction.DELETE_POD if should_delete_pod else
OnFinishAction.KEEP_POD
+ )
+ self.should_delete_pod = should_delete_pod
+ else:
+ self.on_finish_action = OnFinishAction(on_finish_action)
+ self.should_delete_pod = self.on_finish_action ==
OnFinishAction.DELETE_POD
+
self._cluster_url = cluster_url
self._ssl_ca_cert = ssl_ca_cert
@@ -105,11 +126,12 @@ class GKEStartPodTrigger(KubernetesPodTrigger):
"poll_interval": self.poll_interval,
"cluster_context": self.cluster_context,
"in_cluster": self.in_cluster,
- "should_delete_pod": self.should_delete_pod,
"get_logs": self.get_logs,
"startup_timeout": self.startup_timeout,
"trigger_start_time": self.trigger_start_time,
"base_container_name": self.base_container_name,
+ "should_delete_pod": self.should_delete_pod,
+ "on_finish_action": self.on_finish_action.value,
},
)
diff --git a/airflow/providers/google/provider.yaml
b/airflow/providers/google/provider.yaml
index 3fc2c90b9b..f22ff42082 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -1130,7 +1130,7 @@ additional-extras:
- apache-beam[gcp]
- name: cncf.kubernetes
dependencies:
- - apache-airflow-providers-cncf-kubernetes>=6.2.0
+ - apache-airflow-providers-cncf-kubernetes>=7.2.0
- name: leveldb
dependencies:
- plyvel
diff --git a/tests/providers/amazon/aws/operators/test_eks.py
b/tests/providers/amazon/aws/operators/test_eks.py
index 5534f635d8..9ea8ab72d7 100644
--- a/tests/providers/amazon/aws/operators/test_eks.py
+++ b/tests/providers/amazon/aws/operators/test_eks.py
@@ -39,6 +39,7 @@ from airflow.providers.amazon.aws.triggers.eks import (
EksDeleteFargateProfileTrigger,
EksNodegroupTrigger,
)
+from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction
from airflow.typing_compat import TypedDict
from tests.providers.amazon.aws.utils.eks_test_constants import (
NODEROLE_ARN,
@@ -641,7 +642,7 @@ class TestEksPodOperator:
labels={"demo": "hello_world"},
get_logs=True,
# Delete the pod when it reaches its final state, or the execution
is interrupted.
- is_delete_operator_pod=True,
+ on_finish_action="delete_pod",
)
op_return_value = op.execute(ti_context)
mock_k8s_pod_operator_execute.assert_called_once_with(ti_context)
@@ -651,3 +652,58 @@ class TestEksPodOperator:
)
assert mock_k8s_pod_operator_execute.return_value == op_return_value
assert mock_generate_config_file.return_value.__enter__.return_value
== op.config_file
+
+ @pytest.mark.parametrize(
+ "compatible_kpo, kwargs, expected_attributes",
+ [
+ (
+ True,
+ {"on_finish_action": "delete_succeeded_pod"},
+ {"on_finish_action": OnFinishAction.DELETE_SUCCEEDED_POD},
+ ),
+ (
+ # test that the deprecated param takes priority over on_finish_action
+ True,
+ {"on_finish_action": "keep_pod", "is_delete_operator_pod":
True},
+ {"on_finish_action": OnFinishAction.DELETE_POD,
"is_delete_operator_pod": True},
+ ),
+ (
+ # test default
+ True,
+ {},
+ {"on_finish_action": OnFinishAction.KEEP_POD,
"is_delete_operator_pod": False},
+ ),
+ (
+ False,
+ {"is_delete_operator_pod": True},
+ {"is_delete_operator_pod": True},
+ ),
+ (
+ False,
+ {"is_delete_operator_pod": False},
+ {"is_delete_operator_pod": False},
+ ),
+ (
+ # test default
+ False,
+ {},
+ {"is_delete_operator_pod": False},
+ ),
+ ],
+ )
+ def test_on_finish_action_handler(self, compatible_kpo, kwargs,
expected_attributes):
+ kpo_init_args_mock = mock.MagicMock(**{"parameters":
["on_finish_action"] if compatible_kpo else []})
+
+ with mock.patch("inspect.signature", return_value=kpo_init_args_mock):
+ op = EksPodOperator(
+ task_id="run_pod",
+ pod_name="run_pod",
+ cluster_name=CLUSTER_NAME,
+ image="amazon/aws-cli:latest",
+ cmds=["sh", "-c", "ls"],
+ labels={"demo": "hello_world"},
+ get_logs=True,
+ **kwargs,
+ )
+ for expected_attr in expected_attributes:
+ assert op.__getattribute__(expected_attr) ==
expected_attributes[expected_attr]
diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py
b/tests/providers/cncf/kubernetes/operators/test_pod.py
index 40feb30789..1ce25190ba 100644
--- a/tests/providers/cncf/kubernetes/operators/test_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_pod.py
@@ -589,9 +589,22 @@ class TestKubernetesPodOperator:
pod = k.build_pod_request_obj(create_context(k))
assert pod.spec.containers[0].image_pull_policy == "Always"
+ @pytest.mark.parametrize(
+ "task_kwargs, should_be_deleted",
+ [
+ ({}, True), # default values
+ ({"is_delete_operator_pod": True}, True), # check b/c of
is_delete_operator_pod
+ ({"is_delete_operator_pod": False}, False), # check b/c of
is_delete_operator_pod
+ ({"on_finish_action": "delete_pod"}, True),
+ ({"on_finish_action": "delete_succeeded_pod"}, False),
+ ({"on_finish_action": "keep_pod"}, False),
+ ],
+ )
@patch(f"{POD_MANAGER_CLASS}.delete_pod")
@patch(f"{KPO_MODULE}.KubernetesPodOperator.find_pod")
- def test_pod_delete_after_await_container_error(self, find_pod_mock,
delete_pod_mock):
+ def test_pod_delete_after_await_container_error(
+ self, find_pod_mock, delete_pod_mock, task_kwargs, should_be_deleted
+ ):
"""
When KPO fails unexpectedly during await_container, we should still
try to delete the pod,
and the pod we try to delete should be the one returned from find_pod
earlier.
@@ -600,13 +613,16 @@ class TestKubernetesPodOperator:
cont_status.name = "base"
cont_status.state.terminated.message = "my-failure"
find_pod_mock.return_value.status.container_statuses = [cont_status]
- k = KubernetesPodOperator(task_id="task")
+ k = KubernetesPodOperator(task_id="task", **task_kwargs)
self.await_pod_mock.side_effect = AirflowException("fake failure")
with pytest.raises(AirflowException, match="my-failure"):
context = create_context(k)
context["ti"].xcom_push = MagicMock()
k.execute(context=context)
- delete_pod_mock.assert_called_with(find_pod_mock.return_value)
+ if should_be_deleted:
+ delete_pod_mock.assert_called_with(find_pod_mock.return_value)
+ else:
+ delete_pod_mock.assert_not_called()
@pytest.mark.parametrize("should_fail", [True, False])
@patch(f"{POD_MANAGER_CLASS}.delete_pod")
@@ -618,7 +634,7 @@ class TestKubernetesPodOperator:
"""
k = KubernetesPodOperator(
task_id="task",
- is_delete_operator_pod=True,
+ on_finish_action="delete_pod",
)
if should_fail:
@@ -1019,7 +1035,7 @@ class TestKubernetesPodOperator:
"""If we aren't deleting pods and have an exception, mark it so we
don't reattach to it"""
k = KubernetesPodOperator(
task_id="task",
- is_delete_operator_pod=False,
+ on_finish_action="keep_pod",
)
self.await_pod_mock.side_effect = AirflowException("oops")
context = create_context(k)
@@ -1045,16 +1061,50 @@ class TestKubernetesPodOperator:
else:
mock_await.assert_not_called()
- @pytest.mark.parametrize("should_fail", [True, False])
+ @pytest.mark.parametrize(
+ "task_kwargs, should_fail, should_be_deleted",
+ [
+ ({}, False, True),
+ ({}, True, True),
+ (
+ {"is_delete_operator_pod": True, "on_finish_action":
"keep_pod"},
+ False,
+ True,
+ ), # check backcompat of is_delete_operator_pod
+ (
+ {"is_delete_operator_pod": True, "on_finish_action":
"keep_pod"},
+ True,
+ True,
+ ), # check b/c of is_delete_operator_pod
+ (
+ {"is_delete_operator_pod": False, "on_finish_action":
"delete_pod"},
+ False,
+ False,
+ ), # check b/c of is_delete_operator_pod
+ (
+ {"is_delete_operator_pod": False, "on_finish_action":
"delete_pod"},
+ True,
+ False,
+ ), # check b/c of is_delete_operator_pod
+ ({"on_finish_action": "keep_pod"}, False, False),
+ ({"on_finish_action": "keep_pod"}, True, False),
+ ({"on_finish_action": "delete_pod"}, False, True),
+ ({"on_finish_action": "delete_pod"}, True, True),
+ ({"on_finish_action": "delete_succeeded_pod"}, False, True),
+ ({"on_finish_action": "delete_succeeded_pod"}, True, False),
+ ],
+ )
@patch(f"{POD_MANAGER_CLASS}.delete_pod")
@patch(f"{KPO_MODULE}.KubernetesPodOperator.patch_already_checked")
- def test_mark_checked_if_not_deleted(self, mock_patch_already_checked, mock_delete_pod, should_fail):
+ def test_mark_checked_if_not_deleted(
+ self, mock_patch_already_checked, mock_delete_pod, task_kwargs, should_fail, should_be_deleted
+ ):
"""If we aren't deleting pods mark "checked" if the task completes
(successful or otherwise)"""
dag = DAG("hello2", start_date=pendulum.now())
k = KubernetesPodOperator(
task_id="task",
- is_delete_operator_pod=False,
dag=dag,
+ **task_kwargs,
)
remote_pod_mock = MagicMock()
remote_pod_mock.status.phase = "Failed" if should_fail else "Succeeded"
@@ -1065,8 +1115,14 @@ class TestKubernetesPodOperator:
k.execute(context=context)
else:
k.execute(context=context)
- mock_patch_already_checked.assert_called_once()
- mock_delete_pod.assert_not_called()
+ if should_fail or not should_be_deleted:
+ mock_patch_already_checked.assert_called_once()
+ else:
+ mock_patch_already_checked.assert_not_called()
+ if should_be_deleted:
+ mock_delete_pod.assert_called_once()
+ else:
+ mock_delete_pod.assert_not_called()
@patch(HOOK_CLASS, new=MagicMock)
def test_patch_already_checked(self):
@@ -1141,7 +1197,7 @@ class TestKubernetesPodOperator:
):
"""Tests that an AirflowSkipException is raised when the container
exits with the skip_on_exit_code"""
k = KubernetesPodOperator(
- task_id="task", is_delete_operator_pod=True, **(extra_kwargs if extra_kwargs else {})
+ task_id="task", on_finish_action="delete_pod", **(extra_kwargs if extra_kwargs else {})
)
base_container = MagicMock()
@@ -1282,7 +1338,7 @@ class TestKubernetesPodOperatorAsync:
arguments=TEST_ARGS,
labels=TEST_LABELS,
name=TEST_NAME,
- is_delete_operator_pod=False,
+ on_finish_action="keep_pod",
in_cluster=True,
get_logs=True,
deferrable=True,
@@ -1306,7 +1362,7 @@ class TestKubernetesPodOperatorAsync:
arguments=TEST_ARGS,
labels=TEST_LABELS,
name=TEST_NAME,
- is_delete_operator_pod=False,
+ on_finish_action="keep_pod",
in_cluster=True,
get_logs=True,
deferrable=True,
@@ -1353,7 +1409,7 @@ class TestKubernetesPodOperatorAsync:
arguments=TEST_ARGS,
labels=TEST_LABELS,
name=TEST_NAME,
- is_delete_operator_pod=False,
+ on_finish_action="keep_pod",
in_cluster=True,
get_logs=True,
deferrable=True,
diff --git a/tests/providers/cncf/kubernetes/triggers/test_pod.py b/tests/providers/cncf/kubernetes/triggers/test_pod.py
index 6d5d18d028..4ed731b425 100644
--- a/tests/providers/cncf/kubernetes/triggers/test_pod.py
+++ b/tests/providers/cncf/kubernetes/triggers/test_pod.py
@@ -39,12 +39,12 @@ POLL_INTERVAL = 2
CLUSTER_CONTEXT = "test-context"
CONFIG_FILE = "/path/to/config/file"
IN_CLUSTER = False
-SHOULD_DELETE_POD = True
GET_LOGS = True
STARTUP_TIMEOUT_SECS = 120
TRIGGER_START_TIME = datetime.now(tz=pytz.UTC)
FAILED_RESULT_MSG = "Test message that appears when trigger have failed event."
BASE_CONTAINER_NAME = "base"
+ON_FINISH_ACTION = "delete_pod"
@pytest.fixture
@@ -58,10 +58,10 @@ def trigger():
cluster_context=CLUSTER_CONTEXT,
config_file=CONFIG_FILE,
in_cluster=IN_CLUSTER,
- should_delete_pod=SHOULD_DELETE_POD,
get_logs=GET_LOGS,
startup_timeout=STARTUP_TIMEOUT_SECS,
trigger_start_time=TRIGGER_START_TIME,
+ on_finish_action=ON_FINISH_ACTION,
)
@@ -85,10 +85,11 @@ class TestKubernetesPodTrigger:
"cluster_context": CLUSTER_CONTEXT,
"config_file": CONFIG_FILE,
"in_cluster": IN_CLUSTER,
- "should_delete_pod": SHOULD_DELETE_POD,
"get_logs": GET_LOGS,
"startup_timeout": STARTUP_TIMEOUT_SECS,
"trigger_start_time": TRIGGER_START_TIME,
+ "on_finish_action": ON_FINISH_ACTION,
+ "should_delete_pod": ON_FINISH_ACTION == "delete_pod",
}
@pytest.mark.asyncio
@@ -237,8 +238,10 @@ class TestKubernetesPodTrigger:
@pytest.mark.asyncio
@mock.patch(f"{TRIGGER_PATH}._get_async_hook")
- async def test_logging_in_trigger_when_cancelled_should_execute_successfully(
- self, mock_hook, trigger, caplog
+ async def test_logging_in_trigger_when_cancelled_should_execute_successfully_and_delete_pod(
+ self,
+ mock_hook,
+ caplog,
):
"""
Test that KubernetesPodTrigger fires the correct event in case if the
task was cancelled.
@@ -248,6 +251,21 @@ class TestKubernetesPodTrigger:
mock_hook.return_value.read_logs.return_value = self._mock_pod_result(mock.MagicMock())
mock_hook.return_value.delete_pod.return_value = self._mock_pod_result(mock.MagicMock())
+ trigger = KubernetesPodTrigger(
+ pod_name=POD_NAME,
+ pod_namespace=NAMESPACE,
+ base_container_name=BASE_CONTAINER_NAME,
+ kubernetes_conn_id=CONN_ID,
+ poll_interval=POLL_INTERVAL,
+ cluster_context=CLUSTER_CONTEXT,
+ config_file=CONFIG_FILE,
+ in_cluster=IN_CLUSTER,
+ get_logs=GET_LOGS,
+ startup_timeout=STARTUP_TIMEOUT_SECS,
+ trigger_start_time=TRIGGER_START_TIME,
+ on_finish_action="delete_pod",
+ )
+
generator = trigger.run()
actual = await generator.asend(None)
assert (
@@ -264,6 +282,52 @@ class TestKubernetesPodTrigger:
assert "Outputting container logs..." in caplog.text
assert "Deleting pod..." in caplog.text
+ @pytest.mark.asyncio
+ @mock.patch(f"{TRIGGER_PATH}._get_async_hook")
+ async def test_logging_in_trigger_when_cancelled_should_execute_successfully_without_delete_pod(
+ self,
+ mock_hook,
+ caplog,
+ ):
+ """
+ Test that KubernetesPodTrigger fires the correct event if the task was cancelled.
+ """
+
+ mock_hook.return_value.get_pod.side_effect = CancelledError()
+ mock_hook.return_value.read_logs.return_value = self._mock_pod_result(mock.MagicMock())
+ mock_hook.return_value.delete_pod.return_value = self._mock_pod_result(mock.MagicMock())
+
+ trigger = KubernetesPodTrigger(
+ pod_name=POD_NAME,
+ pod_namespace=NAMESPACE,
+ base_container_name=BASE_CONTAINER_NAME,
+ kubernetes_conn_id=CONN_ID,
+ poll_interval=POLL_INTERVAL,
+ cluster_context=CLUSTER_CONTEXT,
+ config_file=CONFIG_FILE,
+ in_cluster=IN_CLUSTER,
+ get_logs=GET_LOGS,
+ startup_timeout=STARTUP_TIMEOUT_SECS,
+ trigger_start_time=TRIGGER_START_TIME,
+ on_finish_action="delete_succeeded_pod",
+ )
+
+ generator = trigger.run()
+ actual = await generator.asend(None)
+ assert (
+ TriggerEvent(
+ {
+ "name": POD_NAME,
+ "namespace": NAMESPACE,
+ "status": "cancelled",
+ "message": "Pod execution was cancelled",
+ }
+ )
+ == actual
+ )
+ assert "Outputting container logs..." in caplog.text
+ assert "Deleting pod..." not in caplog.text
+
@pytest.mark.parametrize(
"container_state, expected_state",
[
diff --git a/tests/providers/google/cloud/operators/test_kubernetes_engine.py b/tests/providers/google/cloud/operators/test_kubernetes_engine.py
index 5d6acb196a..07f0c34c7a 100644
--- a/tests/providers/google/cloud/operators/test_kubernetes_engine.py
+++ b/tests/providers/google/cloud/operators/test_kubernetes_engine.py
@@ -26,6 +26,7 @@ import pytest
from airflow.exceptions import AirflowException, TaskDeferred
from airflow.models import Connection
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
+from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction
from airflow.providers.google.cloud.operators.kubernetes_engine import (
GKECreateClusterOperator,
GKEDeleteClusterOperator,
@@ -319,6 +320,66 @@ class TestGKEPodOperator:
assert cluster_url == CLUSTER_PRIVATE_URL if use_internal_ip else CLUSTER_URL
assert ssl_ca_cert == SSL_CA_CERT
+ @pytest.mark.parametrize(
+ "compatible_kpo, kwargs, expected_attributes",
+ [
+ (
+ True,
+ {"on_finish_action": "delete_succeeded_pod"},
+ {"on_finish_action": OnFinishAction.DELETE_SUCCEEDED_POD},
+ ),
+ (
+ # test that priority for deprecated param
+ True,
+ {"on_finish_action": "keep_pod", "is_delete_operator_pod": True},
+ {"on_finish_action": OnFinishAction.DELETE_POD, "is_delete_operator_pod": True},
+ ),
+ (
+ # test default
+ True,
+ {},
+ {"on_finish_action": OnFinishAction.KEEP_POD, "is_delete_operator_pod": False},
+ ),
+ (
+ False,
+ {"is_delete_operator_pod": True},
+ {"is_delete_operator_pod": True},
+ ),
+ (
+ False,
+ {"is_delete_operator_pod": False},
+ {"is_delete_operator_pod": False},
+ ),
+ (
+ # test default
+ False,
+ {},
+ {"is_delete_operator_pod": False},
+ ),
+ ],
+ )
+ def test_on_finish_action_handler(
+ self,
+ compatible_kpo,
+ kwargs,
+ expected_attributes,
+ ):
+ kpo_init_args_mock = mock.MagicMock(**{"parameters": ["on_finish_action"] if compatible_kpo else []})
+
+ with mock.patch("inspect.signature", return_value=kpo_init_args_mock):
+ op = GKEStartPodOperator(
+ project_id=TEST_GCP_PROJECT_ID,
+ location=PROJECT_LOCATION,
+ cluster_name=CLUSTER_NAME,
+ task_id=PROJECT_TASK_ID,
+ name=TASK_NAME,
+ namespace=NAMESPACE,
+ image=IMAGE,
+ **kwargs,
+ )
+ for expected_attr in expected_attributes:
+ assert op.__getattribute__(expected_attr) == expected_attributes[expected_attr]
+
class TestGKEPodOperatorAsync:
def setup_method(self):
diff --git a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
index e957767e3e..154908a6c4 100644
--- a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
+++ b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
@@ -48,6 +48,7 @@ CLUSTER_URL = "https://test-host"
SSL_CA_CERT = "TEST_SSL_CA_CERT_CONTENT"
FAILED_RESULT_MSG = "Test message that appears when trigger have failed event."
BASE_CONTAINER_NAME = "base"
+ON_FINISH_ACTION = "delete_pod"
OPERATION_NAME = "test-operation-name"
PROJECT_ID = "test-project-id"
@@ -93,13 +94,14 @@ class TestGKEStartPodTrigger:
"poll_interval": POLL_INTERVAL,
"cluster_context": CLUSTER_CONTEXT,
"in_cluster": IN_CLUSTER,
- "should_delete_pod": SHOULD_DELETE_POD,
"get_logs": GET_LOGS,
"startup_timeout": STARTUP_TIMEOUT_SECS,
"trigger_start_time": TRIGGER_START_TIME,
"cluster_url": CLUSTER_URL,
"ssl_ca_cert": SSL_CA_CERT,
"base_container_name": BASE_CONTAINER_NAME,
+ "on_finish_action": ON_FINISH_ACTION,
+ "should_delete_pod": SHOULD_DELETE_POD,
}
@pytest.mark.asyncio
diff --git a/tests/system/providers/amazon/aws/example_eks_templated.py b/tests/system/providers/amazon/aws/example_eks_templated.py
index d09eabf959..e51783b53e 100644
--- a/tests/system/providers/amazon/aws/example_eks_templated.py
+++ b/tests/system/providers/amazon/aws/example_eks_templated.py
@@ -107,7 +107,7 @@ with DAG(
labels={"demo": "hello_world"},
get_logs=True,
# Delete the pod when it reaches its final state, or the execution is interrupted.
- is_delete_operator_pod=True,
+ on_finish_action="delete_pod",
)
delete_nodegroup = EksDeleteNodegroupOperator(
diff --git a/tests/system/providers/amazon/aws/example_eks_with_fargate_in_one_step.py b/tests/system/providers/amazon/aws/example_eks_with_fargate_in_one_step.py
index d14e7a6885..ae67a26588 100644
--- a/tests/system/providers/amazon/aws/example_eks_with_fargate_in_one_step.py
+++ b/tests/system/providers/amazon/aws/example_eks_with_fargate_in_one_step.py
@@ -101,7 +101,7 @@ with DAG(
get_logs=True,
startup_timeout_seconds=600,
# Keep the pod alive, so we can describe it in case of trouble. It's deleted with the cluster anyway.
- is_delete_operator_pod=False,
+ on_finish_action="keep_pod",
)
describe_pod = get_describe_pod_operator(
diff --git a/tests/system/providers/amazon/aws/example_eks_with_fargate_profile.py b/tests/system/providers/amazon/aws/example_eks_with_fargate_profile.py
index 0c19fda2d5..9e19c7594f 100644
--- a/tests/system/providers/amazon/aws/example_eks_with_fargate_profile.py
+++ b/tests/system/providers/amazon/aws/example_eks_with_fargate_profile.py
@@ -119,7 +119,7 @@ with DAG(
labels={"demo": "hello_world"},
get_logs=True,
# Keep the pod alive, so we can describe it in case of trouble. It's deleted with the cluster anyway.
- is_delete_operator_pod=False,
+ on_finish_action="keep_pod",
startup_timeout_seconds=200,
)
diff --git a/tests/system/providers/amazon/aws/example_eks_with_nodegroup_in_one_step.py b/tests/system/providers/amazon/aws/example_eks_with_nodegroup_in_one_step.py
index 76aa01bf5c..9bfce2cc16 100644
--- a/tests/system/providers/amazon/aws/example_eks_with_nodegroup_in_one_step.py
+++ b/tests/system/providers/amazon/aws/example_eks_with_nodegroup_in_one_step.py
@@ -111,7 +111,7 @@ with DAG(
labels={"demo": "hello_world"},
get_logs=True,
# Keep the pod alive, so we can describe it in case of trouble. It's deleted with the cluster anyway.
- is_delete_operator_pod=False,
+ on_finish_action="keep_pod",
)
describe_pod = get_describe_pod_operator(
diff --git a/tests/system/providers/cncf/kubernetes/example_kubernetes.py b/tests/system/providers/cncf/kubernetes/example_kubernetes.py
index 2f1f791d78..74078541bd 100644
--- a/tests/system/providers/cncf/kubernetes/example_kubernetes.py
+++ b/tests/system/providers/cncf/kubernetes/example_kubernetes.py
@@ -122,7 +122,7 @@ with DAG(
name="airflow-test-pod",
task_id="task",
affinity=affinity,
- is_delete_operator_pod=True,
+ on_finish_action="delete_pod",
hostnetwork=False,
tolerations=tolerations,
init_containers=[init_container],
@@ -138,7 +138,7 @@ with DAG(
arguments=["echo", "10", "echo pwd"],
labels={"foo": "bar"},
name="airflow-private-image-pod",
- is_delete_operator_pod=True,
+ on_finish_action="delete_pod",
in_cluster=True,
task_id="task-two",
get_logs=True,
@@ -152,7 +152,7 @@ with DAG(
cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
name="write-xcom",
do_xcom_push=True,
- is_delete_operator_pod=True,
+ on_finish_action="delete_pod",
in_cluster=True,
task_id="write-xcom",
get_logs=True,
diff --git a/tests/system/providers/cncf/kubernetes/example_kubernetes_async.py b/tests/system/providers/cncf/kubernetes/example_kubernetes_async.py
index f44e8637ff..36e143c096 100644
--- a/tests/system/providers/cncf/kubernetes/example_kubernetes_async.py
+++ b/tests/system/providers/cncf/kubernetes/example_kubernetes_async.py
@@ -123,7 +123,7 @@ with DAG(
env_from=configmaps,
name="airflow-test-pod",
affinity=affinity,
- is_delete_operator_pod=True,
+ on_finish_action="delete_pod",
hostnetwork=False,
tolerations=tolerations,
init_containers=[init_container],
@@ -141,7 +141,7 @@ with DAG(
arguments=["echo", "10", "echo pwd"],
labels={"foo": "bar"},
name="airflow-private-image-pod",
- is_delete_operator_pod=True,
+ on_finish_action="delete_pod",
in_cluster=True,
get_logs=True,
deferrable=True,
@@ -156,7 +156,7 @@ with DAG(
cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
name="write-xcom",
do_xcom_push=True,
- is_delete_operator_pod=True,
+ on_finish_action="delete_pod",
in_cluster=True,
get_logs=True,
deferrable=True,
diff --git a/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py b/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py
index d2a6161ed8..bf155499f2 100644
--- a/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py
+++ b/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py
@@ -67,7 +67,7 @@ with models.DAG(
image="perl",
name="test-pod",
in_cluster=False,
- is_delete_operator_pod=True,
+ on_finish_action="delete_pod",
)
# [START howto_operator_gke_start_pod_xcom]
@@ -82,7 +82,7 @@ with models.DAG(
cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
name="test-pod-xcom",
in_cluster=False,
- is_delete_operator_pod=True,
+ on_finish_action="delete_pod",
)
# [END howto_operator_gke_start_pod_xcom]
diff --git a/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py b/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py
index 13c310d880..73430a3938 100644
--- a/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py
+++ b/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py
@@ -66,7 +66,7 @@ with models.DAG(
image="perl",
name="test-pod-async",
in_cluster=False,
- is_delete_operator_pod=True,
+ on_finish_action="delete_pod",
get_logs=True,
deferrable=True,
)
@@ -82,7 +82,7 @@ with models.DAG(
cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
name="test-pod-xcom-async",
in_cluster=False,
- is_delete_operator_pod=True,
+ on_finish_action="delete_pod",
do_xcom_push=True,
deferrable=True,
get_logs=True,