This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new dd937e51fe Add `on_finish_action` to `KubernetesPodOperator` (#30718)
dd937e51fe is described below

commit dd937e51fe1ae3cd36a6993bd42e425960644e1d
Author: Hussein Awala <[email protected]>
AuthorDate: Fri Jun 30 10:01:13 2023 +0200

    Add `on_finish_action` to `KubernetesPodOperator` (#30718)
    
    * Add a new arg for KPO to only delete the pod when it doesn't fail
    
    * deprecate is_delete_operator_pod and add on_finish_action
    
    * Add deprecated properties and fix unit tests
    
    * add missing attribute
    
    * Apply suggestions from code review
    
    Co-authored-by: Jed Cunningham 
<[email protected]>
    
    * update GKEStartPodOperator to be consistent with KPO
    
    * update EksPodOperator to be consistent with KPO
    
    * update unit tests and the method used to check the kpo compatibility
    
    * Fix a bug and add a new unit test for each provider
    
    * warn with AirflowProviderDeprecationWarning instead of DeprecationWarning
    
    * Bump KPO min version in GCP provider and add a new one to AWS provider
    
    * Add the new param to the GKE trigger
    
    * Apply suggestions from code review
    
    Co-authored-by: Jarek Potiuk <[email protected]>
    
    ---------
    
    Co-authored-by: Jed Cunningham 
<[email protected]>
    Co-authored-by: Jarek Potiuk <[email protected]>
---
 airflow/providers/amazon/aws/operators/eks.py      | 34 ++++++---
 airflow/providers/amazon/provider.yaml             |  3 +
 airflow/providers/cncf/kubernetes/operators/pod.py | 43 ++++++++---
 airflow/providers/cncf/kubernetes/triggers/pod.py  | 34 +++++++--
 .../providers/cncf/kubernetes/utils/pod_manager.py |  9 +++
 .../google/cloud/operators/kubernetes_engine.py    | 38 +++++++---
 .../google/cloud/triggers/kubernetes_engine.py     | 34 +++++++--
 airflow/providers/google/provider.yaml             |  2 +-
 tests/providers/amazon/aws/operators/test_eks.py   | 58 ++++++++++++++-
 .../cncf/kubernetes/operators/test_pod.py          | 84 ++++++++++++++++++----
 .../providers/cncf/kubernetes/triggers/test_pod.py | 74 +++++++++++++++++--
 .../cloud/operators/test_kubernetes_engine.py      | 61 ++++++++++++++++
 .../cloud/triggers/test_kubernetes_engine.py       |  4 +-
 .../providers/amazon/aws/example_eks_templated.py  |  2 +-
 .../aws/example_eks_with_fargate_in_one_step.py    |  2 +-
 .../amazon/aws/example_eks_with_fargate_profile.py |  2 +-
 .../aws/example_eks_with_nodegroup_in_one_step.py  |  2 +-
 .../cncf/kubernetes/example_kubernetes.py          |  6 +-
 .../cncf/kubernetes/example_kubernetes_async.py    |  6 +-
 .../kubernetes_engine/example_kubernetes_engine.py |  4 +-
 .../example_kubernetes_engine_async.py             |  4 +-
 21 files changed, 430 insertions(+), 76 deletions(-)

diff --git a/airflow/providers/amazon/aws/operators/eks.py 
b/airflow/providers/amazon/aws/operators/eks.py
index 9c27e88350..bea4223987 100644
--- a/airflow/providers/amazon/aws/operators/eks.py
+++ b/airflow/providers/amazon/aws/operators/eks.py
@@ -34,6 +34,7 @@ from airflow.providers.amazon.aws.triggers.eks import (
     EksNodegroupTrigger,
 )
 from airflow.providers.amazon.aws.utils.waiter_with_logging import wait
+from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction
 
 try:
     from airflow.providers.cncf.kubernetes.operators.pod import 
KubernetesPodOperator
@@ -854,10 +855,15 @@ class EksPodOperator(KubernetesPodOperator):
          running Airflow in a distributed manner and aws_conn_id is None or
          empty, then the default boto3 configuration would be used (and must be
          maintained on each worker node).
+    :param on_finish_action: What to do when the pod reaches its final state, 
or the execution is interrupted.
+        If "delete_pod", the pod will be deleted regardless of its state; if 
"delete_succeeded_pod",
+        only a succeeded pod will be deleted. You can set it to "keep_pod" to keep 
the pod.
+        Current default is `keep_pod`, but this will be changed in the next 
major release of this provider.
     :param is_delete_operator_pod: What to do when the pod reaches its final
         state, or the execution is interrupted. If True, delete the
-        pod; if False, leave the pod.  Current default is False, but this will 
be
+        pod; if False, leave the pod. Current default is False, but this will 
be
         changed in the next major release of this provider.
+        Deprecated - use `on_finish_action` instead.
 
     """
 
@@ -885,19 +891,32 @@ class EksPodOperator(KubernetesPodOperator):
         pod_username: str | None = None,
         aws_conn_id: str = DEFAULT_CONN_ID,
         region: str | None = None,
+        on_finish_action: str | None = None,
         is_delete_operator_pod: bool | None = None,
         **kwargs,
     ) -> None:
-        if is_delete_operator_pod is None:
+        if is_delete_operator_pod is not None:
             warnings.warn(
-                f"You have not set parameter `is_delete_operator_pod` in class 
{self.__class__.__name__}. "
-                "Currently the default for this parameter is `False` but in a 
future release the default "
-                "will be changed to `True`. To ensure pods are not deleted in 
the future you will need to "
-                "set `is_delete_operator_pod=False` explicitly.",
+                "`is_delete_operator_pod` parameter is deprecated, please use 
`on_finish_action`",
                 AirflowProviderDeprecationWarning,
                 stacklevel=2,
             )
-            is_delete_operator_pod = False
+            kwargs["on_finish_action"] = (
+                OnFinishAction.DELETE_POD if is_delete_operator_pod else 
OnFinishAction.KEEP_POD
+            )
+        else:
+            if on_finish_action is not None:
+                kwargs["on_finish_action"] = OnFinishAction(on_finish_action)
+            else:
+                warnings.warn(
+                    f"You have not set parameter `on_finish_action` in class 
{self.__class__.__name__}. "
+                    "Currently the default for this parameter is `keep_pod` 
but in a future release"
+                    " the default will be changed to `delete_pod`. To ensure 
pods are not deleted in"
+                    " the future you will need to set 
`on_finish_action=keep_pod` explicitly.",
+                    AirflowProviderDeprecationWarning,
+                    stacklevel=2,
+                )
+                kwargs["on_finish_action"] = OnFinishAction.KEEP_POD
 
         self.cluster_name = cluster_name
         self.in_cluster = in_cluster
@@ -909,7 +928,6 @@ class EksPodOperator(KubernetesPodOperator):
             in_cluster=self.in_cluster,
             namespace=self.namespace,
             name=self.pod_name,
-            is_delete_operator_pod=is_delete_operator_pod,
             **kwargs,
         )
         # There is no need to manage the kube_config file, as it will be 
generated automatically.
diff --git a/airflow/providers/amazon/provider.yaml 
b/airflow/providers/amazon/provider.yaml
index 0c74094e81..a7dac9fad4 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -667,3 +667,6 @@ additional-extras:
   - name: aiobotocore
     dependencies:
       - aiobotocore[boto3]>=2.2.0
+  - name: cncf.kubernetes
+    dependencies:
+      - apache-airflow-providers-cncf-kubernetes>=7.2.0
diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py 
b/airflow/providers/cncf/kubernetes/operators/pod.py
index 87bfc41301..696611c6c2 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -23,6 +23,7 @@ import logging
 import re
 import secrets
 import string
+import warnings
 from collections.abc import Container
 from contextlib import AbstractContextManager
 from functools import cached_property
@@ -32,7 +33,7 @@ from kubernetes.client import CoreV1Api, models as k8s
 from slugify import slugify
 from urllib3.exceptions import HTTPError
 
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning, AirflowSkipException
 from airflow.kubernetes import pod_generator
 from airflow.kubernetes.pod_generator import PodGenerator
 from airflow.kubernetes.secret import Secret
@@ -52,6 +53,7 @@ from airflow.providers.cncf.kubernetes.hooks.kubernetes 
import KubernetesHook
 from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger
 from airflow.providers.cncf.kubernetes.utils import xcom_sidecar  # type: 
ignore[attr-defined]
 from airflow.providers.cncf.kubernetes.utils.pod_manager import (
+    OnFinishAction,
     PodLaunchFailedException,
     PodManager,
     PodOperatorHookProtocol,
@@ -188,9 +190,6 @@ class KubernetesPodOperator(BaseOperator):
         If more than one secret is required, provide a
         comma separated list: secret_a,secret_b
     :param service_account_name: Name of the service account
-    :param is_delete_operator_pod: What to do when the pod reaches its final
-        state, or the execution is interrupted. If True (default), delete the
-        pod; if False, leave the pod.
     :param hostnetwork: If True enable host networking on the pod.
     :param tolerations: A list of kubernetes tolerations.
     :param security_context: security options the pod should run with 
(PodSecurityContext).
@@ -226,6 +225,13 @@ class KubernetesPodOperator(BaseOperator):
     :param deferrable: Run operator in the deferrable mode.
     :param poll_interval: Polling period in seconds to check for the status. 
Used only in deferrable mode.
     :param log_pod_spec_on_failure: Log the pod's specification if a failure 
occurs
+    :param on_finish_action: What to do when the pod reaches its final state, 
or the execution is interrupted.
If "delete_pod", the pod will be deleted regardless of its state; if 
"delete_succeeded_pod",
+        only a succeeded pod will be deleted. You can set it to "keep_pod" to keep 
the pod.
+    :param is_delete_operator_pod: What to do when the pod reaches its final
+        state, or the execution is interrupted. If True (default), delete the
+        pod; if False, leave the pod.
+        Deprecated - use `on_finish_action` instead.
     """
 
     # This field can be overloaded at the instance level via 
base_container_name
@@ -279,7 +285,6 @@ class KubernetesPodOperator(BaseOperator):
         node_selector: dict | None = None,
         image_pull_secrets: list[k8s.V1LocalObjectReference] | None = None,
         service_account_name: str | None = None,
-        is_delete_operator_pod: bool = True,
         hostnetwork: bool = False,
         tolerations: list[k8s.V1Toleration] | None = None,
         security_context: dict | None = None,
@@ -303,6 +308,8 @@ class KubernetesPodOperator(BaseOperator):
         deferrable: bool = False,
         poll_interval: float = 2,
         log_pod_spec_on_failure: bool = True,
+        on_finish_action: str = "delete_pod",
+        is_delete_operator_pod: None | bool = None,
         **kwargs,
     ) -> None:
         # TODO: remove in provider 6.0.0 release. This is a mitigate step to 
advise users to switch to the
@@ -350,7 +357,6 @@ class KubernetesPodOperator(BaseOperator):
         self.config_file = config_file
         self.image_pull_secrets = 
convert_image_pull_secrets(image_pull_secrets) if image_pull_secrets else []
         self.service_account_name = service_account_name
-        self.is_delete_operator_pod = is_delete_operator_pod
         self.hostnetwork = hostnetwork
         self.tolerations = (
             [convert_toleration(toleration) for toleration in tolerations] if 
tolerations else []
@@ -384,6 +390,20 @@ class KubernetesPodOperator(BaseOperator):
         self.poll_interval = poll_interval
         self.remote_pod: k8s.V1Pod | None = None
         self.log_pod_spec_on_failure = log_pod_spec_on_failure
+        if is_delete_operator_pod is not None:
+            warnings.warn(
+                "`is_delete_operator_pod` parameter is deprecated, please use 
`on_finish_action`",
+                AirflowProviderDeprecationWarning,
+                stacklevel=2,
+            )
+            self.on_finish_action = (
+                OnFinishAction.DELETE_POD if is_delete_operator_pod else 
OnFinishAction.KEEP_POD
+            )
+            self.is_delete_operator_pod = is_delete_operator_pod
+        else:
+            self.on_finish_action = OnFinishAction(on_finish_action)
+            self.is_delete_operator_pod = self.on_finish_action == 
OnFinishAction.DELETE_POD
+
         self._config_dict: dict | None = None  # TODO: remove it when removing 
convert_config_file_to_dict
 
     @cached_property
@@ -595,10 +615,10 @@ class KubernetesPodOperator(BaseOperator):
                 config_file=self.config_file,
                 in_cluster=self.in_cluster,
                 poll_interval=self.poll_interval,
-                should_delete_pod=self.is_delete_operator_pod,
                 get_logs=self.get_logs,
                 startup_timeout=self.startup_timeout_seconds,
                 base_container_name=self.base_container_name,
+                on_finish_action=self.on_finish_action.value,
             ),
             method_name="execute_complete",
         )
