This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a563fb0f39a Unify Pod Startup Tracking: KubernetesPodTriggerer and KubernetesPodOperator Now Share Common Startup Logic (#56875)
a563fb0f39a is described below
commit a563fb0f39af8efb78ec93759f6271fc691358c7
Author: AutomationDev85 <[email protected]>
AuthorDate: Thu Oct 30 09:04:55 2025 +0100
Unify Pod Startup Tracking: KubernetesPodTriggerer and KubernetesPodOperator Now Share Common Startup Logic (#56875)
* Move container-related functions from PodManager to a separate file
* Moved unit tests
* Sync and async workflows use the same code to track Pod startup
* Reworked unit tests and pod startup logic
* Add api permission error detection for triggerer
* Fix pytest fixture
* Removed unneeded code
---------
Co-authored-by: AutomationDev85 <AutomationDev85>
---
.../providers/cncf/kubernetes/exceptions.py | 8 +
.../providers/cncf/kubernetes/hooks/kubernetes.py | 35 ++-
.../providers/cncf/kubernetes/operators/pod.py | 1 +
.../providers/cncf/kubernetes/triggers/pod.py | 50 +++--
.../providers/cncf/kubernetes/utils/pod_manager.py | 235 +++++++++++++++------
.../unit/cncf/kubernetes/triggers/test_pod.py | 73 +++++--
.../unit/cncf/kubernetes/utils/test_pod_manager.py | 168 +++++++++++++--
7 files changed, 463 insertions(+), 107 deletions(-)
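Before the diff itself, a minimal sketch of what the unified startup tracking looks like from the DAG author's side. This is a hedged illustration: the diff below only shows self.schedule_timeout_seconds being forwarded to the trigger, so passing schedule_timeout_seconds to the constructor is an assumption, and the timeout values are illustrative.

    from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

    # With this commit, the deferrable path (KubernetesPodTrigger + AsyncPodManager)
    # and the sync path (PodManager) share the same two-stage startup logic:
    # wait up to schedule_timeout for the pod to be scheduled, then up to
    # startup_timeout for it to leave the "Pending" phase, logging pod events
    # along the way.
    run_pod = KubernetesPodOperator(
        task_id="startup_tracking_example",
        image="busybox",
        cmds=["sh", "-c", "echo hello"],
        deferrable=True,
        startup_timeout_seconds=120,
        schedule_timeout_seconds=120,  # assumption: settable via the constructor
    )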
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py
index c0b6ad83a3f..5503c743797 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py
@@ -27,3 +27,11 @@ class PodMutationHookException(AirflowException):
class PodReconciliationError(AirflowException):
"""Raised when an error is encountered while trying to merge pod
configs."""
+
+
+class KubernetesApiError(AirflowException):
+ """Raised when an error is encountered while trying access Kubernetes
API."""
+
+
+class KubernetesApiPermissionError(AirflowException):
+ """Raised when an error is encountered while trying access Kubernetes
API."""
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index d99314efa1c..d59c6527d6b 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -37,6 +37,7 @@ from urllib3.exceptions import HTTPError
from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.models import Connection
+from airflow.providers.cncf.kubernetes.exceptions import KubernetesApiError, KubernetesApiPermissionError
from airflow.providers.cncf.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import should_retry_creation
from airflow.providers.cncf.kubernetes.utils.container import (
@@ -48,7 +49,7 @@ from airflow.utils import yaml
if TYPE_CHECKING:
from kubernetes.client import V1JobList
- from kubernetes.client.models import V1Job, V1Pod
+ from kubernetes.client.models import CoreV1EventList, V1Job, V1Pod
LOADING_KUBE_CONFIG_FILE_RESOURCE = "Loading Kubernetes configuration file kube_config from {}..."
@@ -879,12 +880,17 @@ class AsyncKubernetesHook(KubernetesHook):
:param namespace: Name of the pod's namespace.
"""
async with self.get_conn() as connection:
- v1_api = async_client.CoreV1Api(connection)
- pod: V1Pod = await v1_api.read_namespaced_pod(
- name=name,
- namespace=namespace,
- )
- return pod
+ try:
+ v1_api = async_client.CoreV1Api(connection)
+ pod: V1Pod = await v1_api.read_namespaced_pod(
+ name=name,
+ namespace=namespace,
+ )
+ return pod
+ except HTTPError as e:
+ if hasattr(e, "status") and e.status == 403:
+ raise KubernetesApiPermissionError("Permission denied (403) from Kubernetes API.") from e
+ raise KubernetesApiError from e
async def delete_pod(self, name: str, namespace: str):
"""
@@ -933,6 +939,21 @@ class AsyncKubernetesHook(KubernetesHook):
self.log.exception("There was an error reading the kubernetes API.")
raise
+ async def get_pod_events(self, name: str, namespace: str) -> CoreV1EventList:
+ """Get pod's events."""
+ async with self.get_conn() as connection:
+ try:
+ v1_api = async_client.CoreV1Api(connection)
+ events: CoreV1EventList = await v1_api.list_namespaced_event(
+ field_selector=f"involvedObject.name={name}",
+ namespace=namespace,
+ )
+ return events
+ except HTTPError as e:
+ if hasattr(e, "status") and e.status == 403:
+ raise KubernetesApiPermissionError("Permission denied (403) from Kubernetes API.") from e
+ raise KubernetesApiError from e
+
async def get_job_status(self, name: str, namespace: str) -> V1Job:
"""
Get job's status object.
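As a hedged illustration of the new hook surface added above (constructing the hook with a default Kubernetes connection is an assumption; get_pod_events, KubernetesApiError and KubernetesApiPermissionError are the names introduced in this commit):

    import asyncio

    from airflow.providers.cncf.kubernetes.exceptions import KubernetesApiPermissionError
    from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook

    async def print_pod_events(name: str, namespace: str) -> None:
        hook = AsyncKubernetesHook()  # assumes a default kubernetes connection is configured
        try:
            events = await hook.get_pod_events(name=name, namespace=namespace)
        except KubernetesApiPermissionError:
            # Raised on HTTP 403, e.g. when the service account may not list
            # events in the namespace; other HTTP errors become KubernetesApiError.
            raise
        for event in events.items:
            print(event.message)

    asyncio.run(print_pod_events("my-pod", "default"))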
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index 1df9a1be8ff..895ee8ddab0 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -868,6 +868,7 @@ class KubernetesPodOperator(BaseOperator):
get_logs=self.get_logs,
startup_timeout=self.startup_timeout_seconds,
startup_check_interval=self.startup_check_interval_seconds,
+ schedule_timeout=self.schedule_timeout_seconds,
base_container_name=self.base_container_name,
on_finish_action=self.on_finish_action.value,
last_log_time=last_log_time,
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
index 9008e0573ec..78e760df018 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -26,8 +26,10 @@ from typing import TYPE_CHECKING, Any, cast
import tenacity
+from airflow.providers.cncf.kubernetes.exceptions import KubernetesApiPermissionError
from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook
from airflow.providers.cncf.kubernetes.utils.pod_manager import (
+ AsyncPodManager,
OnFinishAction,
PodLaunchTimeoutException,
PodPhase,
@@ -69,6 +71,7 @@ class KubernetesPodTrigger(BaseTrigger):
:param get_logs: get the stdout of the container as logs of the tasks.
:param startup_timeout: timeout in seconds to start up the pod.
:param startup_check_interval: interval in seconds to check if the pod has already started.
+ :param schedule_timeout: timeout in seconds for the pod to be scheduled in the cluster.
:param on_finish_action: What to do when the pod reaches its final state, or the execution is interrupted.
If "delete_pod", the pod will be deleted regardless of its state; if "delete_succeeded_pod",
only a succeeded pod will be deleted. You can set to "keep_pod" to keep the pod.
@@ -91,7 +94,8 @@ class KubernetesPodTrigger(BaseTrigger):
in_cluster: bool | None = None,
get_logs: bool = True,
startup_timeout: int = 120,
- startup_check_interval: int = 5,
+ startup_check_interval: float = 5,
+ schedule_timeout: int = 120,
on_finish_action: str = "delete_pod",
last_log_time: DateTime | None = None,
logging_interval: int | None = None,
@@ -110,11 +114,11 @@ class KubernetesPodTrigger(BaseTrigger):
self.get_logs = get_logs
self.startup_timeout = startup_timeout
self.startup_check_interval = startup_check_interval
+ self.schedule_timeout = schedule_timeout
self.last_log_time = last_log_time
self.logging_interval = logging_interval
self.on_finish_action = OnFinishAction(on_finish_action)
self.trigger_kwargs = trigger_kwargs or {}
-
self._since_time = None
def serialize(self) -> tuple[str, dict[str, Any]]:
@@ -133,6 +137,7 @@ class KubernetesPodTrigger(BaseTrigger):
"get_logs": self.get_logs,
"startup_timeout": self.startup_timeout,
"startup_check_interval": self.startup_check_interval,
+ "schedule_timeout": self.schedule_timeout,
"trigger_start_time": self.trigger_start_time,
"on_finish_action": self.on_finish_action.value,
"last_log_time": self.last_log_time,
@@ -187,6 +192,22 @@ class KubernetesPodTrigger(BaseTrigger):
}
)
return
+ except KubernetesApiPermissionError as e:
+ message = (
+ "Kubernetes API permission error: The triggerer may not have
sufficient permissions to monitor or delete pods. "
+ "Please ensure the triggerer's service account is included in
the 'pod-launcher-role' as defined in the latest Airflow Helm chart. "
+ f"Original error: {e}"
+ )
+ yield TriggerEvent(
+ {
+ "name": self.pod_name,
+ "namespace": self.pod_namespace,
+ "status": "error",
+ "message": message,
+ **self.trigger_kwargs,
+ }
+ )
+ return
except Exception as e:
self.log.exception(
"Unexpected error while waiting for pod %s in namespace %s",
@@ -219,17 +240,16 @@ class KubernetesPodTrigger(BaseTrigger):
async def _wait_for_pod_start(self) -> ContainerState:
"""Loops until pod phase leaves ``PENDING`` If timeout is reached,
throws error."""
- while True:
- pod = await self._get_pod()
- if not pod.status.phase == "Pending":
- return self.define_container_state(pod)
-
- delta = datetime.datetime.now(tz=datetime.timezone.utc) - self.trigger_start_time
- if self.startup_timeout < delta.total_seconds():
- raise PodLaunchTimeoutException("Pod did not leave 'Pending' phase within specified timeout")
-
- self.log.info("Still waiting for pod to start. The pod state is %s", pod.status.phase)
- await asyncio.sleep(self.startup_check_interval)
+ pod = await self._get_pod()
+ events_task = self.pod_manager.watch_pod_events(pod, self.startup_check_interval)
+ pod_start_task = self.pod_manager.await_pod_start(
+ pod=pod,
+ schedule_timeout=self.schedule_timeout,
+ startup_timeout=self.startup_timeout,
+ check_interval=self.startup_check_interval,
+ )
+ await asyncio.gather(pod_start_task, events_task)
+ return self.define_container_state(await self._get_pod())
async def _wait_for_container_completion(self) -> TriggerEvent:
"""
@@ -297,6 +317,10 @@ class KubernetesPodTrigger(BaseTrigger):
cluster_context=self.cluster_context,
)
+ @cached_property
+ def pod_manager(self) -> AsyncPodManager:
+ return AsyncPodManager(async_hook=self.hook)
+
def define_container_state(self, pod: V1Pod) -> ContainerState:
pod_containers = pod.status.container_statuses
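The reworked _wait_for_pod_start above relies on a cooperative-shutdown pattern: await_pod_start flips the manager's stop_watching_events flag once the pod leaves "Pending", which lets the watch_pod_events loop finish so that asyncio.gather() can return. A minimal, self-contained sketch of that pattern (the names here are illustrative stand-ins, not the provider's API):

    import asyncio

    class Tracker:
        def __init__(self) -> None:
            self.stop_watching_events = False

        async def watch_events(self) -> None:
            # Stand-in for watch_pod_events: poll and log until asked to stop.
            while not self.stop_watching_events:
                await asyncio.sleep(0.1)

        async def await_start(self) -> None:
            # Stand-in for await_pod_start: once the pod leaves "Pending"
            # (simulated here by the sleep), tell the event watcher to stop.
            await asyncio.sleep(0.3)
            self.stop_watching_events = True

    async def main() -> None:
        tracker = Tracker()
        await asyncio.gather(tracker.await_start(), tracker.watch_events())

    asyncio.run(main())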
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index c9dc45e57f4..789ad4db79e 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -60,6 +60,8 @@ if TYPE_CHECKING:
from kubernetes.client.models.v1_pod_condition import V1PodCondition
from urllib3.response import HTTPResponse
+ from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook
+
EMPTY_XCOM_RESULT = "__airflow_xcom_result_empty__"
"""
@@ -99,6 +101,109 @@ def check_exception_is_kubernetes_api_unauthorized(exc: BaseException):
return isinstance(exc, ApiException) and exc.status and str(exc.status) == "401"
+async def watch_pod_events(
+ pod_manager: PodManager | AsyncPodManager,
+ pod: V1Pod,
+ check_interval: float = 1,
+) -> None:
+ """
+ Read pod events and write them to the log.
+
+ This function supports both asynchronous and synchronous pod managers.
+
+ :param pod_manager: The pod manager instance (PodManager or AsyncPodManager).
+ :param pod: The pod object to monitor.
+ :param check_interval: Interval (in seconds) between checks.
+ """
+ num_events = 0
+ is_async = isinstance(pod_manager, AsyncPodManager)
+ while not pod_manager.stop_watching_events:
+ if is_async:
+ events = await pod_manager.read_pod_events(pod)
+ else:
+ events = pod_manager.read_pod_events(pod)
+ for new_event in events.items[num_events:]:
+ involved_object: V1ObjectReference = new_event.involved_object
+ pod_manager.log.info(
+ "The Pod has an Event: %s from %s", new_event.message, involved_object.field_path
+ )
+ num_events = len(events.items)
+ await asyncio.sleep(check_interval)
+
+
+async def await_pod_start(
+ pod_manager: PodManager | AsyncPodManager,
+ pod: V1Pod,
+ schedule_timeout: int = 120,
+ startup_timeout: int = 120,
+ check_interval: float = 1,
+):
+ """
+ Monitor the startup phase of a Kubernetes pod, waiting for it to leave the ``Pending`` state.
+
+ This function is shared by both PodManager and AsyncPodManager to provide consistent pod startup tracking.
+
+ :param pod_manager: The pod manager instance (PodManager or AsyncPodManager).
+ :param pod: The pod object to monitor.
+ :param schedule_timeout: Maximum time (in seconds) to wait for the pod to be scheduled.
+ :param startup_timeout: Maximum time (in seconds) to wait for the pod to start running after being scheduled.
+ :param check_interval: Interval (in seconds) between status checks.
+ """
+ pod_manager.log.info("::group::Waiting until %ss to get the POD scheduled...", schedule_timeout)
+ pod_was_scheduled = False
+ start_check_time = time.time()
+ is_async = isinstance(pod_manager, AsyncPodManager)
+ while True:
+ if is_async:
+ remote_pod = await pod_manager.read_pod(pod)
+ else:
+ remote_pod = pod_manager.read_pod(pod)
+ pod_status = remote_pod.status
+ if pod_status.phase != PodPhase.PENDING:
+ pod_manager.stop_watching_events = True
+ pod_manager.log.info("::endgroup::")
+ break
+
+ # Check for timeout
+ pod_conditions: list[V1PodCondition] = pod_status.conditions
+ if pod_conditions and any(
+ (condition.type == "PodScheduled" and condition.status == "True") for condition in pod_conditions
+ ):
+ if not pod_was_scheduled:
+ # POD was initially scheduled; update timeout for getting POD launched
+ pod_was_scheduled = True
+ start_check_time = time.time()
+ pod_manager.log.info("Waiting %ss to get the POD running...", startup_timeout)
+
+ if time.time() - start_check_time >= startup_timeout:
+ pod_manager.log.info("::endgroup::")
+ raise PodLaunchTimeoutException(
+ f"Pod took too long to start. More than {startup_timeout}s. Check the pod events in kubernetes."
+ )
+ else:
+ if time.time() - start_check_time >= schedule_timeout:
+ pod_manager.log.info("::endgroup::")
+ raise PodLaunchTimeoutException(
+ f"Pod took too long to be scheduled on the cluster, giving up. More than {schedule_timeout}s. Check the pod events in kubernetes."
+ )
+
+ # Check for general problems to terminate early - ErrImagePull
+ if pod_status.container_statuses:
+ for container_status in pod_status.container_statuses:
+ container_state: V1ContainerState = container_status.state
+ container_waiting: V1ContainerStateWaiting | None = container_state.waiting
+ if container_waiting:
+ if container_waiting.reason in ["ErrImagePull", "InvalidImageName"]:
+ pod_manager.log.info("::endgroup::")
+ raise PodLaunchFailedException(
+ f"Pod docker image cannot be pulled, unable to start: {container_waiting.reason}"
+ f"\n{container_waiting.message}"
+ )
+
+ await asyncio.sleep(check_interval)
+
+
class PodLaunchTimeoutException(AirflowException):
"""When pod does not leave the ``Pending`` phase within specified
timeout."""
@@ -262,16 +367,7 @@ class PodManager(LoggingMixin):
async def watch_pod_events(self, pod: V1Pod, check_interval: int = 1) -> None:
"""Read pod events and write them to the log."""
- num_events = 0
- while not self.stop_watching_events:
- events = self.read_pod_events(pod)
- for new_event in events.items[num_events:]:
- involved_object: V1ObjectReference = new_event.involved_object
- self.log.info(
- "The Pod has an Event: %s from %s", new_event.message, involved_object.field_path
- )
- num_events = len(events.items)
- await asyncio.sleep(check_interval)
+ await watch_pod_events(pod_manager=self, pod=pod, check_interval=check_interval)
async def await_pod_start(
self, pod: V1Pod, schedule_timeout: int = 120, startup_timeout: int = 120, check_interval: int = 1
@@ -287,55 +383,13 @@ class PodManager(LoggingMixin):
:param check_interval: Interval (in seconds) between checks
:return:
"""
- self.log.info("::group::Waiting until %ss to get the POD scheduled...", schedule_timeout)
- pod_was_scheduled = False
- start_check_time = time.time()
- while True:
- remote_pod = self.read_pod(pod)
- pod_status = remote_pod.status
- if pod_status.phase != PodPhase.PENDING:
- self.stop_watching_events = True
- self.log.info("::endgroup::")
- break
-
- # Check for timeout
- pod_conditions: list[V1PodCondition] = pod_status.conditions
- if pod_conditions and any(
- (condition.type == "PodScheduled" and condition.status == "True")
- for condition in pod_conditions
- ):
- if not pod_was_scheduled:
- # POD was initially scheduled update timeout for getting POD launched
- pod_was_scheduled = True
- start_check_time = time.time()
- self.log.info("Waiting %ss to get the POD running...", startup_timeout)
-
- if time.time() - start_check_time >= startup_timeout:
- self.log.info("::endgroup::")
- raise PodLaunchFailedException(
- f"Pod took too long to start. More than {startup_timeout}s. Check the pod events in kubernetes."
- )
- else:
- if time.time() - start_check_time >= schedule_timeout:
- self.log.info("::endgroup::")
- raise PodLaunchFailedException(
- f"Pod took too long to be scheduled on the cluster, giving up. More than {schedule_timeout}s. Check the pod events in kubernetes."
- )
-
- # Check for general problems to terminate early - ErrImagePull
- if pod_status.container_statuses:
- for container_status in pod_status.container_statuses:
- container_state: V1ContainerState = container_status.state
- container_waiting: V1ContainerStateWaiting | None = container_state.waiting
- if container_waiting:
- if container_waiting.reason in ["ErrImagePull", "InvalidImageName"]:
- self.log.info("::endgroup::")
- raise PodLaunchFailedException(
- f"Pod docker image cannot be pulled, unable to start: {container_waiting.reason}"
- f"\n{container_waiting.message}"
- )
-
- await asyncio.sleep(check_interval)
+ await await_pod_start(
+ pod_manager=self,
+ pod=pod,
+ schedule_timeout=schedule_timeout,
+ startup_timeout=startup_timeout,
+ check_interval=check_interval,
+ )
def _log_message(
self,
@@ -915,3 +969,66 @@ class OnFinishAction(str, enum.Enum):
def is_log_group_marker(line: str) -> bool:
"""Check if the line is a log group marker like `::group::` or
`::endgroup::`."""
return line.startswith("::group::") or line.startswith("::endgroup::")
+
+
+class AsyncPodManager(LoggingMixin):
+ """Create, monitor, and otherwise interact with Kubernetes pods for use
with the KubernetesPodTriggerer."""
+
+ def __init__(
+ self,
+ async_hook: AsyncKubernetesHook,
+ callbacks: list[type[KubernetesPodOperatorCallback]] | None = None,
+ ):
+ """
+ Create the launcher.
+
+ :param async_hook: async kubernetes hook used to access the cluster
+ :param callbacks:
+ """
+ super().__init__()
+ self._hook = async_hook
+ self._watch = watch.Watch()
+ self._callbacks = callbacks or []
+ self.stop_watching_events = False
+
+ @tenacity.retry(stop=tenacity.stop_after_attempt(5), wait=tenacity.wait_exponential(), reraise=True)
+ async def read_pod(self, pod: V1Pod) -> V1Pod:
+ """Read POD information."""
+ return await self._hook.get_pod(
+ pod.metadata.name,
+ pod.metadata.namespace,
+ )
+
+ @tenacity.retry(stop=tenacity.stop_after_attempt(5), wait=tenacity.wait_exponential(), reraise=True)
+ async def read_pod_events(self, pod: V1Pod) -> CoreV1EventList:
+ """Get pod's events."""
+ return await self._hook.get_pod_events(
+ pod.metadata.name,
+ pod.metadata.namespace,
+ )
+
+ async def watch_pod_events(self, pod: V1Pod, check_interval: float = 1) -> None:
+ """Read pod events and write them to the log."""
+ await watch_pod_events(pod_manager=self, pod=pod, check_interval=check_interval)
+
+ async def await_pod_start(
+ self, pod: V1Pod, schedule_timeout: int = 120, startup_timeout: int = 120, check_interval: float = 1
+ ) -> None:
+ """
+ Wait for the pod to reach phase other than ``Pending``.
+
+ :param pod:
+ :param schedule_timeout: Timeout (in seconds) for the pod to be scheduled
+ (if the pod takes too long to be scheduled, the task fails)
+ :param startup_timeout: Timeout (in seconds) for startup of the pod
+ (if the pod is pending for too long after being scheduled, the task fails)
+ :param check_interval: Interval (in seconds) between checks
+ :return:
+ """
+ await await_pod_start(
+ pod_manager=self,
+ pod=pod,
+ schedule_timeout=schedule_timeout,
+ startup_timeout=startup_timeout,
+ check_interval=check_interval,
+ )
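Putting the pieces together, a hedged sketch of driving the new AsyncPodManager directly, mirroring what KubernetesPodTrigger._wait_for_pod_start does above (the default-connection hook and the pod object are assumptions; the method names and parameters are the ones added in this commit):

    import asyncio

    from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook
    from airflow.providers.cncf.kubernetes.utils.pod_manager import AsyncPodManager

    async def track_startup(pod) -> None:  # pod: a V1Pod obtained elsewhere
        manager = AsyncPodManager(async_hook=AsyncKubernetesHook())
        # Run the event watcher and the startup poller concurrently;
        # await_pod_start sets manager.stop_watching_events once the pod
        # leaves "Pending", which ends the watcher loop.
        await asyncio.gather(
            manager.await_pod_start(pod=pod, schedule_timeout=120, startup_timeout=120, check_interval=5),
            manager.watch_pod_events(pod=pod, check_interval=5),
        )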
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
index 66fae2524d6..fc477fe0cf7 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
@@ -44,6 +44,7 @@ CONFIG_DICT = {"a": "b"}
IN_CLUSTER = False
GET_LOGS = True
STARTUP_TIMEOUT_SECS = 120
+STARTUP_CHECK_INTERVAL_SECS = 0.1
TRIGGER_START_TIME = datetime.datetime.now(tz=datetime.timezone.utc)
FAILED_RESULT_MSG = "Test message that appears when trigger have failed event."
BASE_CONTAINER_NAME = "base"
@@ -63,11 +64,24 @@ def trigger():
in_cluster=IN_CLUSTER,
get_logs=GET_LOGS,
startup_timeout=STARTUP_TIMEOUT_SECS,
+ startup_check_interval=STARTUP_CHECK_INTERVAL_SECS,
+ schedule_timeout=STARTUP_TIMEOUT_SECS,
trigger_start_time=TRIGGER_START_TIME,
on_finish_action=ON_FINISH_ACTION,
)
+@pytest.fixture
+def mock_time_fixture():
+ """Fixture to simulate time passage beyond startup timeout."""
+ with mock.patch("time.time") as mock_time:
+ start_time = 1000
+ mock_time.side_effect = [
+ *(start_time + STARTUP_TIMEOUT_SECS * n for n in range(5)),
+ ]
+ yield mock_time
+
+
def get_read_pod_mock_containers(statuses_to_emit=None):
"""
Emit pods with given phases sequentially.
@@ -106,7 +120,8 @@ class TestKubernetesPodTrigger:
"in_cluster": IN_CLUSTER,
"get_logs": GET_LOGS,
"startup_timeout": STARTUP_TIMEOUT_SECS,
- "startup_check_interval": 5,
+ "startup_check_interval": STARTUP_CHECK_INTERVAL_SECS,
+ "schedule_timeout": STARTUP_TIMEOUT_SECS,
"trigger_start_time": TRIGGER_START_TIME,
"on_finish_action": ON_FINISH_ACTION,
"last_log_time": None,
@@ -138,7 +153,7 @@ class TestKubernetesPodTrigger:
async def test_run_loop_return_waiting_event(
self, mock_hook, mock_method, mock_wait_pod, trigger, caplog
):
- mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
+ mock_hook.get_pod.return_value = self._mock_pod_result(mock.AsyncMock())
mock_method.return_value = ContainerState.WAITING
caplog.set_level(logging.INFO)
@@ -157,7 +172,7 @@ class TestKubernetesPodTrigger:
async def test_run_loop_return_running_event(
self, mock_hook, mock_method, mock_wait_pod, trigger, caplog
):
- mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
+ mock_hook.get_pod.return_value = self._mock_pod_result(mock.AsyncMock())
mock_method.return_value = ContainerState.RUNNING
caplog.set_level(logging.INFO)
@@ -229,7 +244,7 @@ class TestKubernetesPodTrigger:
Test that KubernetesPodTrigger fires the correct event in case of failure.
"""
- mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
+ mock_hook.get_pod.return_value = self._mock_pod_result(mock.AsyncMock())
mock_method.return_value = ContainerState.FAILED
caplog.set_level(logging.INFO)
@@ -319,14 +334,46 @@ class TestKubernetesPodTrigger:
assert expected_state == trigger.define_container_state(pod)
+ @pytest.mark.asyncio
+ @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+ @mock.patch(f"{TRIGGER_PATH}.hook")
+ async def test_run_loop_read_events_during_start(self, mock_hook, mock_method, trigger):
+ event1 = mock.AsyncMock()
+ event1.message = "event 1"
+ event1.involved_object.field_path = "object 1"
+ event2 = mock.AsyncMock()
+ event2.message = "event 2"
+ event2.involved_object.field_path = "object 2"
+ events_list = mock.AsyncMock()
+ events_list.items = [event1, event2]
+
+ mock_hook.get_pod_events = mock.AsyncMock(return_value=events_list)
+
+ pod_pending = mock.MagicMock()
+ pod_pending.status.phase = PodPhase.PENDING
+ pod_succeeded = mock.MagicMock()
+ pod_succeeded.status.phase = PodPhase.SUCCEEDED
+
+ mock_hook.get_pod = mock.AsyncMock(
+ side_effect=[pod_pending, pod_pending, pod_succeeded, pod_succeeded]
+ )
+
+ mock_method.return_value = ContainerState.TERMINATED
+
+ with mock.patch.object(trigger.pod_manager.log, "info") as mock_log_info:
+ generator = trigger.run()
+ await generator.asend(None)
+
+ mock_log_info.assert_any_call("The Pod has an Event: %s from %s", "event 1", "object 1")
+ mock_log_info.assert_any_call("The Pod has an Event: %s from %s", "event 2", "object 2")
+
@pytest.mark.asyncio
@pytest.mark.parametrize("container_state", [ContainerState.WAITING, ContainerState.UNDEFINED])
@mock.patch(f"{TRIGGER_PATH}.define_container_state")
@mock.patch(f"{TRIGGER_PATH}.hook")
async def test_run_loop_return_timeout_event(
- self, mock_hook, mock_method, trigger, caplog, container_state
+ self, mock_hook, mock_method, trigger, container_state, mock_time_fixture
):
- trigger.trigger_start_time = TRIGGER_START_TIME - datetime.timedelta(minutes=2)
mock_hook.get_pod.return_value = self._mock_pod_result(
mock.MagicMock(
status=mock.MagicMock(
@@ -335,9 +382,6 @@ class TestKubernetesPodTrigger:
)
)
mock_method.return_value = container_state
-
- caplog.set_level(logging.INFO)
-
generator = trigger.run()
actual = await generator.asend(None)
assert (
@@ -346,7 +390,7 @@ class TestKubernetesPodTrigger:
"name": POD_NAME,
"namespace": NAMESPACE,
"status": "timeout",
- "message": "Pod did not leave 'Pending' phase within specified timeout",
+ "message": "Pod took too long to be scheduled on the cluster, giving up. More than 120s. Check the pod events in kubernetes.",
}
)
== actual
@@ -356,14 +400,13 @@ class TestKubernetesPodTrigger:
@mock.patch(f"{TRIGGER_PATH}.define_container_state")
@mock.patch(f"{TRIGGER_PATH}.hook")
async def test_run_loop_return_success_for_completed_pod_after_timeout(
- self, mock_hook, mock_method, trigger, caplog
+ self, mock_hook, mock_method, trigger, mock_time_fixture
):
"""
Test that the trigger correctly recognizes the pod is not pending even after the timeout has been
reached. This may happen when a new triggerer process takes over the trigger, the pod already left
pending state and the timeout has been reached.
"""
- trigger.trigger_start_time = TRIGGER_START_TIME - datetime.timedelta(minutes=2)
mock_hook.get_pod.return_value = self._mock_pod_result(
mock.MagicMock(
status=mock.MagicMock(
@@ -373,8 +416,6 @@ class TestKubernetesPodTrigger:
)
mock_method.return_value = ContainerState.TERMINATED
- caplog.set_level(logging.INFO)
-
generator = trigger.run()
actual = await generator.asend(None)
assert (
@@ -396,7 +437,7 @@ class TestKubernetesPodTrigger:
Test that KubernetesPodTrigger _get_pod is called with the correct arguments.
"""
- mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
+ mock_hook.get_pod.return_value = self._mock_pod_result(mock.AsyncMock())
await trigger._get_pod()
mock_hook.get_pod.assert_called_with(name=POD_NAME, namespace=NAMESPACE)
@@ -423,7 +464,7 @@ class TestKubernetesPodTrigger:
the hook.get_pod call.
"""
- side_effects = [Exception("Test exception") for _ in range(exc_count)] + [MagicMock()]
+ side_effects = [Exception("Test exception") for _ in range(exc_count)] + [mock.AsyncMock()]
mock_hook.get_pod.side_effect = mock.AsyncMock(side_effect=side_effects)
# We expect the exception to be raised only if the number of retries is exceeded
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
index 2634119fc29..485bdbd769e 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
@@ -31,6 +31,7 @@ from urllib3.exceptions import HTTPError as BaseHTTPError
from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.utils.pod_manager import (
+ AsyncPodManager,
PodLogsConsumer,
PodManager,
PodPhase,
@@ -477,7 +478,7 @@ class TestPodManager:
@pytest.mark.asyncio
@mock.patch("asyncio.sleep", new_callable=mock.AsyncMock)
- async def test_start_pod_startup_interval_seconds(self, mock_time_sleep, caplog):
+ async def test_start_pod_startup_interval_seconds(self, mock_time_sleep):
condition_scheduled = mock.MagicMock()
condition_scheduled.type = "PodScheduled"
condition_scheduled.status = "True"
@@ -500,17 +501,21 @@ class TestPodManager:
schedule_timeout = 30
startup_timeout = 60
mock_pod = MagicMock()
- await self.pod_manager.await_pod_start(
- pod=mock_pod,
- schedule_timeout=schedule_timeout, # Never hit, any value is fine, as time.sleep is mocked to do nothing
- startup_timeout=startup_timeout, # Never hit, any value is fine, as time.sleep is mocked to do nothing
- check_interval=startup_check_interval,
- )
- mock_time_sleep.assert_called_with(startup_check_interval)
- assert mock_time_sleep.call_count == 3
- assert f"::group::Waiting until {schedule_timeout}s to get the POD scheduled..." in caplog.text
- assert f"Waiting {startup_timeout}s to get the POD running..." in caplog.text
- assert self.pod_manager.stop_watching_events is True
+
+ with mock.patch.object(self.pod_manager.log, "info") as mock_log_info:
+ await self.pod_manager.await_pod_start(
+ pod=mock_pod,
+ schedule_timeout=schedule_timeout, # Never hit, any value is fine, as time.sleep is mocked to do nothing
+ startup_timeout=startup_timeout, # Never hit, any value is fine, as time.sleep is mocked to do nothing
+ check_interval=startup_check_interval,
+ )
+ mock_time_sleep.assert_called_with(startup_check_interval)
+ assert self.pod_manager.stop_watching_events is True
+ assert mock_time_sleep.call_count == 3
+ mock_log_info.assert_any_call(
+ "::group::Waiting until %ss to get the POD scheduled...", schedule_timeout
+ )
+ mock_log_info.assert_any_call("Waiting %ss to get the POD running...", startup_timeout)
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running")
def test_container_is_running(self, container_is_running_mock):
@@ -700,6 +705,145 @@ class TestPodManager:
mock_container_is_running.assert_any_call(mock_pod, "airflow-xcom-sidecar")
+class TestAsyncPodManager:
+ def setup_method(self):
+ self.mock_async_hook = mock.AsyncMock()
+ self.async_pod_manager = AsyncPodManager(
+ async_hook=self.mock_async_hook,
+ callbacks=[],
+ )
+
+ @pytest.mark.asyncio
+ async def test_start_pod_raises_informative_error_on_scheduled_timeout(self):
+ pod_response = mock.MagicMock()
+ pod_response.status.phase = "Pending"
+ self.mock_async_hook.get_pod.return_value = pod_response
+ expected_msg = "Pod took too long to be scheduled on the cluster,
giving up. More than 0s. Check the pod events in kubernetes."
+ mock_pod = mock.MagicMock()
+ with pytest.raises(AirflowException, match=expected_msg):
+ await self.async_pod_manager.await_pod_start(
+ pod=mock_pod,
+ schedule_timeout=0,
+ startup_timeout=0,
+ )
+ self.mock_async_hook.get_pod.assert_called()
+
+ @pytest.mark.asyncio
+ async def test_start_pod_raises_informative_error_on_startup_timeout(self):
+ pod_response = mock.MagicMock()
+ pod_response.status.phase = "Pending"
+ condition = mock.MagicMock()
+ condition.type = "PodScheduled"
+ condition.status = "True"
+ pod_response.status.conditions = [condition]
+ self.mock_async_hook.get_pod.return_value = pod_response
+ expected_msg = "Pod took too long to start. More than 0s. Check the
pod events in kubernetes."
+ mock_pod = mock.MagicMock()
+ with pytest.raises(AirflowException, match=expected_msg):
+ await self.async_pod_manager.await_pod_start(
+ pod=mock_pod,
+ schedule_timeout=0,
+ startup_timeout=0,
+ )
+ self.mock_async_hook.get_pod.assert_called()
+
+ @pytest.mark.asyncio
+ async def test_start_pod_raises_fast_error_on_image_error(self):
+ pod_response = mock.MagicMock()
+ pod_response.status.phase = "Pending"
+ container_status = mock.MagicMock()
+ waiting_state = mock.MagicMock()
+ waiting_state.reason = "ErrImagePull"
+ waiting_state.message = "Test error"
+ container_status.state.waiting = waiting_state
+ pod_response.status.container_statuses = [container_status]
+ self.mock_async_hook.get_pod.return_value = pod_response
+ expected_msg = f"Pod docker image cannot be pulled, unable to start:
{waiting_state.reason}\n{waiting_state.message}"
+ mock_pod = mock.MagicMock()
+ with pytest.raises(AirflowException, match=expected_msg):
+ await self.async_pod_manager.await_pod_start(
+ pod=mock_pod,
+ schedule_timeout=60,
+ startup_timeout=60,
+ )
+ self.mock_async_hook.get_pod.assert_called()
+
+ @pytest.mark.asyncio
+ @mock.patch("asyncio.sleep", new_callable=mock.AsyncMock)
+ async def test_start_pod_startup_interval_seconds(self, mock_time_sleep):
+ condition_scheduled = mock.MagicMock()
+ condition_scheduled.type = "PodScheduled"
+ condition_scheduled.status = "True"
+
+ pod_info_pending = mock.MagicMock()
+ pod_info_pending.status.phase = PodPhase.PENDING
+ pod_info_pending.status.conditions = []
+
+ pod_info_pending_scheduled = mock.MagicMock()
+ pod_info_pending_scheduled.status.phase = PodPhase.PENDING
+ pod_info_pending_scheduled.status.conditions = [condition_scheduled]
+
+ pod_info_succeeded = mock.MagicMock()
+ pod_info_succeeded.status.phase = PodPhase.SUCCEEDED
+
+ # Simulate sequence of pod states
+ self.mock_async_hook.get_pod.side_effect = [
+ pod_info_pending,
+ pod_info_pending_scheduled,
+ pod_info_pending_scheduled,
+ pod_info_succeeded,
+ ]
+ startup_check_interval = 10
+ schedule_timeout = 30
+ startup_timeout = 60
+ mock_pod = mock.MagicMock()
+ with mock.patch.object(self.async_pod_manager.log, "info") as mock_log_info:
+ await self.async_pod_manager.await_pod_start(
+ pod=mock_pod,
+ schedule_timeout=schedule_timeout,
+ startup_timeout=startup_timeout,
+ check_interval=startup_check_interval,
+ )
+ assert mock_time_sleep.call_count == 3
+ mock_log_info.assert_any_call(
+ "::group::Waiting until %ss to get the POD scheduled...", schedule_timeout
+ )
+ mock_log_info.assert_any_call("Waiting %ss to get the POD
running...", startup_timeout)
+ assert self.async_pod_manager.stop_watching_events is True
+
+ @pytest.mark.asyncio
+ @mock.patch("asyncio.sleep", new_callable=mock.AsyncMock)
+ async def test_watch_pod_events(self, mock_time_sleep):
+ mock_pod = mock.MagicMock()
+ mock_pod.metadata.name = "test-pod"
+ mock_pod.metadata.namespace = "default"
+
+ events = mock.MagicMock()
+ events.items = []
+ for id in ["event 1", "event 2"]:
+ event = mock.MagicMock()
+ event.message = f"test {id}"
+ event.involved_object.field_path = f"object {id}"
+ events.items.append(event)
+ startup_check_interval = 10
+
+ def get_pod_events_side_effect(name, namespace):
+ self.async_pod_manager.stop_watching_events = True
+ return events
+
+ self.mock_async_hook.get_pod_events.side_effect = get_pod_events_side_effect
+
+ with mock.patch.object(self.async_pod_manager.log, "info") as mock_log_info:
+ await self.async_pod_manager.watch_pod_events(pod=mock_pod, check_interval=startup_check_interval)
+ mock_log_info.assert_any_call(
+ "The Pod has an Event: %s from %s", "test event 1", "object event 1"
+ )
+ mock_log_info.assert_any_call(
+ "The Pod has an Event: %s from %s", "test event 2", "object event 2"
+ )
+ mock_time_sleep.assert_called_once_with(startup_check_interval)
+
+
class TestPodLogsConsumer:
@pytest.mark.parametrize(
"chunks, expected_logs",