This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6130993d78 Fix KubernetesPodOperator duplicating logs when interrupted (#33500)
6130993d78 is described below

commit 6130993d781695bbd87e09d3665d8f0991bc32d0
Author: Freddy Demiane <[email protected]>
AuthorDate: Thu Aug 24 11:05:15 2023 +0200

    Fix KubernetesPodOperator duplicating logs when interrupted (#33500)
---
 airflow/providers/cncf/kubernetes/utils/pod_manager.py    |  8 +++++---
 tests/providers/cncf/kubernetes/utils/test_pod_manager.py | 14 ++++++++++++++
 2 files changed, 19 insertions(+), 3 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 08a5d904ca..49c6518696 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -398,7 +398,7 @@ class PodManager(LoggingMixin):
 
             Returns the last timestamp observed in logs.
             """
-            timestamp = None
+            last_captured_timestamp = None
             try:
                 logs = self.read_pod_logs(
                     pod=pod,
@@ -412,7 +412,9 @@ class PodManager(LoggingMixin):
                 )
                 for raw_line in logs:
                     line = raw_line.decode("utf-8", errors="backslashreplace")
-                    timestamp, message = self.parse_log_line(line)
+                    line_timestamp, message = self.parse_log_line(line)
+                    if line_timestamp is not None:
+                        last_captured_timestamp = line_timestamp
                     self.log.info("[%s] %s", container_name, message)
             except BaseHTTPError as e:
                 self.log.warning(
@@ -426,7 +428,7 @@ class PodManager(LoggingMixin):
                     pod.metadata.name,
                     exc_info=True,
                 )
-            return timestamp or since_time
+            return last_captured_timestamp or since_time
 
         # note: `read_pod_logs` follows the logs, so we shouldn't necessarily 
*need* to
         # loop as we do here. But in a long-running process we might 
temporarily lose connectivity.
diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
index c6c42105b3..4868f9ca22 100644
--- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
+++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
@@ -20,6 +20,7 @@ import logging
 from datetime import datetime
 from json.decoder import JSONDecodeError
 from types import SimpleNamespace
+from typing import cast
 from unittest import mock
 from unittest.mock import MagicMock
 
@@ -254,6 +255,19 @@ class TestPodManager:
         assert timestamp == pendulum.parse(real_timestamp)
         assert line == log_message
 
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs")
+    def test_fetch_container_logs_returning_last_timestamp(
+        self, mock_read_pod_logs, mock_container_is_running
+    ):
+        timestamp_string = "2020-10-08T14:16:17.793417674Z"
+        mock_read_pod_logs.return_value = [bytes(f"{timestamp_string} 
message", "utf-8"), b"notimestamp"]
+        mock_container_is_running.side_effect = [True, False]
+
+        status = self.pod_manager.fetch_container_logs(mock.MagicMock(), 
mock.MagicMock(), follow=True)
+
+        assert status.last_log_time == cast(DateTime, 
pendulum.parse(timestamp_string))
+
     def test_parse_invalid_log_line(self, caplog):
         with caplog.at_level(logging.INFO):
             
self.pod_manager.parse_log_line("2020-10-08T14:16:17.793417674ZInvalidmessage\n")

Reply via email to