This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a630926a187 KubernetesPodTriggerer reads pod logs instead of KubernetesPodOperator (#57531)
a630926a187 is described below

commit a630926a18730f3adcfccb619a5904509c17d6bc
Author: AutomationDev85 <[email protected]>
AuthorDate: Thu Nov 6 22:11:05 2025 +0100

    KubernetesPodTriggerer reads pod logs instead of KubernetesPodOperator (#57531)
    
    * Move container-related functions from PodManager to a separate file
    
    * Moved unit tests
    
    * Sync and async workflow use the same code to track Pod startup
    
    * Reworked unit tests and pod startup logic
    
    * Add api permission error detection for triggerer
    
    * Fix pytest fixture
    
    * Removed not required code
    
    * Move log file reading to triggerer
    
    * Removed not required code
    
    * Removed not required unit test
    
    * Fixed unit tests
    
    * Clean up unit tests
    
    * Adapt unit test
    
    * Adapt return type
    
    ---------
    
    Co-authored-by: AutomationDev85 <AutomationDev85>
---
 .../providers/cncf/kubernetes/hooks/kubernetes.py  |  15 +-
 .../providers/cncf/kubernetes/operators/pod.py     |  33 ----
 .../providers/cncf/kubernetes/triggers/pod.py      |  18 +-
 .../providers/cncf/kubernetes/utils/pod_manager.py |  86 ++++++++--
 .../unit/cncf/kubernetes/hooks/test_kubernetes.py  |  36 ++--
 .../unit/cncf/kubernetes/operators/test_pod.py     |  49 ------
 .../unit/cncf/kubernetes/triggers/test_pod.py      |  38 +++--
 .../unit/cncf/kubernetes/utils/test_pod_manager.py | 187 +++++++++++++++++----
 .../google/cloud/hooks/test_kubernetes_engine.py   |   8 +-
 9 files changed, 290 insertions(+), 180 deletions(-)

diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index d59c6527d6b..49168cc6dfc 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -910,7 +910,9 @@ class AsyncKubernetesHook(KubernetesHook):
                 if str(e.status) != "404":
                     raise
 
-    async def read_logs(self, name: str, namespace: str):
+    async def read_logs(
+        self, name: str, namespace: str, container_name: str | None = None, since_seconds: int | None = None
+    ) -> list[str]:
         """
         Read logs inside the pod while starting containers inside.
 
@@ -921,6 +923,8 @@ class AsyncKubernetesHook(KubernetesHook):
 
         :param name: Name of the pod.
         :param namespace: Name of the pod's namespace.
+        :param container_name: Name of the container inside the pod.
+        :param since_seconds: Only return logs newer than a relative duration in seconds.
         """
         async with self.get_conn() as connection:
             try:
@@ -928,16 +932,15 @@ class AsyncKubernetesHook(KubernetesHook):
                 logs = await v1_api.read_namespaced_pod_log(
                     name=name,
                     namespace=namespace,
+                    container_name=container_name,
                     follow=False,
                     timestamps=True,
+                    since_seconds=since_seconds,
                 )
                 logs = logs.splitlines()
-                for line in logs:
-                    self.log.info("Container logs from %s", line)
                 return logs
-            except HTTPError:
-                self.log.exception("There was an error reading the kubernetes API.")
-                raise
+            except HTTPError as e:
+                raise KubernetesApiError from e
 
     async def get_pod_events(self, name: str, namespace: str) -> CoreV1EventList:
         """Get pod's events."""
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index 895ee8ddab0..4249f56337d 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -899,17 +899,6 @@ class KubernetesPodOperator(BaseOperator):
             if not self.pod:
                 raise PodNotFoundException("Could not find pod after resuming from deferral")
 
-            if event["status"] != "running":
-                for callback in self.callbacks:
-                    callback.on_operator_resuming(
-                        pod=self.pod,
-                        event=event,
-                        client=self.client,
-                        mode=ExecutionMode.SYNC,
-                        context=context,
-                        operator=self,
-                    )
-
             follow = self.logging_interval is None
             last_log_time = event.get("last_log_time")
 
@@ -942,34 +931,12 @@ class KubernetesPodOperator(BaseOperator):
                     )
                     message = event.get("stack_trace", event["message"])
                     raise AirflowException(message)
-
-                return xcom_sidecar_output
-
-            if event["status"] == "running":
-                if self.get_logs:
-                    self.log.info("Resuming logs read from time %r", last_log_time)
-
-                    pod_log_status = self.pod_manager.fetch_container_logs(
-                        pod=self.pod,
-                        container_name=self.base_container_name,
-                        follow=follow,
-                        since_time=last_log_time,
-                        container_name_log_prefix_enabled=self.container_name_log_prefix_enabled,
-                        log_formatter=self.log_formatter,
-                    )
-
-                    self.invoke_defer_method(pod_log_status.last_log_time)
-                else:
-                    self.invoke_defer_method()
         except TaskDeferred:
             raise
         finally:
             self._clean(event=event, context=context, result=xcom_sidecar_output)
 
     def _clean(self, event: dict[str, Any], result: dict | None, context: Context) -> None:
-        if event["status"] == "running":
-            return
-
         if self.pod is None:
             return
 
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
index 78e760df018..29c1a382a0b 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -287,16 +287,14 @@ class KubernetesPodTrigger(BaseTrigger):
                     }
                 )
             self.log.debug("Container is not completed and still working.")
-            if time_get_more_logs and datetime.datetime.now(tz=datetime.timezone.utc) > time_get_more_logs:
-                return TriggerEvent(
-                    {
-                        "status": "running",
-                        "last_log_time": self.last_log_time,
-                        "namespace": self.pod_namespace,
-                        "name": self.pod_name,
-                        **self.trigger_kwargs,
-                    }
-                )
+            now = datetime.datetime.now(tz=datetime.timezone.utc)
+            if time_get_more_logs and now >= time_get_more_logs:
+                if self.get_logs and self.logging_interval:
+                    self.last_log_time = await self.pod_manager.fetch_container_logs_before_current_sec(
+                        pod, container_name=self.base_container_name, since_time=self.last_log_time
+                    )
+                    time_get_more_logs = now + datetime.timedelta(seconds=self.logging_interval)
+
             self.log.debug("Sleeping for %s seconds.", self.poll_interval)
             await asyncio.sleep(self.poll_interval)
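
To make the timing change above concrete, here is a simplified, hypothetical sketch of the trigger-side cadence (it is not the trigger's actual run loop): logs are fetched at most once per logging_interval while the container keeps running, with fetch_logs standing in for AsyncPodManager.fetch_container_logs_before_current_sec.

# Simplified sketch of the polling cadence; the real trigger also checks pod/container
# state each iteration and yields TriggerEvents when the container finishes.
import asyncio
import datetime


async def poll_logs(fetch_logs, logging_interval, poll_interval, last_log_time=None):
    time_get_more_logs = None
    if logging_interval is not None:
        time_get_more_logs = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(
            seconds=logging_interval
        )
    while True:  # the real loop exits once the container reaches a terminal state
        now = datetime.datetime.now(tz=datetime.timezone.utc)
        if time_get_more_logs and now >= time_get_more_logs:
            # fetch_logs returns the timestamp to use as since_time on the next read.
            last_log_time = await fetch_logs(since_time=last_log_time)
            time_get_more_logs = now + datetime.timedelta(seconds=logging_interval)
        await asyncio.sleep(poll_interval)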
 
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 789ad4db79e..a8cdcf9c634 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -480,7 +480,7 @@ class PodManager(LoggingMixin):
                 try:
                     for raw_line in logs:
                         line = raw_line.decode("utf-8", errors="backslashreplace")
-                        line_timestamp, message = self.parse_log_line(line)
+                        line_timestamp, message = parse_log_line(line)
                         if line_timestamp:  # detect new log line
                             if message_to_log is None:  # first line in the log
                                 message_to_log = message
@@ -708,22 +708,6 @@ class PodManager(LoggingMixin):
             time.sleep(2)
         return remote_pod
 
-    def parse_log_line(self, line: str) -> tuple[DateTime | None, str]:
-        """
-        Parse K8s log line and returns the final state.
-
-        :param line: k8s log line
-        :return: timestamp and log message
-        """
-        timestamp, sep, message = line.strip().partition(" ")
-        if not sep:
-            return None, line
-        try:
-            last_log_time = cast("DateTime", pendulum.parse(timestamp))
-        except ParserError:
-            return None, line
-        return last_log_time, message
-
     def container_is_running(self, pod: V1Pod, container_name: str) -> bool:
         """Read pod and checks if container is running."""
         remote_pod = self.read_pod(pod)
@@ -971,6 +955,23 @@ def is_log_group_marker(line: str) -> bool:
     return line.startswith("::group::") or line.startswith("::endgroup::")
 
 
+def parse_log_line(line: str) -> tuple[DateTime | None, str]:
+    """
+    Parse K8s log line and returns the final state.
+
+    :param line: k8s log line
+    :return: timestamp and log message
+    """
+    timestamp, sep, message = line.strip().partition(" ")
+    if not sep:
+        return None, line
+    try:
+        last_log_time = cast("DateTime", pendulum.parse(timestamp))
+    except ParserError:
+        return None, line
+    return last_log_time, message
+
+
 class AsyncPodManager(LoggingMixin):
     """Create, monitor, and otherwise interact with Kubernetes pods for use 
with the KubernetesPodTriggerer."""
 
@@ -1032,3 +1033,54 @@ class AsyncPodManager(LoggingMixin):
             startup_timeout=startup_timeout,
             check_interval=check_interval,
         )
+
+    @tenacity.retry(stop=tenacity.stop_after_attempt(5), wait=tenacity.wait_exponential(), reraise=True)
+    async def fetch_container_logs_before_current_sec(
+        self, pod: V1Pod, container_name: str, since_time: DateTime | None = None
+    ) -> DateTime | None:
+        """
+        Asynchronously read the log file of the specified pod.
+
+        This method streams logs from the base container, skipping log lines from the current second
+        to prevent duplicate entries on subsequent reads. It is designed to handle long-running
+        containers and gracefully suppresses transient interruptions.
+
+        :param pod: The pod specification to monitor.
+        :param container_name: The name of the container within the pod.
+        :param since_time: The timestamp from which to start reading logs.
+        :return: The timestamp to use for the next log read, representing the start of the current
+            second. Returns None if an exception occurred.
+        """
+        now = pendulum.now()
+        logs = await self._hook.read_logs(
+            name=pod.metadata.name,
+            namespace=pod.metadata.namespace,
+            container_name=container_name,
+            since_seconds=(math.ceil((now - since_time).total_seconds()) if since_time else None),
+        )
+        message_to_log = None
+        try:
+            now_seconds = now.replace(microsecond=0)
+            for line in logs:
+                line_timestamp, message = parse_log_line(line)
+                # Skip log lines from the current second to prevent duplicate entries on the next read.
+                # The API only allows specifying 'since_seconds', not an exact timestamp.
+                if line_timestamp and line_timestamp.replace(microsecond=0) == now_seconds:
+                    break
+                if line_timestamp:  # detect new log line
+                    if message_to_log is None:  # first line in the log
+                        message_to_log = message
+                    else:  # previous log line is complete
+                        if message_to_log is not None:
+                            if is_log_group_marker(message_to_log):
+                                print(message_to_log)
+                            else:
+                                self.log.info("[%s] %s", container_name, message_to_log)
+                        message_to_log = message
+                elif message_to_log:  # continuation of the previous log line
+                    message_to_log = f"{message_to_log}\n{message}"
+        finally:
+            # log the last line and update the last_captured_timestamp
+            if message_to_log is not None:
+                if is_log_group_marker(message_to_log):
+                    print(message_to_log)
+                else:
+                    self.log.info("[%s] %s", container_name, message_to_log)
+            return now  # Return the current time as the last log time to ensure logs from the current second are read in the next fetch.
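
Since parse_log_line is now a module-level helper shared by the sync and async paths, a small usage example may help; it mirrors the behaviour exercised by the unit tests below: a Kubernetes-timestamped line is split into a pendulum DateTime and the message, and a line without a parsable timestamp comes back as (None, line). The log text itself is illustrative.

# Example of the module-level helper; the log text is illustrative.
import pendulum

from airflow.providers.cncf.kubernetes.utils.pod_manager import parse_log_line

ts, msg = parse_log_line("2020-10-08T14:16:17.793417674Z starting base container")
assert ts == pendulum.parse("2020-10-08T14:16:17.793417674Z")
assert msg == "starting base container"

ts, msg = parse_log_line("This should return no timestamp")
assert ts is None
assert msg == "This should return no timestamp"
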
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py
index 223c153cfe0..f6e51149ed3 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py
@@ -1060,25 +1060,25 @@ class TestAsyncKubernetesHook:
             config_file=None,
             cluster_context=None,
         )
-        with mock.patch(
-            "airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.log",
-            new_callable=PropertyMock,
-        ) as log:
-            await hook.read_logs(
-                name=POD_NAME,
-                namespace=NAMESPACE,
-            )
 
-            lib_method.assert_called_once()
-            lib_method.assert_called_with(
-                name=POD_NAME,
-                namespace=NAMESPACE,
-                follow=False,
-                timestamps=True,
-            )
-            log.return_value.info.assert_called_with(
-                "Container logs from %s", "2023-01-11 Some string logs..."
-            )
+        logs = await hook.read_logs(
+            name=POD_NAME,
+            namespace=NAMESPACE,
+            container_name=CONTAINER_NAME,
+            since_seconds=10,
+        )
+
+        lib_method.assert_called_once()
+        lib_method.assert_called_with(
+            name=POD_NAME,
+            namespace=NAMESPACE,
+            container_name=CONTAINER_NAME,
+            follow=False,
+            timestamps=True,
+            since_seconds=10,
+        )
+        assert len(logs) == 1
+        assert "2023-01-11 Some string logs..." in logs
 
     @pytest.mark.asyncio
     @mock.patch(KUBE_BATCH_API.format("read_namespaced_job_status"))
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
index 3a64fe1f9b1..aef2ef29da3 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
@@ -2166,18 +2166,6 @@ class TestKubernetesPodOperator:
         process_pod_deletion_mock.assert_called_once_with(pod_1)
         assert result.metadata.name == pod_2.metadata.name
 
-    @patch(POD_MANAGER_CLASS.format("fetch_container_logs"))
-    @patch(KUB_OP_PATH.format("invoke_defer_method"))
-    def test_defere_call_one_more_time_after_error(self, invoke_defer_method, fetch_container_logs):
-        fetch_container_logs.return_value = PodLoggingStatus(False, None)
-        op = KubernetesPodOperator(task_id="test_task", name="test-pod", get_logs=True)
-
-        op.trigger_reentry(
-            create_context(op), event={"name": TEST_NAME, "namespace": TEST_NAMESPACE, "status": "running"}
-        )
-
-        invoke_defer_method.assert_called_with(None)
-
 
 class TestSuppress:
     def test__suppress(self, caplog):
@@ -2611,32 +2599,6 @@ class TestKubernetesPodOperatorAsync:
         with pytest.raises(AirflowException, match=expect_match):
             k.cleanup(pod, pod)
 
-    @patch(
-        "airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.convert_config_file_to_dict"
-    )
-    @patch(f"{HOOK_CLASS}.get_pod")
-    @patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_completion")
-    @patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.fetch_container_logs")
-    def test_get_logs_running(
-        self,
-        fetch_container_logs,
-        await_pod_completion,
-        get_pod,
-        mock_convert_config_file_to_dict,
-    ):
-        """When logs fetch exits with status running, raise task deferred"""
-        pod = MagicMock()
-        get_pod.return_value = pod
-        op = KubernetesPodOperator(task_id="test_task", name="test-pod", get_logs=True)
-        await_pod_completion.return_value = None
-        fetch_container_logs.return_value = PodLoggingStatus(True, None)
-        with pytest.raises(TaskDeferred):
-            op.trigger_reentry(
-                create_context(op),
-                event={"name": TEST_NAME, "namespace": TEST_NAMESPACE, "status": "running"},
-            )
-        fetch_container_logs.is_called_with(pod, "base")
-
     @patch(KUB_OP_PATH.format("_write_logs"))
     @patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.cleanup")
     @patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.find_pod")
@@ -2707,17 +2669,6 @@ class TestKubernetesPodOperatorAsync:
         }
         k.trigger_reentry(context=context, event=callback_event)
 
-        # check on_operator_resuming callback
-        mock_callbacks.on_operator_resuming.assert_called_once()
-        assert mock_callbacks.on_operator_resuming.call_args.kwargs == {
-            "client": k.client,
-            "mode": ExecutionMode.SYNC,
-            "pod": remote_pod_mock,
-            "operator": k,
-            "context": context,
-            "event": callback_event,
-        }
-
         # check on_pod_cleanup callback
         mock_callbacks.on_pod_cleanup.assert_called_once()
         assert mock_callbacks.on_pod_cleanup.call_args.kwargs == {
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
index ce57d0f5e96..1d4eead8858 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
@@ -259,7 +259,7 @@ class TestKubernetesPodTrigger:
             pytest.param(
                 0,
                 {
-                    "status": "running",
+                    "status": "success",
                     "last_log_time": DateTime(2022, 1, 1),
                     "name": POD_NAME,
                     "namespace": NAMESPACE,
@@ -268,26 +268,41 @@ class TestKubernetesPodTrigger:
             ),
         ],
     )
+    @mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.datetime")
     @mock.patch(f"{TRIGGER_PATH}.define_container_state")
     @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
+    @mock.patch(
+        "airflow.providers.cncf.kubernetes.triggers.pod.AsyncPodManager.fetch_container_logs_before_current_sec"
+    )
     @mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.AsyncKubernetesHook.get_pod")
     async def test_running_log_interval(
-        self, mock_get_pod, mock_wait_pod, define_container_state, logging_interval, exp_event
+        self,
+        mock_get_pod,
+        mock_fetch_container_logs_before_current_sec,
+        mock_wait_pod,
+        define_container_state,
+        mock_datetime,
+        logging_interval,
+        exp_event,
     ):
         """
-        If log interval given, should emit event with running status and last log time.
-        Otherwise, should make it to second loop and emit "done" event.
-        For this test we emit container status "running, running not".
-        The first "running" status gets us out of wait_for_pod_start.
-        The second "running" will fire a "running" event when logging interval is non-None.  When logging
-        interval is None, the second "running" status will just result in continuation of the loop.  And
-        when in the next loop we get a non-running status, the trigger fires a "done" event.
+        If log interval given, check that the trigger fetches logs at the right times.
         """
-        define_container_state.return_value = "running"
+        fixed_now = datetime.datetime(2022, 1, 1, tzinfo=datetime.timezone.utc)
+        mock_datetime.datetime.now.side_effect = [
+            fixed_now,
+            fixed_now + datetime.timedelta(seconds=1),
+            fixed_now + datetime.timedelta(seconds=2),
+        ]
+
+        mock_datetime.timedelta = datetime.timedelta
+        mock_datetime.timezone = datetime.timezone
+        mock_fetch_container_logs_before_current_sec.return_value = DateTime(2022, 1, 1)
+        define_container_state.side_effect = ["running", "running", "terminated"]
         trigger = KubernetesPodTrigger(
             pod_name=POD_NAME,
             pod_namespace=NAMESPACE,
-            trigger_start_time=datetime.datetime.now(tz=datetime.timezone.utc),
+            trigger_start_time=fixed_now,
             base_container_name=BASE_CONTAINER_NAME,
             startup_timeout=5,
             poll_interval=1,
@@ -295,6 +310,7 @@ class TestKubernetesPodTrigger:
             last_log_time=DateTime(2022, 1, 1),
         )
         assert await trigger.run().__anext__() == TriggerEvent(exp_event)
+        assert mock_fetch_container_logs_before_current_sec.call_count == 2
 
     @pytest.mark.parametrize(
         "container_state, expected_state",
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
index 485bdbd769e..2c1150fd9fe 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
@@ -27,14 +27,17 @@ import pendulum
 import pytest
 import time_machine
 from kubernetes.client.rest import ApiException
+from tenacity import wait_none
 from urllib3.exceptions import HTTPError as BaseHTTPError
 
 from airflow.exceptions import AirflowException
+from airflow.providers.cncf.kubernetes.exceptions import KubernetesApiError
 from airflow.providers.cncf.kubernetes.utils.pod_manager import (
     AsyncPodManager,
     PodLogsConsumer,
     PodManager,
     PodPhase,
+    parse_log_line,
 )
 from airflow.utils.timezone import utc
 
@@ -44,6 +47,18 @@ if TYPE_CHECKING:
     from pendulum import DateTime
 
 
+def test_parse_log_line():
+    log_message = "This should return no timestamp"
+    timestamp, line = parse_log_line(log_message)
+    assert timestamp is None
+    assert line == log_message
+
+    real_timestamp = "2020-10-08T14:16:17.793417674Z"
+    timestamp, line = parse_log_line(f"{real_timestamp} {log_message}")
+    assert timestamp == pendulum.parse(real_timestamp)
+    assert line == log_message
+
+
 class TestPodManager:
     def setup_method(self):
         self.mock_kube_client = mock.Mock()
@@ -51,6 +66,23 @@ class TestPodManager:
             kube_client=self.mock_kube_client,
             callbacks=[MockKubernetesPodOperatorCallback],
         )
+        # List of PodManager methods that may use tenacity retry
+        tenacity_methods = [
+            "await_pod_start",
+            "read_pod_logs",
+            "create_pod",
+            "get_init_container_names",
+            "get_container_names",
+            "read_pod_events",
+            "read_pod",
+            "extract_xcom_json",
+            "extract_xcom_kill",
+        ]
+        # Patch tenacity retry wait for all relevant methods to disable waiting in tests
+        for method_name in tenacity_methods:
+            method = getattr(self.pod_manager, method_name, None)
+            if method and hasattr(method, "retry"):
+                method.retry.wait = wait_none()
 
     def test_read_pod_logs_successfully_returns_logs(self):
         mock.sentinel.metadata = mock.MagicMock()
@@ -298,17 +330,6 @@ class TestPodManager:
         with pytest.raises(AirflowException):
             self.pod_manager.read_pod(mock.sentinel)
 
-    def test_parse_log_line(self):
-        log_message = "This should return no timestamp"
-        timestamp, line = self.pod_manager.parse_log_line(log_message)
-        assert timestamp is None
-        assert line == log_message
-
-        real_timestamp = "2020-10-08T14:16:17.793417674Z"
-        timestamp, line = self.pod_manager.parse_log_line(f"{real_timestamp} {log_message}")
-        assert timestamp == pendulum.parse(real_timestamp)
-        assert line == log_message
-
     @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
     @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs")
     def test_fetch_container_logs_returning_last_timestamp(
@@ -706,12 +727,29 @@ class TestPodManager:
 
 
 class TestAsyncPodManager:
+    @pytest.fixture
+    def mock_log_info(self):
+        with mock.patch.object(self.async_pod_manager.log, "info") as mock_log_info:
+            yield mock_log_info
+
     def setup_method(self):
         self.mock_async_hook = mock.AsyncMock()
         self.async_pod_manager = AsyncPodManager(
             async_hook=self.mock_async_hook,
             callbacks=[],
         )
+        # List of PodManager methods that may use tenacity retry
+        tenacity_methods = [
+            "await_pod_start",
+            "fetch_container_logs_before_current_sec",
+            "read_pod_events",
+            "read_pod",
+        ]
+        # Patch tenacity retry wait for all relevant methods to disable waiting in tests
+        for method_name in tenacity_methods:
+            method = getattr(self.async_pod_manager, method_name, None)
+            if method and hasattr(method, "retry"):
+                method.retry.wait = wait_none()
 
     @pytest.mark.asyncio
     async def test_start_pod_raises_informative_error_on_scheduled_timeout(self):
@@ -770,7 +808,7 @@ class TestAsyncPodManager:
 
     @pytest.mark.asyncio
     @mock.patch("asyncio.sleep", new_callable=mock.AsyncMock)
-    async def test_start_pod_startup_interval_seconds(self, mock_time_sleep):
+    async def test_start_pod_startup_interval_seconds(self, mock_time_sleep, mock_log_info):
         condition_scheduled = mock.MagicMock()
         condition_scheduled.type = "PodScheduled"
         condition_scheduled.status = "True"
@@ -797,23 +835,23 @@ class TestAsyncPodManager:
         schedule_timeout = 30
         startup_timeout = 60
         mock_pod = mock.MagicMock()
-        with mock.patch.object(self.async_pod_manager.log, "info") as mock_log_info:
-            await self.async_pod_manager.await_pod_start(
-                pod=mock_pod,
-                schedule_timeout=schedule_timeout,
-                startup_timeout=startup_timeout,
-                check_interval=startup_check_interval,
-            )
-            assert mock_time_sleep.call_count == 3
-            mock_log_info.assert_any_call(
-                "::group::Waiting until %ss to get the POD scheduled...", schedule_timeout
-            )
-            mock_log_info.assert_any_call("Waiting %ss to get the POD running...", startup_timeout)
-            assert self.async_pod_manager.stop_watching_events is True
+
+        await self.async_pod_manager.await_pod_start(
+            pod=mock_pod,
+            schedule_timeout=schedule_timeout,
+            startup_timeout=startup_timeout,
+            check_interval=startup_check_interval,
+        )
+        assert mock_time_sleep.call_count == 3
+        mock_log_info.assert_any_call(
+            "::group::Waiting until %ss to get the POD scheduled...", 
schedule_timeout
+        )
+        mock_log_info.assert_any_call("Waiting %ss to get the POD running...", 
startup_timeout)
+        assert self.async_pod_manager.stop_watching_events is True
 
     @pytest.mark.asyncio
     @mock.patch("asyncio.sleep", new_callable=mock.AsyncMock)
-    async def test_watch_pod_events(self, mock_time_sleep):
+    async def test_watch_pod_events(self, mock_time_sleep, mock_log_info):
         mock_pod = mock.MagicMock()
         mock_pod.metadata.name = "test-pod"
         mock_pod.metadata.namespace = "default"
@@ -833,15 +871,98 @@ class TestAsyncPodManager:
 
         self.mock_async_hook.get_pod_events.side_effect = get_pod_events_side_effect
 
-        with mock.patch.object(self.async_pod_manager.log, "info") as mock_log_info:
-            await self.async_pod_manager.watch_pod_events(pod=mock_pod, check_interval=startup_check_interval)
-            mock_log_info.assert_any_call(
-                "The Pod has an Event: %s from %s", "test event 1", "object event 1"
+        await self.async_pod_manager.watch_pod_events(pod=mock_pod, check_interval=startup_check_interval)
+        mock_log_info.assert_any_call("The Pod has an Event: %s from %s", "test event 1", "object event 1")
+        mock_log_info.assert_any_call("The Pod has an Event: %s from %s", "test event 2", "object event 2")
+        mock_time_sleep.assert_called_once_with(startup_check_interval)
+
+    @pytest.mark.asyncio
+    @pytest.mark.parametrize(
+        "log_lines, now, expected_log_messages, not_expected_log_messages",
+        [
+            # Case 1: No logs
+            ([], pendulum.now(), [], []),
+            # Case 2: One log line with timestamp before now
+            (
+                [f"{pendulum.now().subtract(seconds=2).to_iso8601_string()} message"],
+                pendulum.now(),
+                ["message"],
+                [],
+            ),
+            # Case 3: Log line with timestamp equal to now (should be skipped, so last_time is None)
+            ([f"{pendulum.now().to_iso8601_string()} message"], pendulum.now(), [], ["message"]),
+            # Case 4: Multiple log lines, last before now
+            (
+                [
+                    f"{pendulum.now().subtract(seconds=3).to_iso8601_string()} msg1",
+                    f"{pendulum.now().subtract(seconds=2).to_iso8601_string()} msg2",
+                ],
+                pendulum.now(),
+                ["msg1", "msg2"],
+                [],
+            ),
+            # Case 5: Log lines with continuation (no timestamp)
+            (
+                [
+                    f"{pendulum.now().subtract(seconds=2).to_iso8601_string()} msg1",
+                    "continued line",
+                ],
+                pendulum.now(),
+                ["msg1\ncontinued line"],
+                [],
+            ),
+            # Case 6: Log lines with continuation (no timestamp)
+            (
+                [
+                    f"{pendulum.now().subtract(seconds=2).to_iso8601_string()} msg1",
+                    f"{pendulum.now().to_iso8601_string()} msg2",
+                ],
+                pendulum.now(),
+                ["msg1"],
+                ["msg2"],
+            ),
+        ],
+    )
+    async def test_fetch_container_logs_before_current_sec_various_logs(
+        self, log_lines, now, expected_log_messages, not_expected_log_messages
+    ):
+        pod = mock.MagicMock()
+        container_name = "base"
+        since_time = now.subtract(minutes=1)
+        mock_async_hook = mock.AsyncMock()
+        mock_async_hook.read_logs.return_value = log_lines
+
+        with mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.pendulum.now", return_value=now):
+            async_pod_manager = AsyncPodManager(
+                async_hook=mock_async_hook,
+                callbacks=[],
             )
-            mock_log_info.assert_any_call(
-                "The Pod has an Event: %s from %s", "test event 2", "object event 2"
+            with mock.patch.object(async_pod_manager.log, "info") as mock_log_info:
+                result = await async_pod_manager.fetch_container_logs_before_current_sec(
+                    pod=pod, container_name=container_name, since_time=since_time
+                )
+                assert result == now
+
+                for expected in expected_log_messages:
+                    mock_log_info.assert_any_call("[%s] %s", container_name, expected)
+                for not_expected in not_expected_log_messages:
+                    unexpected_call = mock.call("[%s] %s", container_name, not_expected)
+                    assert unexpected_call not in mock_log_info.mock_calls
+
+    @pytest.mark.asyncio
+    async def test_fetch_container_logs_before_current_sec_error_handling(self):
+        pod = mock.MagicMock()
+        container_name = "base"
+        since_time = pendulum.now().subtract(minutes=1)
+
+        async def fake_read_logs(**kwargs):
+            raise KubernetesApiError("error")
+
+        self.async_pod_manager._hook.read_logs = fake_read_logs
+        with pytest.raises(KubernetesApiError):
+            await self.async_pod_manager.fetch_container_logs_before_current_sec(
+                pod=pod, container_name=container_name, since_time=since_time
             )
-            mock_time_sleep.assert_called_once_with(startup_check_interval)
 
 
 class TestPodLogsConsumer:
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_kubernetes_engine.py b/providers/google/tests/unit/google/cloud/hooks/test_kubernetes_engine.py
index 3bca9ce990d..e5a31b2327f 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_kubernetes_engine.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_kubernetes_engine.py
@@ -531,7 +531,7 @@ class TestGKEKubernetesAsyncHook:
         caplog.set_level(logging.INFO)
         self.make_mock_awaitable(read_namespaced_pod_log, result="Test string #1\nTest string #2\n")
 
-        await async_hook.read_logs(name=POD_NAME, namespace=POD_NAMESPACE)
+        logs = await async_hook.read_logs(name=POD_NAME, namespace=POD_NAMESPACE)
 
         get_conn_mock.assert_called_once_with()
         read_namespaced_pod_log.assert_called_with(
@@ -539,9 +539,11 @@ class TestGKEKubernetesAsyncHook:
             namespace=POD_NAMESPACE,
             follow=False,
             timestamps=True,
+            container_name=None,
+            since_seconds=None,
         )
-        assert "Test string #1" in caplog.text
-        assert "Test string #2" in caplog.text
+        assert "Test string #1" in logs
+        assert "Test string #2" in logs
 
 
 @pytest_asyncio.fixture

