This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4234d8db7e Fix parsing KubernetesPodOperator multiline logs (#34412)
4234d8db7e is described below
commit 4234d8db7e4a51683f8236270c87375cf80ba3f4
Author: Hussein Awala <[email protected]>
AuthorDate: Wed Oct 4 01:42:51 2023 +0200
Fix parsing KubernetesPodOperator multiline logs (#34412)
* Fix parsing KubernetesPodOperator multiline logs
* make it backward compatible with progress_callback
* add a unit test
* fix integration test
---
.../providers/cncf/kubernetes/utils/pod_manager.py | 64 ++++++++++++++--------
kubernetes_tests/test_kubernetes_pod_operator.py | 2 +-
.../cncf/kubernetes/utils/test_pod_manager.py | 27 +++++++--
3 files changed, 64 insertions(+), 29 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 05829a1615..65a264eeee 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -408,25 +408,47 @@ class PodManager(LoggingMixin):
"""
last_captured_timestamp = None
try:
- if not logs:
- logs = self.read_pod_logs(
- pod=pod,
- container_name=container_name,
- timestamps=True,
- since_seconds=(
- math.ceil((pendulum.now() -
since_time).total_seconds()) if since_time else None
- ),
- follow=follow,
- post_termination_timeout=post_termination_timeout,
- )
- for raw_line in logs:
- line = raw_line.decode("utf-8", errors="backslashreplace")
- line_timestamp, message = self.parse_log_line(line)
+ logs = self.read_pod_logs(
+ pod=pod,
+ container_name=container_name,
+ timestamps=True,
+ since_seconds=(
+ math.ceil((pendulum.now() -
since_time).total_seconds()) if since_time else None
+ ),
+ follow=follow,
+ post_termination_timeout=post_termination_timeout,
+ )
+ message_to_log = None
+ message_timestamp = None
+ progress_callback_lines = []
+ try:
+ for raw_line in logs:
+ line = raw_line.decode("utf-8",
errors="backslashreplace")
+ line_timestamp, message = self.parse_log_line(line)
+ if line_timestamp: # detect new log line
+ if message_to_log is None: # first line in the log
+ message_to_log = message
+ message_timestamp = line_timestamp
+ progress_callback_lines.append(line)
+ else: # previous log line is complete
+ if self._progress_callback:
+ for line in progress_callback_lines:
+ self._progress_callback(line)
+ self.log.info("[%s] %s", container_name,
message_to_log)
+ last_captured_timestamp = message_timestamp
+ message_to_log = message
+ message_timestamp = line_timestamp
+ progress_callback_lines = [line]
+ else: # continuation of the previous log line
+ message_to_log = f"{message_to_log}\n{message}"
+ progress_callback_lines.append(line)
+ finally:
+ # log the last line and update the last_captured_timestamp
if self._progress_callback:
- self._progress_callback(line)
- if line_timestamp is not None:
- last_captured_timestamp = line_timestamp
- self.log.info("[%s] %s", container_name, message)
+ for line in progress_callback_lines:
+ self._progress_callback(line)
+ self.log.info("[%s] %s", container_name, message_to_log)
+ last_captured_timestamp = message_timestamp
except BaseHTTPError as e:
self.log.warning(
"Reading of logs interrupted for container %r with error
%r; will retry. "
@@ -570,16 +592,10 @@ class PodManager(LoggingMixin):
"""
timestamp, sep, message = line.strip().partition(" ")
if not sep:
- self.log.error(
- "Error parsing timestamp (no timestamp in message %r). "
- "Will continue execution but won't update timestamp",
- line,
- )
return None, line
try:
last_log_time = cast(DateTime, pendulum.parse(timestamp))
except ParserError:
- self.log.error("Error parsing timestamp. Will continue execution
but won't update timestamp")
return None, line
return last_log_time, message
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index 2a3803d6c1..6b56bc9cd2 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -518,7 +518,7 @@ class TestKubernetesPodOperatorSystem:
)
context = create_context(k)
k.execute(context=context)
- mock_logger.info.assert_any_call("[%s] %s", "base", "retrieved
from mount")
+ mock_logger.info.assert_any_call("[%s] %s", "base", "retrieved
from mount\n")
actual_pod = self.api_client.sanitize_for_serialization(k.pod)
self.expected_pod["spec"]["containers"][0]["args"] = args
self.expected_pod["spec"]["containers"][0]["volumeMounts"] = [
diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
index 42e2a5c610..d0e5408801 100644
--- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
+++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
@@ -309,11 +309,30 @@ class TestPodManager:
assert status.last_log_time == cast(DateTime,
pendulum.parse(last_timestamp_string))
assert self.mock_progress_callback.call_count == expected_call_count
- def test_parse_invalid_log_line(self, caplog):
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs")
+ def test_parse_multi_line_logs(self, mock_read_pod_logs,
mock_container_is_running, caplog):
+ log = (
+ "2020-10-08T14:16:17.793417674Z message1 line1\n"
+ "message1 line2\n"
+ "message1 line3\n"
+ "2020-10-08T14:16:18.793417674Z message2 line1\n"
+ "message2 line2\n"
+ "2020-10-08T14:16:19.793417674Z message3 line1\n"
+ )
+ mock_read_pod_logs.return_value = [bytes(log_line, "utf-8") for
log_line in log.split("\n")]
+ mock_container_is_running.return_value = False
+
with caplog.at_level(logging.INFO):
-
self.pod_manager.parse_log_line("2020-10-08T14:16:17.793417674ZInvalidmessage\n")
- assert "Invalidmessage" in caplog.text
- assert "no timestamp in message" in caplog.text
+ self.pod_manager.fetch_container_logs(mock.MagicMock(),
mock.MagicMock(), follow=True)
+
+ assert "message1 line1" in caplog.text
+ assert "message1 line2" in caplog.text
+ assert "message1 line3" in caplog.text
+ assert "message2 line1" in caplog.text
+ assert "message2 line2" in caplog.text
+ assert "message3 line1" in caplog.text
+ assert "ERROR" not in caplog.text
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.run_pod_async")
def test_start_pod_retries_on_409_error(self, mock_run_pod_async):