This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6130993d78 Fix KubernetesPodOperator duplicating logs when interrupted (#33500)
6130993d78 is described below
commit 6130993d781695bbd87e09d3665d8f0991bc32d0
Author: Freddy Demiane <[email protected]>
AuthorDate: Thu Aug 24 11:05:15 2023 +0200
Fix KubernetesPodOperator duplicating logs when interrupted (#33500)
---
airflow/providers/cncf/kubernetes/utils/pod_manager.py | 8 +++++---
tests/providers/cncf/kubernetes/utils/test_pod_manager.py | 14 ++++++++++++++
2 files changed, 19 insertions(+), 3 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 08a5d904ca..49c6518696 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -398,7 +398,7 @@ class PodManager(LoggingMixin):
Returns the last timestamp observed in logs.
"""
- timestamp = None
+ last_captured_timestamp = None
try:
logs = self.read_pod_logs(
pod=pod,
@@ -412,7 +412,9 @@ class PodManager(LoggingMixin):
)
for raw_line in logs:
line = raw_line.decode("utf-8", errors="backslashreplace")
- timestamp, message = self.parse_log_line(line)
+ line_timestamp, message = self.parse_log_line(line)
+ if line_timestamp is not None:
+ last_captured_timestamp = line_timestamp
self.log.info("[%s] %s", container_name, message)
except BaseHTTPError as e:
self.log.warning(
@@ -426,7 +428,7 @@ class PodManager(LoggingMixin):
pod.metadata.name,
exc_info=True,
)
- return timestamp or since_time
+ return last_captured_timestamp or since_time
# note: `read_pod_logs` follows the logs, so we shouldn't necessarily
*need* to
# loop as we do here. But in a long-running process we might
temporarily lose connectivity.
diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
index c6c42105b3..4868f9ca22 100644
--- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
+++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
@@ -20,6 +20,7 @@ import logging
from datetime import datetime
from json.decoder import JSONDecodeError
from types import SimpleNamespace
+from typing import cast
from unittest import mock
from unittest.mock import MagicMock
@@ -254,6 +255,19 @@ class TestPodManager:
assert timestamp == pendulum.parse(real_timestamp)
assert line == log_message
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs")
+ def test_fetch_container_logs_returning_last_timestamp(
+ self, mock_read_pod_logs, mock_container_is_running
+ ):
+ timestamp_string = "2020-10-08T14:16:17.793417674Z"
+ mock_read_pod_logs.return_value = [bytes(f"{timestamp_string}
message", "utf-8"), b"notimestamp"]
+ mock_container_is_running.side_effect = [True, False]
+
+ status = self.pod_manager.fetch_container_logs(mock.MagicMock(),
mock.MagicMock(), follow=True)
+
+ assert status.last_log_time == cast(DateTime,
pendulum.parse(timestamp_string))
+
def test_parse_invalid_log_line(self, caplog):
with caplog.at_level(logging.INFO):
self.pod_manager.parse_log_line("2020-10-08T14:16:17.793417674ZInvalidmessage\n")