@@ -669,7 +689,8 @@ class KubernetesPodOperator(BaseOperator):
     def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
         pod_phase = remote_pod.status.phase if hasattr(remote_pod, "status") 
else None
 
-        if pod_phase != PodPhase.SUCCEEDED or not self.is_delete_operator_pod:
+        # if the pod fails or succeeds, but we don't want to delete it
+        if pod_phase != PodPhase.SUCCEEDED or self.on_finish_action == 
OnFinishAction.KEEP_POD:
             self.patch_already_checked(remote_pod, reraise=False)
 
         if pod_phase != PodPhase.SUCCEEDED:
@@ -722,7 +743,11 @@ class KubernetesPodOperator(BaseOperator):
     def process_pod_deletion(self, pod: k8s.V1Pod, *, reraise=True):
         with _optionally_suppress(reraise=reraise):
             if pod is not None:
-                if self.is_delete_operator_pod:
+                should_delete_pod = (self.on_finish_action == 
OnFinishAction.DELETE_POD) or (
+                    self.on_finish_action == 
OnFinishAction.DELETE_SUCCEEDED_POD
+                    and pod.status.phase == PodPhase.SUCCEEDED
+                )
+                if should_delete_pod:
                     self.log.info("Deleting pod: %s", pod.metadata.name)
                     self.pod_manager.delete_pod(pod)
                 else:
diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py 
b/airflow/providers/cncf/kubernetes/triggers/pod.py
index 6cfb6c523a..6fdf763ece 100644
--- a/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -17,6 +17,7 @@
 from __future__ import annotations
 
 import asyncio
+import warnings
 from asyncio import CancelledError
 from datetime import datetime
 from enum import Enum
@@ -25,8 +26,9 @@ from typing import Any, AsyncIterator
 import pytz
 from kubernetes_asyncio.client.models import V1Pod
 
+from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.providers.cncf.kubernetes.hooks.kubernetes import 
AsyncKubernetesHook
-from airflow.providers.cncf.kubernetes.utils.pod_manager import PodPhase
+from airflow.providers.cncf.kubernetes.utils.pod_manager import 
OnFinishAction, PodPhase
 from airflow.triggers.base import BaseTrigger, TriggerEvent
 
 
@@ -57,11 +59,15 @@ class KubernetesPodTrigger(BaseTrigger):
     :param poll_interval: Polling period in seconds to check for the status.
     :param trigger_start_time: time in Datetime format when the trigger was 
started
     :param in_cluster: run kubernetes client with in_cluster configuration.
+    :param get_logs: get the stdout of the container as logs of the tasks.
+    :param startup_timeout: timeout in seconds to start up the pod.
+    :param on_finish_action: What to do when the pod reaches its final state, 
or the execution is interrupted.
If "delete_pod", the pod will be deleted regardless of its state; if 
"delete_succeeded_pod",
+        only a succeeded pod will be deleted. You can set it to "keep_pod" to keep 
the pod.
     :param should_delete_pod: What to do when the pod reaches its final
         state, or the execution is interrupted. If True (default), delete the
         pod; if False, leave the pod.
-    :param get_logs: get the stdout of the container as logs of the tasks.
-    :param startup_timeout: timeout in seconds to start up the pod.
+        Deprecated - use `on_finish_action` instead.
     """
 
     def __init__(
@@ -75,9 +81,10 @@ class KubernetesPodTrigger(BaseTrigger):
         cluster_context: str | None = None,
         config_file: str | None = None,
         in_cluster: bool | None = None,
-        should_delete_pod: bool = True,
         get_logs: bool = True,
         startup_timeout: int = 120,
+        on_finish_action: str = "delete_pod",
+        should_delete_pod: bool | None = None,
     ):
         super().__init__()
         self.pod_name = pod_name
@@ -89,10 +96,22 @@ class KubernetesPodTrigger(BaseTrigger):
         self.cluster_context = cluster_context
         self.config_file = config_file
         self.in_cluster = in_cluster
-        self.should_delete_pod = should_delete_pod
         self.get_logs = get_logs
         self.startup_timeout = startup_timeout
 
+        if should_delete_pod is not None:
+            warnings.warn(
+                "`should_delete_pod` parameter is deprecated, please use 
`on_finish_action`",
+                AirflowProviderDeprecationWarning,
+            )
+            self.on_finish_action = (
+                OnFinishAction.DELETE_POD if should_delete_pod else 
OnFinishAction.KEEP_POD
+            )
+            self.should_delete_pod = should_delete_pod
+        else:
+            self.on_finish_action = OnFinishAction(on_finish_action)
+            self.should_delete_pod = self.on_finish_action == 
OnFinishAction.DELETE_POD
+
         self._hook: AsyncKubernetesHook | None = None
         self._since_time = None
 
@@ -109,10 +128,11 @@ class KubernetesPodTrigger(BaseTrigger):
                 "cluster_context": self.cluster_context,
                 "config_file": self.config_file,
                 "in_cluster": self.in_cluster,
-                "should_delete_pod": self.should_delete_pod,
                 "get_logs": self.get_logs,
                 "startup_timeout": self.startup_timeout,
                 "trigger_start_time": self.trigger_start_time,
+                "should_delete_pod": self.should_delete_pod,
+                "on_finish_action": self.on_finish_action.value,
             },
         )
 
@@ -191,7 +211,7 @@ class KubernetesPodTrigger(BaseTrigger):
                         name=self.pod_name,
                         namespace=self.pod_namespace,
                     )
-                if self.should_delete_pod:
+                if self.on_finish_action == OnFinishAction.DELETE_POD:
                     self.log.info("Deleting pod...")
                     await self._get_async_hook().delete_pod(
                         name=self.pod_name,
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py 
b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 3bacb95f4f..71b5e17127 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -17,6 +17,7 @@
 """Launches PODs."""
 from __future__ import annotations
 
+import enum
 import json
 import logging
 import math
@@ -585,3 +586,11 @@ class PodManager(LoggingMixin):
                 if res:
                     return res
         return res
+
+
+class OnFinishAction(enum.Enum):
+    """Action to take when the pod finishes."""
+
+    KEEP_POD = "keep_pod"
+    DELETE_POD = "delete_pod"
+    DELETE_SUCCEEDED_POD = "delete_succeeded_pod"
diff --git a/airflow/providers/google/cloud/operators/kubernetes_engine.py 
b/airflow/providers/google/cloud/operators/kubernetes_engine.py
index 55971f52b7..bf14828d87 100644
--- a/airflow/providers/google/cloud/operators/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/operators/kubernetes_engine.py
@@ -27,6 +27,7 @@ from google.cloud.container_v1.types import Cluster
 from kubernetes.client.models import V1Pod
 
 from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
+from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction
 
 try:
     from airflow.providers.cncf.kubernetes.operators.pod import 
KubernetesPodOperator
@@ -45,7 +46,6 @@ from airflow.utils.timezone import utcnow
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
-
 KUBE_CONFIG_ENV_VAR = "KUBECONFIG"
 
 
@@ -427,11 +427,16 @@ class GKEStartPodOperator(KubernetesPodOperator):
         Service Account Token Creator IAM role to the directly preceding 
identity, with first
         account from the list granting this role to the originating account 
(templated).
     :param regional: The location param is region name.
+    :param deferrable: Run operator in the deferrable mode.
+    :param on_finish_action: What to do when the pod reaches its final state, 
or the execution is interrupted.
If "delete_pod", the pod will be deleted regardless of its state; if 
"delete_succeeded_pod",
+        only a succeeded pod will be deleted. You can set it to "keep_pod" to keep 
the pod.
+        Current default is `keep_pod`, but this will be changed in the next 
major release of this provider.
     :param is_delete_operator_pod: What to do when the pod reaches its final
         state, or the execution is interrupted. If True, delete the
         pod; if False, leave the pod. Current default is False, but this will 
be
         changed in the next major release of this provider.
-    :param deferrable: Run operator in the deferrable mode.
+        Deprecated - use `on_finish_action` instead.
     """
 
     template_fields: Sequence[str] = tuple(
@@ -449,19 +454,32 @@ class GKEStartPodOperator(KubernetesPodOperator):
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
         regional: bool | None = None,
+        on_finish_action: str | None = None,
         is_delete_operator_pod: bool | None = None,
         **kwargs,
     ) -> None:
