This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a630926a187 KubernetesPodTriggerer reads pod logs instead of
KubernetesPodOperator (#57531)
a630926a187 is described below
commit a630926a18730f3adcfccb619a5904509c17d6bc
Author: AutomationDev85 <[email protected]>
AuthorDate: Thu Nov 6 22:11:05 2025 +0100
KubernetesPodTriggerer reads pod logs instead of KubernetesPodOperator
(#57531)
* Move container-related functions from PodManager to a separate file
* Moved unit tests
* Sync and async workflow use the same code to track Pod startup
* Reworked unit tests and pod startup logic
* Add api permission error detection for triggerer
* Fix pytest fixture
* Removed not required code
* Move log file reading to triggerer
* Removed not required code
* Removed not required unit test
* Fixed unit tests
* Clean up unit tests
* Adapt unit test
* Adapt return type
---------
Co-authored-by: AutomationDev85 <AutomationDev85>
---
.../providers/cncf/kubernetes/hooks/kubernetes.py | 15 +-
.../providers/cncf/kubernetes/operators/pod.py | 33 ----
.../providers/cncf/kubernetes/triggers/pod.py | 18 +-
.../providers/cncf/kubernetes/utils/pod_manager.py | 86 ++++++++--
.../unit/cncf/kubernetes/hooks/test_kubernetes.py | 36 ++--
.../unit/cncf/kubernetes/operators/test_pod.py | 49 ------
.../unit/cncf/kubernetes/triggers/test_pod.py | 38 +++--
.../unit/cncf/kubernetes/utils/test_pod_manager.py | 187 +++++++++++++++++----
.../google/cloud/hooks/test_kubernetes_engine.py | 8 +-
9 files changed, 290 insertions(+), 180 deletions(-)
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index d59c6527d6b..49168cc6dfc 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -910,7 +910,9 @@ class AsyncKubernetesHook(KubernetesHook):
if str(e.status) != "404":
raise
- async def read_logs(self, name: str, namespace: str):
+ async def read_logs(
+ self, name: str, namespace: str, container_name: str | None = None,
since_seconds: int | None = None
+ ) -> list[str]:
"""
Read logs inside the pod while starting containers inside.
@@ -921,6 +923,8 @@ class AsyncKubernetesHook(KubernetesHook):
:param name: Name of the pod.
:param namespace: Name of the pod's namespace.
+ :param container_name: Name of the container inside the pod.
+ :param since_seconds: Only return logs newer than a relative duration
in seconds.
"""
async with self.get_conn() as connection:
try:
@@ -928,16 +932,15 @@ class AsyncKubernetesHook(KubernetesHook):
logs = await v1_api.read_namespaced_pod_log(
name=name,
namespace=namespace,
+ container_name=container_name,
follow=False,
timestamps=True,
+ since_seconds=since_seconds,
)
logs = logs.splitlines()
- for line in logs:
- self.log.info("Container logs from %s", line)
return logs
- except HTTPError:
- self.log.exception("There was an error reading the kubernetes
API.")
- raise
+ except HTTPError as e:
+ raise KubernetesApiError from e
async def get_pod_events(self, name: str, namespace: str) ->
CoreV1EventList:
"""Get pod's events."""
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index 895ee8ddab0..4249f56337d 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -899,17 +899,6 @@ class KubernetesPodOperator(BaseOperator):
if not self.pod:
raise PodNotFoundException("Could not find pod after resuming
from deferral")
- if event["status"] != "running":
- for callback in self.callbacks:
- callback.on_operator_resuming(
- pod=self.pod,
- event=event,
- client=self.client,
- mode=ExecutionMode.SYNC,
- context=context,
- operator=self,
- )
-
follow = self.logging_interval is None
last_log_time = event.get("last_log_time")
@@ -942,34 +931,12 @@ class KubernetesPodOperator(BaseOperator):
)
message = event.get("stack_trace", event["message"])
raise AirflowException(message)
-
- return xcom_sidecar_output
-
- if event["status"] == "running":
- if self.get_logs:
- self.log.info("Resuming logs read from time %r",
last_log_time)
-
- pod_log_status = self.pod_manager.fetch_container_logs(
- pod=self.pod,
- container_name=self.base_container_name,
- follow=follow,
- since_time=last_log_time,
-
container_name_log_prefix_enabled=self.container_name_log_prefix_enabled,
- log_formatter=self.log_formatter,
- )
-
- self.invoke_defer_method(pod_log_status.last_log_time)
- else:
- self.invoke_defer_method()
except TaskDeferred:
raise
finally:
self._clean(event=event, context=context,
result=xcom_sidecar_output)
def _clean(self, event: dict[str, Any], result: dict | None, context:
Context) -> None:
- if event["status"] == "running":
- return
-
if self.pod is None:
return
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
index 78e760df018..29c1a382a0b 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -287,16 +287,14 @@ class KubernetesPodTrigger(BaseTrigger):
}
)
self.log.debug("Container is not completed and still working.")
- if time_get_more_logs and
datetime.datetime.now(tz=datetime.timezone.utc) > time_get_more_logs:
- return TriggerEvent(
- {
- "status": "running",
- "last_log_time": self.last_log_time,
- "namespace": self.pod_namespace,
- "name": self.pod_name,
- **self.trigger_kwargs,
- }
- )
+ now = datetime.datetime.now(tz=datetime.timezone.utc)
+ if time_get_more_logs and now >= time_get_more_logs:
+ if self.get_logs and self.logging_interval:
+ self.last_log_time = await
self.pod_manager.fetch_container_logs_before_current_sec(
+ pod, container_name=self.base_container_name,
since_time=self.last_log_time
+ )
+ time_get_more_logs = now +
datetime.timedelta(seconds=self.logging_interval)
+
self.log.debug("Sleeping for %s seconds.", self.poll_interval)
await asyncio.sleep(self.poll_interval)
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 789ad4db79e..a8cdcf9c634 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -480,7 +480,7 @@ class PodManager(LoggingMixin):
try:
for raw_line in logs:
line = raw_line.decode("utf-8",
errors="backslashreplace")
- line_timestamp, message = self.parse_log_line(line)
+ line_timestamp, message = parse_log_line(line)
if line_timestamp: # detect new log line
if message_to_log is None: # first line in the log
message_to_log = message
@@ -708,22 +708,6 @@ class PodManager(LoggingMixin):
time.sleep(2)
return remote_pod
- def parse_log_line(self, line: str) -> tuple[DateTime | None, str]:
- """
- Parse K8s log line and returns the final state.
-
- :param line: k8s log line
- :return: timestamp and log message
- """
- timestamp, sep, message = line.strip().partition(" ")
- if not sep:
- return None, line
- try:
- last_log_time = cast("DateTime", pendulum.parse(timestamp))
- except ParserError:
- return None, line
- return last_log_time, message
-
def container_is_running(self, pod: V1Pod, container_name: str) -> bool:
"""Read pod and checks if container is running."""
remote_pod = self.read_pod(pod)
@@ -971,6 +955,23 @@ def is_log_group_marker(line: str) -> bool:
return line.startswith("::group::") or line.startswith("::endgroup::")
+def parse_log_line(line: str) -> tuple[DateTime | None, str]:
+ """
+ Parse K8s log line and returns the final state.
+
+ :param line: k8s log line
+ :return: timestamp and log message
+ """
+ timestamp, sep, message = line.strip().partition(" ")
+ if not sep:
+ return None, line
+ try:
+ last_log_time = cast("DateTime", pendulum.parse(timestamp))
+ except ParserError:
+ return None, line
+ return last_log_time, message
+
+
class AsyncPodManager(LoggingMixin):
"""Create, monitor, and otherwise interact with Kubernetes pods for use
with the KubernetesPodTriggerer."""
@@ -1032,3 +1033,54 @@ class AsyncPodManager(LoggingMixin):
startup_timeout=startup_timeout,
check_interval=check_interval,
)
+
+ @tenacity.retry(stop=tenacity.stop_after_attempt(5),
wait=tenacity.wait_exponential(), reraise=True)
+ async def fetch_container_logs_before_current_sec(
+ self, pod: V1Pod, container_name: str, since_time: DateTime | None =
None
+ ) -> DateTime | None:
+ """
+ Asynchronously read the log file of the specified pod.
+
+ This method streams logs from the base container, skipping log lines
from the current second to prevent duplicate entries on subsequent reads. It is
designed to handle long-running containers and gracefully suppresses transient
interruptions.
+
+ :param pod: The pod specification to monitor.
+ :param container_name: The name of the container within the pod.
+ :param since_time: The timestamp from which to start reading logs.
+ :return: The timestamp to use for the next log read, representing the
start of the current second. Returns None if an exception occurred.
+ """
+ now = pendulum.now()
+ logs = await self._hook.read_logs(
+ name=pod.metadata.name,
+ namespace=pod.metadata.namespace,
+ container_name=container_name,
+ since_seconds=(math.ceil((now - since_time).total_seconds()) if
since_time else None),
+ )
+ message_to_log = None
+ try:
+ now_seconds = now.replace(microsecond=0)
+ for line in logs:
+ line_timestamp, message = parse_log_line(line)
+ # Skip log lines from the current second to prevent duplicate
entries on the next read.
+ # The API only allows specifying 'since_seconds', not an exact
timestamp.
+ if line_timestamp and line_timestamp.replace(microsecond=0) ==
now_seconds:
+ break
+ if line_timestamp: # detect new log line
+ if message_to_log is None: # first line in the log
+ message_to_log = message
+ else: # previous log line is complete
+ if message_to_log is not None:
+ if is_log_group_marker(message_to_log):
+ print(message_to_log)
+ else:
+ self.log.info("[%s] %s", container_name,
message_to_log)
+ message_to_log = message
+ elif message_to_log: # continuation of the previous log line
+ message_to_log = f"{message_to_log}\n{message}"
+ finally:
+ # log the last line and update the last_captured_timestamp
+ if message_to_log is not None:
+ if is_log_group_marker(message_to_log):
+ print(message_to_log)
+ else:
+ self.log.info("[%s] %s", container_name, message_to_log)
+ return now # Return the current time as the last log time to
ensure logs from the current second are read in the next fetch.
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py
index 223c153cfe0..f6e51149ed3 100644
---
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py
+++
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py
@@ -1060,25 +1060,25 @@ class TestAsyncKubernetesHook:
config_file=None,
cluster_context=None,
)
- with mock.patch(
-
"airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.log",
- new_callable=PropertyMock,
- ) as log:
- await hook.read_logs(
- name=POD_NAME,
- namespace=NAMESPACE,
- )
- lib_method.assert_called_once()
- lib_method.assert_called_with(
- name=POD_NAME,
- namespace=NAMESPACE,
- follow=False,
- timestamps=True,
- )
- log.return_value.info.assert_called_with(
- "Container logs from %s", "2023-01-11 Some string logs..."
- )
+ logs = await hook.read_logs(
+ name=POD_NAME,
+ namespace=NAMESPACE,
+ container_name=CONTAINER_NAME,
+ since_seconds=10,
+ )
+
+ lib_method.assert_called_once()
+ lib_method.assert_called_with(
+ name=POD_NAME,
+ namespace=NAMESPACE,
+ container_name=CONTAINER_NAME,
+ follow=False,
+ timestamps=True,
+ since_seconds=10,
+ )
+ assert len(logs) == 1
+ assert "2023-01-11 Some string logs..." in logs
@pytest.mark.asyncio
@mock.patch(KUBE_BATCH_API.format("read_namespaced_job_status"))
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
index 3a64fe1f9b1..aef2ef29da3 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
@@ -2166,18 +2166,6 @@ class TestKubernetesPodOperator:
process_pod_deletion_mock.assert_called_once_with(pod_1)
assert result.metadata.name == pod_2.metadata.name
- @patch(POD_MANAGER_CLASS.format("fetch_container_logs"))
- @patch(KUB_OP_PATH.format("invoke_defer_method"))
- def test_defere_call_one_more_time_after_error(self, invoke_defer_method,
fetch_container_logs):
- fetch_container_logs.return_value = PodLoggingStatus(False, None)
- op = KubernetesPodOperator(task_id="test_task", name="test-pod",
get_logs=True)
-
- op.trigger_reentry(
- create_context(op), event={"name": TEST_NAME, "namespace":
TEST_NAMESPACE, "status": "running"}
- )
-
- invoke_defer_method.assert_called_with(None)
-
class TestSuppress:
def test__suppress(self, caplog):
@@ -2611,32 +2599,6 @@ class TestKubernetesPodOperatorAsync:
with pytest.raises(AirflowException, match=expect_match):
k.cleanup(pod, pod)
- @patch(
-
"airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.convert_config_file_to_dict"
- )
- @patch(f"{HOOK_CLASS}.get_pod")
-
@patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_completion")
-
@patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.fetch_container_logs")
- def test_get_logs_running(
- self,
- fetch_container_logs,
- await_pod_completion,
- get_pod,
- mock_convert_config_file_to_dict,
- ):
- """When logs fetch exits with status running, raise task deferred"""
- pod = MagicMock()
- get_pod.return_value = pod
- op = KubernetesPodOperator(task_id="test_task", name="test-pod",
get_logs=True)
- await_pod_completion.return_value = None
- fetch_container_logs.return_value = PodLoggingStatus(True, None)
- with pytest.raises(TaskDeferred):
- op.trigger_reentry(
- create_context(op),
- event={"name": TEST_NAME, "namespace": TEST_NAMESPACE,
"status": "running"},
- )
- fetch_container_logs.is_called_with(pod, "base")
-
@patch(KUB_OP_PATH.format("_write_logs"))
@patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.cleanup")
@patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.find_pod")
@@ -2707,17 +2669,6 @@ class TestKubernetesPodOperatorAsync:
}
k.trigger_reentry(context=context, event=callback_event)
- # check on_operator_resuming callback
- mock_callbacks.on_operator_resuming.assert_called_once()
- assert mock_callbacks.on_operator_resuming.call_args.kwargs == {
- "client": k.client,
- "mode": ExecutionMode.SYNC,
- "pod": remote_pod_mock,
- "operator": k,
- "context": context,
- "event": callback_event,
- }
-
# check on_pod_cleanup callback
mock_callbacks.on_pod_cleanup.assert_called_once()
assert mock_callbacks.on_pod_cleanup.call_args.kwargs == {
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
index ce57d0f5e96..1d4eead8858 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
@@ -259,7 +259,7 @@ class TestKubernetesPodTrigger:
pytest.param(
0,
{
- "status": "running",
+ "status": "success",
"last_log_time": DateTime(2022, 1, 1),
"name": POD_NAME,
"namespace": NAMESPACE,
@@ -268,26 +268,41 @@ class TestKubernetesPodTrigger:
),
],
)
+ @mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.datetime")
@mock.patch(f"{TRIGGER_PATH}.define_container_state")
@mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
+ @mock.patch(
+
"airflow.providers.cncf.kubernetes.triggers.pod.AsyncPodManager.fetch_container_logs_before_current_sec"
+ )
@mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.AsyncKubernetesHook.get_pod")
async def test_running_log_interval(
- self, mock_get_pod, mock_wait_pod, define_container_state,
logging_interval, exp_event
+ self,
+ mock_get_pod,
+ mock_fetch_container_logs_before_current_sec,
+ mock_wait_pod,
+ define_container_state,
+ mock_datetime,
+ logging_interval,
+ exp_event,
):
"""
- If log interval given, should emit event with running status and last
log time.
- Otherwise, should make it to second loop and emit "done" event.
- For this test we emit container status "running, running not".
- The first "running" status gets us out of wait_for_pod_start.
- The second "running" will fire a "running" event when logging interval
is non-None. When logging
- interval is None, the second "running" status will just result in
continuation of the loop. And
- when in the next loop we get a non-running status, the trigger fires a
"done" event.
+ If log interval given, check that the trigger fetches logs at the
right times.
"""
- define_container_state.return_value = "running"
+ fixed_now = datetime.datetime(2022, 1, 1, tzinfo=datetime.timezone.utc)
+ mock_datetime.datetime.now.side_effect = [
+ fixed_now,
+ fixed_now + datetime.timedelta(seconds=1),
+ fixed_now + datetime.timedelta(seconds=2),
+ ]
+
+ mock_datetime.timedelta = datetime.timedelta
+ mock_datetime.timezone = datetime.timezone
+ mock_fetch_container_logs_before_current_sec.return_value =
DateTime(2022, 1, 1)
+ define_container_state.side_effect = ["running", "running",
"terminated"]
trigger = KubernetesPodTrigger(
pod_name=POD_NAME,
pod_namespace=NAMESPACE,
- trigger_start_time=datetime.datetime.now(tz=datetime.timezone.utc),
+ trigger_start_time=fixed_now,
base_container_name=BASE_CONTAINER_NAME,
startup_timeout=5,
poll_interval=1,
@@ -295,6 +310,7 @@ class TestKubernetesPodTrigger:
last_log_time=DateTime(2022, 1, 1),
)
assert await trigger.run().__anext__() == TriggerEvent(exp_event)
+ assert mock_fetch_container_logs_before_current_sec.call_count == 2
@pytest.mark.parametrize(
"container_state, expected_state",
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
index 485bdbd769e..2c1150fd9fe 100644
---
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
+++
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
@@ -27,14 +27,17 @@ import pendulum
import pytest
import time_machine
from kubernetes.client.rest import ApiException
+from tenacity import wait_none
from urllib3.exceptions import HTTPError as BaseHTTPError
from airflow.exceptions import AirflowException
+from airflow.providers.cncf.kubernetes.exceptions import KubernetesApiError
from airflow.providers.cncf.kubernetes.utils.pod_manager import (
AsyncPodManager,
PodLogsConsumer,
PodManager,
PodPhase,
+ parse_log_line,
)
from airflow.utils.timezone import utc
@@ -44,6 +47,18 @@ if TYPE_CHECKING:
from pendulum import DateTime
+def test_parse_log_line():
+ log_message = "This should return no timestamp"
+ timestamp, line = parse_log_line(log_message)
+ assert timestamp is None
+ assert line == log_message
+
+ real_timestamp = "2020-10-08T14:16:17.793417674Z"
+ timestamp, line = parse_log_line(f"{real_timestamp} {log_message}")
+ assert timestamp == pendulum.parse(real_timestamp)
+ assert line == log_message
+
+
class TestPodManager:
def setup_method(self):
self.mock_kube_client = mock.Mock()
@@ -51,6 +66,23 @@ class TestPodManager:
kube_client=self.mock_kube_client,
callbacks=[MockKubernetesPodOperatorCallback],
)
+ # List of PodManager methods that may use tenacity retry
+ tenacity_methods = [
+ "await_pod_start",
+ "read_pod_logs",
+ "create_pod",
+ "get_init_container_names",
+ "get_container_names",
+ "read_pod_events",
+ "read_pod",
+ "extract_xcom_json",
+ "extract_xcom_kill",
+ ]
+ # Patch tenacity retry wait for all relevant methods to disable
waiting in tests
+ for method_name in tenacity_methods:
+ method = getattr(self.pod_manager, method_name, None)
+ if method and hasattr(method, "retry"):
+ method.retry.wait = wait_none()
def test_read_pod_logs_successfully_returns_logs(self):
mock.sentinel.metadata = mock.MagicMock()
@@ -298,17 +330,6 @@ class TestPodManager:
with pytest.raises(AirflowException):
self.pod_manager.read_pod(mock.sentinel)
- def test_parse_log_line(self):
- log_message = "This should return no timestamp"
- timestamp, line = self.pod_manager.parse_log_line(log_message)
- assert timestamp is None
- assert line == log_message
-
- real_timestamp = "2020-10-08T14:16:17.793417674Z"
- timestamp, line = self.pod_manager.parse_log_line(f"{real_timestamp}
{log_message}")
- assert timestamp == pendulum.parse(real_timestamp)
- assert line == log_message
-
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs")
def test_fetch_container_logs_returning_last_timestamp(
@@ -706,12 +727,29 @@ class TestPodManager:
class TestAsyncPodManager:
+ @pytest.fixture
+ def mock_log_info(self):
+ with mock.patch.object(self.async_pod_manager.log, "info") as
mock_log_info:
+ yield mock_log_info
+
def setup_method(self):
self.mock_async_hook = mock.AsyncMock()
self.async_pod_manager = AsyncPodManager(
async_hook=self.mock_async_hook,
callbacks=[],
)
+ # List of PodManager methods that may use tenacity retry
+ tenacity_methods = [
+ "await_pod_start",
+ "fetch_container_logs_before_current_sec",
+ "read_pod_events",
+ "read_pod",
+ ]
+ # Patch tenacity retry wait for all relevant methods to disable
waiting in tests
+ for method_name in tenacity_methods:
+ method = getattr(self.async_pod_manager, method_name, None)
+ if method and hasattr(method, "retry"):
+ method.retry.wait = wait_none()
@pytest.mark.asyncio
async def
test_start_pod_raises_informative_error_on_scheduled_timeout(self):
@@ -770,7 +808,7 @@ class TestAsyncPodManager:
@pytest.mark.asyncio
@mock.patch("asyncio.sleep", new_callable=mock.AsyncMock)
- async def test_start_pod_startup_interval_seconds(self, mock_time_sleep):
+ async def test_start_pod_startup_interval_seconds(self, mock_time_sleep,
mock_log_info):
condition_scheduled = mock.MagicMock()
condition_scheduled.type = "PodScheduled"
condition_scheduled.status = "True"
@@ -797,23 +835,23 @@ class TestAsyncPodManager:
schedule_timeout = 30
startup_timeout = 60
mock_pod = mock.MagicMock()
- with mock.patch.object(self.async_pod_manager.log, "info") as
mock_log_info:
- await self.async_pod_manager.await_pod_start(
- pod=mock_pod,
- schedule_timeout=schedule_timeout,
- startup_timeout=startup_timeout,
- check_interval=startup_check_interval,
- )
- assert mock_time_sleep.call_count == 3
- mock_log_info.assert_any_call(
- "::group::Waiting until %ss to get the POD scheduled...",
schedule_timeout
- )
- mock_log_info.assert_any_call("Waiting %ss to get the POD
running...", startup_timeout)
- assert self.async_pod_manager.stop_watching_events is True
+
+ await self.async_pod_manager.await_pod_start(
+ pod=mock_pod,
+ schedule_timeout=schedule_timeout,
+ startup_timeout=startup_timeout,
+ check_interval=startup_check_interval,
+ )
+ assert mock_time_sleep.call_count == 3
+ mock_log_info.assert_any_call(
+ "::group::Waiting until %ss to get the POD scheduled...",
schedule_timeout
+ )
+ mock_log_info.assert_any_call("Waiting %ss to get the POD running...",
startup_timeout)
+ assert self.async_pod_manager.stop_watching_events is True
@pytest.mark.asyncio
@mock.patch("asyncio.sleep", new_callable=mock.AsyncMock)
- async def test_watch_pod_events(self, mock_time_sleep):
+ async def test_watch_pod_events(self, mock_time_sleep, mock_log_info):
mock_pod = mock.MagicMock()
mock_pod.metadata.name = "test-pod"
mock_pod.metadata.namespace = "default"
@@ -833,15 +871,98 @@ class TestAsyncPodManager:
self.mock_async_hook.get_pod_events.side_effect =
get_pod_events_side_effect
- with mock.patch.object(self.async_pod_manager.log, "info") as
mock_log_info:
- await self.async_pod_manager.watch_pod_events(pod=mock_pod,
check_interval=startup_check_interval)
- mock_log_info.assert_any_call(
- "The Pod has an Event: %s from %s", "test event 1", "object
event 1"
+ await self.async_pod_manager.watch_pod_events(pod=mock_pod,
check_interval=startup_check_interval)
+ mock_log_info.assert_any_call("The Pod has an Event: %s from %s",
"test event 1", "object event 1")
+ mock_log_info.assert_any_call("The Pod has an Event: %s from %s",
"test event 2", "object event 2")
+ mock_time_sleep.assert_called_once_with(startup_check_interval)
+
+ @pytest.mark.asyncio
+ @pytest.mark.parametrize(
+ "log_lines, now, expected_log_messages, not_expected_log_messages",
+ [
+ # Case 1: No logs
+ ([], pendulum.now(), [], []),
+ # Case 2: One log line with timestamp before now
+ (
+ [f"{pendulum.now().subtract(seconds=2).to_iso8601_string()}
message"],
+ pendulum.now(),
+ ["message"],
+ [],
+ ),
+ # Case 3: Log line with timestamp equal to now (should be skipped,
so last_time is None)
+ ([f"{pendulum.now().to_iso8601_string()} message"],
pendulum.now(), [], ["message"]),
+ # Case 4: Multiple log lines, last before now
+ (
+ [
+ f"{pendulum.now().subtract(seconds=3).to_iso8601_string()}
msg1",
+ f"{pendulum.now().subtract(seconds=2).to_iso8601_string()}
msg2",
+ ],
+ pendulum.now(),
+ ["msg1", "msg2"],
+ [],
+ ),
+ # Case 5: Log lines with continuation (no timestamp)
+ (
+ [
+ f"{pendulum.now().subtract(seconds=2).to_iso8601_string()}
msg1",
+ "continued line",
+ ],
+ pendulum.now(),
+ ["msg1\ncontinued line"],
+ [],
+ ),
+ # Case 6: Log lines with continuation (no timestamp)
+ (
+ [
+ f"{pendulum.now().subtract(seconds=2).to_iso8601_string()}
msg1",
+ f"{pendulum.now().to_iso8601_string()} msg2",
+ ],
+ pendulum.now(),
+ ["msg1"],
+ ["msg2"],
+ ),
+ ],
+ )
+ async def test_fetch_container_logs_before_current_sec_various_logs(
+ self, log_lines, now, expected_log_messages, not_expected_log_messages
+ ):
+ pod = mock.MagicMock()
+ container_name = "base"
+ since_time = now.subtract(minutes=1)
+ mock_async_hook = mock.AsyncMock()
+ mock_async_hook.read_logs.return_value = log_lines
+
+ with
mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.pendulum.now",
return_value=now):
+ async_pod_manager = AsyncPodManager(
+ async_hook=mock_async_hook,
+ callbacks=[],
)
- mock_log_info.assert_any_call(
- "The Pod has an Event: %s from %s", "test event 2", "object
event 2"
+ with mock.patch.object(async_pod_manager.log, "info") as
mock_log_info:
+ result = await
async_pod_manager.fetch_container_logs_before_current_sec(
+ pod=pod, container_name=container_name,
since_time=since_time
+ )
+ assert result == now
+
+ for expected in expected_log_messages:
+ mock_log_info.assert_any_call("[%s] %s", container_name,
expected)
+ for not_expected in not_expected_log_messages:
+ unexpected_call = mock.call("[%s] %s", container_name,
not_expected)
+ assert unexpected_call not in mock_log_info.mock_calls
+
+ @pytest.mark.asyncio
+ async def
test_fetch_container_logs_before_current_sec_error_handling(self):
+ pod = mock.MagicMock()
+ container_name = "base"
+ since_time = pendulum.now().subtract(minutes=1)
+
+ async def fake_read_logs(**kwargs):
+ raise KubernetesApiError("error")
+
+ self.async_pod_manager._hook.read_logs = fake_read_logs
+ with pytest.raises(KubernetesApiError):
+ await
self.async_pod_manager.fetch_container_logs_before_current_sec(
+ pod=pod, container_name=container_name, since_time=since_time
)
- mock_time_sleep.assert_called_once_with(startup_check_interval)
class TestPodLogsConsumer:
diff --git
a/providers/google/tests/unit/google/cloud/hooks/test_kubernetes_engine.py
b/providers/google/tests/unit/google/cloud/hooks/test_kubernetes_engine.py
index 3bca9ce990d..e5a31b2327f 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_kubernetes_engine.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_kubernetes_engine.py
@@ -531,7 +531,7 @@ class TestGKEKubernetesAsyncHook:
caplog.set_level(logging.INFO)
self.make_mock_awaitable(read_namespaced_pod_log, result="Test string
#1\nTest string #2\n")
- await async_hook.read_logs(name=POD_NAME, namespace=POD_NAMESPACE)
+ logs = await async_hook.read_logs(name=POD_NAME,
namespace=POD_NAMESPACE)
get_conn_mock.assert_called_once_with()
read_namespaced_pod_log.assert_called_with(
@@ -539,9 +539,11 @@ class TestGKEKubernetesAsyncHook:
namespace=POD_NAMESPACE,
follow=False,
timestamps=True,
+ container_name=None,
+ since_seconds=None,
)
- assert "Test string #1" in caplog.text
- assert "Test string #2" in caplog.text
+ assert "Test string #1" in logs
+ assert "Test string #2" in logs
@pytest_asyncio.fixture