-        if is_delete_operator_pod is None:
+        if is_delete_operator_pod is not None:
             warnings.warn(
-                f"You have not set parameter `is_delete_operator_pod` in class 
{self.__class__.__name__}. "
-                "Currently the default for this parameter is `False` but in a 
future release the default "
-                "will be changed to `True`. To ensure pods are not deleted in 
the future you will need to "
-                "set `is_delete_operator_pod=False` explicitly.",
+                "`is_delete_operator_pod` parameter is deprecated, please use 
`on_finish_action`",
                 AirflowProviderDeprecationWarning,
                 stacklevel=2,
             )
-            is_delete_operator_pod = False
+            kwargs["on_finish_action"] = (
+                OnFinishAction.DELETE_POD if is_delete_operator_pod else 
OnFinishAction.KEEP_POD
+            )
+        else:
+            if on_finish_action is not None:
+                kwargs["on_finish_action"] = OnFinishAction(on_finish_action)
+            else:
+                warnings.warn(
+                    f"You have not set parameter `on_finish_action` in class 
{self.__class__.__name__}. "
+                    "Currently the default for this parameter is `keep_pod` 
but in a future release"
+                    " the default will be changed to `delete_pod`. To ensure 
pods are not deleted in"
+                    " the future you will need to set 
`on_finish_action=keep_pod` explicitly.",
+                    AirflowProviderDeprecationWarning,
+                    stacklevel=2,
+                )
+                kwargs["on_finish_action"] = OnFinishAction.KEEP_POD
 
         if regional is not None:
             warnings.warn(
@@ -472,7 +490,7 @@ class GKEStartPodOperator(KubernetesPodOperator):
                 stacklevel=2,
             )
 
-        super().__init__(is_delete_operator_pod=is_delete_operator_pod, 
**kwargs)
+        super().__init__(**kwargs)
         self.project_id = project_id
         self.location = location
         self.cluster_name = cluster_name
@@ -560,8 +578,8 @@ class GKEStartPodOperator(KubernetesPodOperator):
                 cluster_context=self.cluster_context,
                 poll_interval=self.poll_interval,
                 in_cluster=self.in_cluster,
-                should_delete_pod=self.is_delete_operator_pod,
                 base_container_name=self.base_container_name,
+                on_finish_action=self.on_finish_action,
             ),
             method_name="execute_complete",
             kwargs={"cluster_url": self._cluster_url, "ssl_ca_cert": 
self._ssl_ca_cert},
diff --git a/airflow/providers/google/cloud/triggers/kubernetes_engine.py 
b/airflow/providers/google/cloud/triggers/kubernetes_engine.py
index 1ec0e420f9..ba0df0fc15 100644
--- a/airflow/providers/google/cloud/triggers/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/triggers/kubernetes_engine.py
@@ -18,11 +18,15 @@
 from __future__ import annotations
 
 import asyncio
+import warnings
 from datetime import datetime
 from typing import Any, AsyncIterator, Sequence
 
 from google.cloud.container_v1.types import Operation
 
+from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction
+
 try:
     from airflow.providers.cncf.kubernetes.triggers.pod import 
KubernetesPodTrigger
 except ImportError:
@@ -44,15 +48,19 @@ class GKEStartPodTrigger(KubernetesPodTrigger):
     :param poll_interval: Polling period in seconds to check for the status.
     :param trigger_start_time: time in Datetime format when the trigger was 
started
     :param in_cluster: run kubernetes client with in_cluster configuration.
-    :param should_delete_pod: What to do when the pod reaches its final
-        state, or the execution is interrupted. If True (default), delete the
-        pod; if False, leave the pod.
     :param get_logs: get the stdout of the container as logs of the tasks.
     :param startup_timeout: timeout in seconds to start up the pod.
     :param base_container_name: The name of the base container in the pod. 
This container's logs
         will appear as part of this task's logs if get_logs is True. Defaults 
to None. If None,
         will consult the class variable BASE_CONTAINER_NAME (which defaults to 
"base") for the base
         container name to use.
+    :param on_finish_action: What to do when the pod reaches its final state, 
or the execution is interrupted.
If "delete_pod", the pod will be deleted regardless of its state; if 
"delete_succeeded_pod",
+        only a succeeded pod will be deleted. You can set it to "keep_pod" to keep 
the pod.
+    :param should_delete_pod: What to do when the pod reaches its final
+        state, or the execution is interrupted. If True (default), delete the
+        pod; if False, leave the pod.
+        Deprecated - use `on_finish_action` instead.
     """
 
     def __init__(
@@ -66,9 +74,10 @@ class GKEStartPodTrigger(KubernetesPodTrigger):
         cluster_context: str | None = None,
         poll_interval: float = 2,
         in_cluster: bool | None = None,
-        should_delete_pod: bool = True,
         get_logs: bool = True,
         startup_timeout: int = 120,
+        on_finish_action: str = "delete_pod",
+        should_delete_pod: bool | None = None,
         *args,
         **kwargs,
     ):
@@ -87,10 +96,22 @@ class GKEStartPodTrigger(KubernetesPodTrigger):
         self.poll_interval = poll_interval
         self.cluster_context = cluster_context
         self.in_cluster = in_cluster
-        self.should_delete_pod = should_delete_pod
         self.get_logs = get_logs
         self.startup_timeout = startup_timeout
 
+        if should_delete_pod is not None:
+            warnings.warn(
+                "`should_delete_pod` parameter is deprecated, please use 
`on_finish_action`",
+                AirflowProviderDeprecationWarning,
+            )
+            self.on_finish_action = (
+                OnFinishAction.DELETE_POD if should_delete_pod else 
OnFinishAction.KEEP_POD
+            )
+            self.should_delete_pod = should_delete_pod
+        else:
+            self.on_finish_action = OnFinishAction(on_finish_action)
+            self.should_delete_pod = self.on_finish_action == 
OnFinishAction.DELETE_POD
+
         self._cluster_url = cluster_url
         self._ssl_ca_cert = ssl_ca_cert
 
@@ -105,11 +126,12 @@ class GKEStartPodTrigger(KubernetesPodTrigger):
                 "poll_interval": self.poll_interval,
                 "cluster_context": self.cluster_context,
                 "in_cluster": self.in_cluster,
-                "should_delete_pod": self.should_delete_pod,
                 "get_logs": self.get_logs,
                 "startup_timeout": self.startup_timeout,
                 "trigger_start_time": self.trigger_start_time,
                 "base_container_name": self.base_container_name,
+                "should_delete_pod": self.should_delete_pod,
+                "on_finish_action": self.on_finish_action.value,
             },
         )
 
diff --git a/airflow/providers/google/provider.yaml 
b/airflow/providers/google/provider.yaml
index 3fc2c90b9b..f22ff42082 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -1130,7 +1130,7 @@ additional-extras:
       - apache-beam[gcp]
   - name: cncf.kubernetes
     dependencies:
-      - apache-airflow-providers-cncf-kubernetes>=6.2.0
+      - apache-airflow-providers-cncf-kubernetes>=7.2.0
   - name: leveldb
     dependencies:
       - plyvel
diff --git a/tests/providers/amazon/aws/operators/test_eks.py 
b/tests/providers/amazon/aws/operators/test_eks.py
index 5534f635d8..9ea8ab72d7 100644
--- a/tests/providers/amazon/aws/operators/test_eks.py
+++ b/tests/providers/amazon/aws/operators/test_eks.py
@@ -39,6 +39,7 @@ from airflow.providers.amazon.aws.triggers.eks import (
     EksDeleteFargateProfileTrigger,
     EksNodegroupTrigger,
 )
+from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction
 from airflow.typing_compat import TypedDict
 from tests.providers.amazon.aws.utils.eks_test_constants import (
     NODEROLE_ARN,
@@ -641,7 +642,7 @@ class TestEksPodOperator:
             labels={"demo": "hello_world"},
             get_logs=True,
             # Delete the pod when it reaches its final state, or the execution 
is interrupted.
-            is_delete_operator_pod=True,
+            on_finish_action="delete_pod",
         )
         op_return_value = op.execute(ti_context)
         mock_k8s_pod_operator_execute.assert_called_once_with(ti_context)
@@ -651,3 +652,58 @@ class TestEksPodOperator:
         )
         assert mock_k8s_pod_operator_execute.return_value == op_return_value
         assert mock_generate_config_file.return_value.__enter__.return_value 
== op.config_file
+
+    @pytest.mark.parametrize(
+        "compatible_kpo, kwargs, expected_attributes",
+        [
+            (
+                True,
+                {"on_finish_action": "delete_succeeded_pod"},
+                {"on_finish_action": OnFinishAction.DELETE_SUCCEEDED_POD},
+            ),
+            (
+                # test that priority for deprecated param
+                True,
+                {"on_finish_action": "keep_pod", "is_delete_operator_pod": 
True},
+                {"on_finish_action": OnFinishAction.DELETE_POD, 
"is_delete_operator_pod": True},
+            ),
+            (
+                # test default
+                True,
+                {},
+                {"on_finish_action": OnFinishAction.KEEP_POD, 
"is_delete_operator_pod": False},
+            ),
+            (
+                False,
+                {"is_delete_operator_pod": True},
+                {"is_delete_operator_pod": True},
+            ),
+            (
+                False,
+                {"is_delete_operator_pod": False},
+                {"is_delete_operator_pod": False},
+            ),
+            (
+                # test default
+                False,
+                {},
+                {"is_delete_operator_pod": False},
+            ),
+        ],
+    )
+    def test_on_finish_action_handler(self, compatible_kpo, kwargs, 
expected_attributes):
+        kpo_init_args_mock = mock.MagicMock(**{"parameters": 
["on_finish_action"] if compatible_kpo else []})
+
+        with mock.patch("inspect.signature", return_value=kpo_init_args_mock):
+            op = EksPodOperator(
+                task_id="run_pod",
+                pod_name="run_pod",
+                cluster_name=CLUSTER_NAME,
+                image="amazon/aws-cli:latest",
+                cmds=["sh", "-c", "ls"],
+                labels={"demo": "hello_world"},
+                get_logs=True,
+                **kwargs,
+            )
+            for expected_attr in expected_attributes:
+                assert op.__getattribute__(expected_attr) == 
expected_attributes[expected_attr]
diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py 
b/tests/providers/cncf/kubernetes/operators/test_pod.py
index 40feb30789..1ce25190ba 100644
--- a/tests/providers/cncf/kubernetes/operators/test_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_pod.py
@@ -589,9 +589,22 @@ class TestKubernetesPodOperator:
         pod = k.build_pod_request_obj(create_context(k))
         assert pod.spec.containers[0].image_pull_policy == "Always"
 
+    @pytest.mark.parametrize(
+        "task_kwargs, should_be_deleted",
+        [
+            ({}, True),  # default values
+            ({"is_delete_operator_pod": True}, True),  # check b/c of 
is_delete_operator_pod
+            ({"is_delete_operator_pod": False}, False),  # check b/c of 
is_delete_operator_pod
+            ({"on_finish_action": "delete_pod"}, True),
+            ({"on_finish_action": "delete_succeeded_pod"}, False),
+            ({"on_finish_action": "keep_pod"}, False),
+        ],
+    )
     @patch(f"{POD_MANAGER_CLASS}.delete_pod")
     @patch(f"{KPO_MODULE}.KubernetesPodOperator.find_pod")
-    def test_pod_delete_after_await_container_error(self, find_pod_mock, 
delete_pod_mock):
+    def test_pod_delete_after_await_container_error(
+        self, find_pod_mock, delete_pod_mock, task_kwargs, should_be_deleted
+    ):
         """
         When KPO fails unexpectedly during await_container, we should still 
try to delete the pod,
         and the pod we try to delete should be the one returned from find_pod 
earlier.
@@ -600,13 +613,16 @@ class TestKubernetesPodOperator:
         cont_status.name = "base"
         cont_status.state.terminated.message = "my-failure"
         find_pod_mock.return_value.status.container_statuses = [cont_status]
-        k = KubernetesPodOperator(task_id="task")
+        k = KubernetesPodOperator(task_id="task", **task_kwargs)
         self.await_pod_mock.side_effect = AirflowException("fake failure")
         with pytest.raises(AirflowException, match="my-failure"):
             context = create_context(k)
             context["ti"].xcom_push = MagicMock()
             k.execute(context=context)
-        delete_pod_mock.assert_called_with(find_pod_mock.return_value)
+        if should_be_deleted:
+            delete_pod_mock.assert_called_with(find_pod_mock.return_value)
+        else:
+            delete_pod_mock.assert_not_called()
 
     @pytest.mark.parametrize("should_fail", [True, False])
     @patch(f"{POD_MANAGER_CLASS}.delete_pod")
@@ -618,7 +634,7 @@ class TestKubernetesPodOperator:
         """
         k = KubernetesPodOperator(
             task_id="task",
-            is_delete_operator_pod=True,
+            on_finish_action="delete_pod",
         )
 
         if should_fail:
@@ -1019,7 +1035,7 @@ class TestKubernetesPodOperator:
         """If we aren't deleting pods and have an exception, mark it so we 
don't reattach to it"""
         k = KubernetesPodOperator(
             task_id="task",
-            is_delete_operator_pod=False,
+            on_finish_action="keep_pod",
         )
         self.await_pod_mock.side_effect = AirflowException("oops")
         context = create_context(k)
@@ -1045,16 +1061,50 @@ class TestKubernetesPodOperator:
         else:
             mock_await.assert_not_called()
 
-    @pytest.mark.parametrize("should_fail", [True, False])
+    @pytest.mark.parametrize(
+        "task_kwargs, should_fail, should_be_deleted",
+        [
+            ({}, False, True),
+            ({}, True, True),
+            (
+                {"is_delete_operator_pod": True, "on_finish_action": 
"keep_pod"},
+                False,
+                True,
+            ),  # check backcompat of is_delete_operator_pod
+            (
+                {"is_delete_operator_pod": True, "on_finish_action": 
"keep_pod"},
+                True,
+                True,
+            ),  # check b/c of is_delete_operator_pod
+            (
+                {"is_delete_operator_pod": False, "on_finish_action": 
"delete_pod"},
+                False,
+                False,
+            ),  # check b/c of is_delete_operator_pod
+            (
+                {"is_delete_operator_pod": False, "on_finish_action": 
"delete_pod"},
+                True,
+                False,
+            ),  # check b/c of is_delete_operator_pod
+            ({"on_finish_action": "keep_pod"}, False, False),
+            ({"on_finish_action": "keep_pod"}, True, False),
+            ({"on_finish_action": "delete_pod"}, False, True),
+            ({"on_finish_action": "delete_pod"}, True, True),
+            ({"on_finish_action": "delete_succeeded_pod"}, False, True),
+            ({"on_finish_action": "delete_succeeded_pod"}, True, False),
+        ],
+    )
     @patch(f"{POD_MANAGER_CLASS}.delete_pod")
     @patch(f"{KPO_MODULE}.KubernetesPodOperator.patch_already_checked")
-    def test_mark_checked_if_not_deleted(self, mock_patch_already_checked, 
mock_delete_pod, should_fail):
+    def test_mark_checked_if_not_deleted(
+        self, mock_patch_already_checked, mock_delete_pod, task_kwargs, 
should_fail, should_be_deleted
+    ):
         """If we aren't deleting pods mark "checked" if the task completes 
(successful or otherwise)"""
         dag = DAG("hello2", start_date=pendulum.now())
         k = KubernetesPodOperator(
             task_id="task",
-            is_delete_operator_pod=False,
             dag=dag,
+            **task_kwargs,
         )
         remote_pod_mock = MagicMock()
         remote_pod_mock.status.phase = "Failed" if should_fail else "Succeeded"
@@ -1065,8 +1115,14 @@ class TestKubernetesPodOperator:
                 k.execute(context=context)
         else:
             k.execute(context=context)
-        mock_patch_already_checked.assert_called_once()
-        mock_delete_pod.assert_not_called()
+        if should_fail or not should_be_deleted:
+            mock_patch_already_checked.assert_called_once()
+        else:
+            mock_patch_already_checked.assert_not_called()
+        if should_be_deleted:
+            mock_delete_pod.assert_called_once()
+        else:
+            mock_delete_pod.assert_not_called()
 
     @patch(HOOK_CLASS, new=MagicMock)
     def test_patch_already_checked(self):
@@ -1141,7 +1197,7 @@ class TestKubernetesPodOperator:
     ):
         """Tests that an AirflowSkipException is raised when the container 
exits with the skip_on_exit_code"""
         k = KubernetesPodOperator(
-            task_id="task", is_delete_operator_pod=True, **(extra_kwargs if 
extra_kwargs else {})
+            task_id="task", on_finish_action="delete_pod", **(extra_kwargs if 
extra_kwargs else {})
         )
 
         base_container = MagicMock()
@@ -1282,7 +1338,7 @@ class TestKubernetesPodOperatorAsync:
             arguments=TEST_ARGS,
             labels=TEST_LABELS,
             name=TEST_NAME,
-            is_delete_operator_pod=False,
+            on_finish_action="keep_pod",
             in_cluster=True,
             get_logs=True,
             deferrable=True,
@@ -1306,7 +1362,7 @@ class TestKubernetesPodOperatorAsync:
             arguments=TEST_ARGS,
             labels=TEST_LABELS,
             name=TEST_NAME,
-            is_delete_operator_pod=False,
+            on_finish_action="keep_pod",
             in_cluster=True,
             get_logs=True,
             deferrable=True,
@@ -1353,7 +1409,7 @@ class TestKubernetesPodOperatorAsync:
             arguments=TEST_ARGS,
             labels=TEST_LABELS,
             name=TEST_NAME,
-            is_delete_operator_pod=False,
+            on_finish_action="keep_pod",
             in_cluster=True,
             get_logs=True,
             deferrable=True,
diff --git a/tests/providers/cncf/kubernetes/triggers/test_pod.py 
b/tests/providers/cncf/kubernetes/triggers/test_pod.py
index 6d5d18d028..4ed731b425 100644
--- a/tests/providers/cncf/kubernetes/triggers/test_pod.py
+++ b/tests/providers/cncf/kubernetes/triggers/test_pod.py
@@ -39,12 +39,12 @@ POLL_INTERVAL = 2
 CLUSTER_CONTEXT = "test-context"
 CONFIG_FILE = "/path/to/config/file"
 IN_CLUSTER = False
-SHOULD_DELETE_POD = True
 GET_LOGS = True
 STARTUP_TIMEOUT_SECS = 120
 TRIGGER_START_TIME = datetime.now(tz=pytz.UTC)
 FAILED_RESULT_MSG = "Test message that appears when trigger have failed event."
 BASE_CONTAINER_NAME = "base"
+ON_FINISH_ACTION = "delete_pod"
 
 
 @pytest.fixture
@@ -58,10 +58,10 @@ def trigger():
         cluster_context=CLUSTER_CONTEXT,
         config_file=CONFIG_FILE,
         in_cluster=IN_CLUSTER,
-        should_delete_pod=SHOULD_DELETE_POD,
         get_logs=GET_LOGS,
         startup_timeout=STARTUP_TIMEOUT_SECS,
         trigger_start_time=TRIGGER_START_TIME,
+        on_finish_action=ON_FINISH_ACTION,
     )
 
 
@@ -85,10 +85,11 @@ class TestKubernetesPodTrigger:
             "cluster_context": CLUSTER_CONTEXT,
             "config_file": CONFIG_FILE,
             "in_cluster": IN_CLUSTER,
-            "should_delete_pod": SHOULD_DELETE_POD,
             "get_logs": GET_LOGS,
             "startup_timeout": STARTUP_TIMEOUT_SECS,
             "trigger_start_time": TRIGGER_START_TIME,
+            "on_finish_action": ON_FINISH_ACTION,
+            "should_delete_pod": ON_FINISH_ACTION == "delete_pod",
         }
 
     @pytest.mark.asyncio
@@ -237,8 +238,10 @@ class TestKubernetesPodTrigger:
 
     @pytest.mark.asyncio
     @mock.patch(f"{TRIGGER_PATH}._get_async_hook")
-    async def 
test_logging_in_trigger_when_cancelled_should_execute_successfully(
-        self, mock_hook, trigger, caplog
+    async def 
test_logging_in_trigger_when_cancelled_should_execute_successfully_and_delete_pod(
+        self,
+        mock_hook,
+        caplog,
     ):
         """
         Test that KubernetesPodTrigger fires the correct event in case if the 
task was cancelled.
@@ -248,6 +251,21 @@ class TestKubernetesPodTrigger:
         mock_hook.return_value.read_logs.return_value = 
self._mock_pod_result(mock.MagicMock())
         mock_hook.return_value.delete_pod.return_value = 
self._mock_pod_result(mock.MagicMock())
 
+        trigger = KubernetesPodTrigger(
+            pod_name=POD_NAME,
+            pod_namespace=NAMESPACE,
+            base_container_name=BASE_CONTAINER_NAME,
+            kubernetes_conn_id=CONN_ID,
+            poll_interval=POLL_INTERVAL,
+            cluster_context=CLUSTER_CONTEXT,
+            config_file=CONFIG_FILE,
+            in_cluster=IN_CLUSTER,
+            get_logs=GET_LOGS,
+            startup_timeout=STARTUP_TIMEOUT_SECS,
+            trigger_start_time=TRIGGER_START_TIME,
+            on_finish_action="delete_pod",
+        )
+
         generator = trigger.run()
         actual = await generator.asend(None)
         assert (
@@ -264,6 +282,52 @@ class TestKubernetesPodTrigger:
         assert "Outputting container logs..." in caplog.text
         assert "Deleting pod..." in caplog.text
 
+    @pytest.mark.asyncio
+    @mock.patch(f"{TRIGGER_PATH}._get_async_hook")
+    async def 
test_logging_in_trigger_when_cancelled_should_execute_successfully_without_delete_pod(
+        self,
+        mock_hook,
+        caplog,
+    ):
+        """
+        Test that KubernetesPodTrigger fires the correct event if the task was 
cancelled.
+        """
+
+        mock_hook.return_value.get_pod.side_effect = CancelledError()
+        mock_hook.return_value.read_logs.return_value = 
self._mock_pod_result(mock.MagicMock())
+        mock_hook.return_value.delete_pod.return_value = 
self._mock_pod_result(mock.MagicMock())
+
+        trigger = KubernetesPodTrigger(
+            pod_name=POD_NAME,
+            pod_namespace=NAMESPACE,
+            base_container_name=BASE_CONTAINER_NAME,
+            kubernetes_conn_id=CONN_ID,
+            poll_interval=POLL_INTERVAL,
+            cluster_context=CLUSTER_CONTEXT,
+            config_file=CONFIG_FILE,
+            in_cluster=IN_CLUSTER,
+            get_logs=GET_LOGS,
+            startup_timeout=STARTUP_TIMEOUT_SECS,
+            trigger_start_time=TRIGGER_START_TIME,
+            on_finish_action="delete_succeeded_pod",
+        )
+
+        generator = trigger.run()
+        actual = await generator.asend(None)
+        assert (
+            TriggerEvent(
+                {
+                    "name": POD_NAME,
+                    "namespace": NAMESPACE,
+                    "status": "cancelled",
+                    "message": "Pod execution was cancelled",
+                }
+            )
+            == actual
+        )
+        assert "Outputting container logs..." in caplog.text
+        assert "Deleting pod..." not in caplog.text
+
     @pytest.mark.parametrize(
         "container_state, expected_state",
         [
diff --git a/tests/providers/google/cloud/operators/test_kubernetes_engine.py 
b/tests/providers/google/cloud/operators/test_kubernetes_engine.py
index 5d6acb196a..07f0c34c7a 100644
--- a/tests/providers/google/cloud/operators/test_kubernetes_engine.py
+++ b/tests/providers/google/cloud/operators/test_kubernetes_engine.py
@@ -26,6 +26,7 @@ import pytest
 from airflow.exceptions import AirflowException, TaskDeferred
 from airflow.models import Connection
 from airflow.providers.cncf.kubernetes.operators.pod import 
KubernetesPodOperator
+from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction
 from airflow.providers.google.cloud.operators.kubernetes_engine import (
     GKECreateClusterOperator,
     GKEDeleteClusterOperator,
@@ -319,6 +320,66 @@ class TestGKEPodOperator:
         assert cluster_url == CLUSTER_PRIVATE_URL if use_internal_ip else 
CLUSTER_URL
         assert ssl_ca_cert == SSL_CA_CERT
 
+    @pytest.mark.parametrize(
+        "compatible_kpo, kwargs, expected_attributes",
+        [
+            (
+                True,
+                {"on_finish_action": "delete_succeeded_pod"},
+                {"on_finish_action": OnFinishAction.DELETE_SUCCEEDED_POD},
+            ),
+            (
+                # test that priority for deprecated param
+                True,
+                {"on_finish_action": "keep_pod", "is_delete_operator_pod": 
True},
+                {"on_finish_action": OnFinishAction.DELETE_POD, 
"is_delete_operator_pod": True},
+            ),
+            (
+                # test default
+                True,
+                {},
+                {"on_finish_action": OnFinishAction.KEEP_POD, 
"is_delete_operator_pod": False},
+            ),
+            (
+                False,
+                {"is_delete_operator_pod": True},
+                {"is_delete_operator_pod": True},
+            ),
+            (
+                False,
+                {"is_delete_operator_pod": False},
+                {"is_delete_operator_pod": False},
+            ),
+            (
+                # test default
+                False,
+                {},
+                {"is_delete_operator_pod": False},
+            ),
+        ],
+    )
+    def test_on_finish_action_handler(
+        self,
+        compatible_kpo,
+        kwargs,
+        expected_attributes,
+    ):
+        kpo_init_args_mock = mock.MagicMock(**{"parameters": 
["on_finish_action"] if compatible_kpo else []})
+
+        with mock.patch("inspect.signature", return_value=kpo_init_args_mock):
+            op = GKEStartPodOperator(
+                project_id=TEST_GCP_PROJECT_ID,
+                location=PROJECT_LOCATION,
+                cluster_name=CLUSTER_NAME,
+                task_id=PROJECT_TASK_ID,
+                name=TASK_NAME,
+                namespace=NAMESPACE,
+                image=IMAGE,
+                **kwargs,
+            )
+            for expected_attr in expected_attributes:
+                assert op.__getattribute__(expected_attr) == 
expected_attributes[expected_attr]
+
 
 class TestGKEPodOperatorAsync:
     def setup_method(self):
diff --git a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py 
b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
index e957767e3e..154908a6c4 100644
--- a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
+++ b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
@@ -48,6 +48,7 @@ CLUSTER_URL = "https://test-host";
 SSL_CA_CERT = "TEST_SSL_CA_CERT_CONTENT"
 FAILED_RESULT_MSG = "Test message that appears when trigger have failed event."
 BASE_CONTAINER_NAME = "base"
+ON_FINISH_ACTION = "delete_pod"
 
 OPERATION_NAME = "test-operation-name"
 PROJECT_ID = "test-project-id"
@@ -93,13 +94,14 @@ class TestGKEStartPodTrigger:
             "poll_interval": POLL_INTERVAL,
             "cluster_context": CLUSTER_CONTEXT,
             "in_cluster": IN_CLUSTER,
-            "should_delete_pod": SHOULD_DELETE_POD,
             "get_logs": GET_LOGS,
             "startup_timeout": STARTUP_TIMEOUT_SECS,
             "trigger_start_time": TRIGGER_START_TIME,
             "cluster_url": CLUSTER_URL,
             "ssl_ca_cert": SSL_CA_CERT,
             "base_container_name": BASE_CONTAINER_NAME,
+            "on_finish_action": ON_FINISH_ACTION,
+            "should_delete_pod": SHOULD_DELETE_POD,
         }
 
     @pytest.mark.asyncio
diff --git a/tests/system/providers/amazon/aws/example_eks_templated.py 
b/tests/system/providers/amazon/aws/example_eks_templated.py
index d09eabf959..e51783b53e 100644
--- a/tests/system/providers/amazon/aws/example_eks_templated.py
+++ b/tests/system/providers/amazon/aws/example_eks_templated.py
@@ -107,7 +107,7 @@ with DAG(
         labels={"demo": "hello_world"},
         get_logs=True,
         # Delete the pod when it reaches its final state, or the execution is 
interrupted.
-        is_delete_operator_pod=True,
+        on_finish_action="delete_pod",
     )
 
     delete_nodegroup = EksDeleteNodegroupOperator(
diff --git 
a/tests/system/providers/amazon/aws/example_eks_with_fargate_in_one_step.py 
b/tests/system/providers/amazon/aws/example_eks_with_fargate_in_one_step.py
index d14e7a6885..ae67a26588 100644
--- a/tests/system/providers/amazon/aws/example_eks_with_fargate_in_one_step.py
+++ b/tests/system/providers/amazon/aws/example_eks_with_fargate_in_one_step.py
@@ -101,7 +101,7 @@ with DAG(
         get_logs=True,
         startup_timeout_seconds=600,
         # Keep the pod alive, so we can describe it in case of trouble. It's 
deleted with the cluster anyway.
-        is_delete_operator_pod=False,
+        on_finish_action="keep_pod",
     )
 
     describe_pod = get_describe_pod_operator(
diff --git 
a/tests/system/providers/amazon/aws/example_eks_with_fargate_profile.py 
b/tests/system/providers/amazon/aws/example_eks_with_fargate_profile.py
index 0c19fda2d5..9e19c7594f 100644
--- a/tests/system/providers/amazon/aws/example_eks_with_fargate_profile.py
+++ b/tests/system/providers/amazon/aws/example_eks_with_fargate_profile.py
@@ -119,7 +119,7 @@ with DAG(
         labels={"demo": "hello_world"},
         get_logs=True,
         # Keep the pod alive, so we can describe it in case of trouble. It's 
deleted with the cluster anyway.
-        is_delete_operator_pod=False,
+        on_finish_action="keep_pod",
         startup_timeout_seconds=200,
     )
 
diff --git 
a/tests/system/providers/amazon/aws/example_eks_with_nodegroup_in_one_step.py 
b/tests/system/providers/amazon/aws/example_eks_with_nodegroup_in_one_step.py
index 76aa01bf5c..9bfce2cc16 100644
--- 
a/tests/system/providers/amazon/aws/example_eks_with_nodegroup_in_one_step.py
+++ 
b/tests/system/providers/amazon/aws/example_eks_with_nodegroup_in_one_step.py
@@ -111,7 +111,7 @@ with DAG(
         labels={"demo": "hello_world"},
         get_logs=True,
         # Keep the pod alive, so we can describe it in case of trouble. It's 
deleted with the cluster anyway.
-        is_delete_operator_pod=False,
+        on_finish_action="keep_pod",
     )
 
     describe_pod = get_describe_pod_operator(
diff --git a/tests/system/providers/cncf/kubernetes/example_kubernetes.py 
b/tests/system/providers/cncf/kubernetes/example_kubernetes.py
index 2f1f791d78..74078541bd 100644
--- a/tests/system/providers/cncf/kubernetes/example_kubernetes.py
+++ b/tests/system/providers/cncf/kubernetes/example_kubernetes.py
@@ -122,7 +122,7 @@ with DAG(
         name="airflow-test-pod",
         task_id="task",
         affinity=affinity,
-        is_delete_operator_pod=True,
+        on_finish_action="delete_pod",
         hostnetwork=False,
         tolerations=tolerations,
         init_containers=[init_container],
@@ -138,7 +138,7 @@ with DAG(
         arguments=["echo", "10", "echo pwd"],
         labels={"foo": "bar"},
         name="airflow-private-image-pod",
-        is_delete_operator_pod=True,
+        on_finish_action="delete_pod",
         in_cluster=True,
         task_id="task-two",
         get_logs=True,
@@ -152,7 +152,7 @@ with DAG(
         cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > 
/airflow/xcom/return.json"],
         name="write-xcom",
         do_xcom_push=True,
-        is_delete_operator_pod=True,
+        on_finish_action="delete_pod",
         in_cluster=True,
         task_id="write-xcom",
         get_logs=True,
diff --git a/tests/system/providers/cncf/kubernetes/example_kubernetes_async.py 
b/tests/system/providers/cncf/kubernetes/example_kubernetes_async.py
index f44e8637ff..36e143c096 100644
--- a/tests/system/providers/cncf/kubernetes/example_kubernetes_async.py
+++ b/tests/system/providers/cncf/kubernetes/example_kubernetes_async.py
@@ -123,7 +123,7 @@ with DAG(
         env_from=configmaps,
         name="airflow-test-pod",
         affinity=affinity,
-        is_delete_operator_pod=True,
+        on_finish_action="delete_pod",
         hostnetwork=False,
         tolerations=tolerations,
         init_containers=[init_container],
@@ -141,7 +141,7 @@ with DAG(
         arguments=["echo", "10", "echo pwd"],
         labels={"foo": "bar"},
         name="airflow-private-image-pod",
-        is_delete_operator_pod=True,
+        on_finish_action="delete_pod",
         in_cluster=True,
         get_logs=True,
         deferrable=True,
@@ -156,7 +156,7 @@ with DAG(
         cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > 
/airflow/xcom/return.json"],
         name="write-xcom",
         do_xcom_push=True,
-        is_delete_operator_pod=True,
+        on_finish_action="delete_pod",
         in_cluster=True,
         get_logs=True,
         deferrable=True,
diff --git 
a/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py
 
b/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py
index d2a6161ed8..bf155499f2 100644
--- 
a/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py
+++ 
b/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py
@@ -67,7 +67,7 @@ with models.DAG(
         image="perl",
         name="test-pod",
         in_cluster=False,
-        is_delete_operator_pod=True,
+        on_finish_action="delete_pod",
     )
 
     # [START howto_operator_gke_start_pod_xcom]
@@ -82,7 +82,7 @@ with models.DAG(
         cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > 
/airflow/xcom/return.json"],
         name="test-pod-xcom",
         in_cluster=False,
-        is_delete_operator_pod=True,
+        on_finish_action="delete_pod",
     )
     # [END howto_operator_gke_start_pod_xcom]
 
diff --git 
a/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py
 
b/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py
index 13c310d880..73430a3938 100644
--- 
a/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py
+++ 
b/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py
@@ -66,7 +66,7 @@ with models.DAG(
         image="perl",
         name="test-pod-async",
         in_cluster=False,
-        is_delete_operator_pod=True,
+        on_finish_action="delete_pod",
         get_logs=True,
         deferrable=True,
     )
@@ -82,7 +82,7 @@ with models.DAG(
         cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > 
/airflow/xcom/return.json"],
         name="test-pod-xcom-async",
         in_cluster=False,
-        is_delete_operator_pod=True,
+        on_finish_action="delete_pod",
         do_xcom_push=True,
         deferrable=True,
         get_logs=True,

Reply via email to