This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4234d8db7e Fix parsing KubernetesPodOperator multiline logs (#34412)
4234d8db7e is described below

commit 4234d8db7e4a51683f8236270c87375cf80ba3f4
Author: Hussein Awala <[email protected]>
AuthorDate: Wed Oct 4 01:42:51 2023 +0200

    Fix parsing KubernetesPodOperator multiline logs (#34412)
    
    * Fix parsing KubernetesPodOperator multiline logs
    
    * make it backward-compatible with progress_callback
    
    * add a unit test
    
    * fix integration test to expect the trailing newline in the logged message
---
 .../providers/cncf/kubernetes/utils/pod_manager.py | 64 ++++++++++++++--------
 kubernetes_tests/test_kubernetes_pod_operator.py   |  2 +-
 .../cncf/kubernetes/utils/test_pod_manager.py      | 27 +++++++--
 3 files changed, 64 insertions(+), 29 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 05829a1615..65a264eeee 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -408,25 +408,47 @@ class PodManager(LoggingMixin):
             """
             last_captured_timestamp = None
             try:
-                if not logs:
-                    logs = self.read_pod_logs(
-                        pod=pod,
-                        container_name=container_name,
-                        timestamps=True,
-                        since_seconds=(
-                            math.ceil((pendulum.now() - 
since_time).total_seconds()) if since_time else None
-                        ),
-                        follow=follow,
-                        post_termination_timeout=post_termination_timeout,
-                    )
-                for raw_line in logs:
-                    line = raw_line.decode("utf-8", errors="backslashreplace")
-                    line_timestamp, message = self.parse_log_line(line)
+                logs = self.read_pod_logs(
+                    pod=pod,
+                    container_name=container_name,
+                    timestamps=True,
+                    since_seconds=(
+                        math.ceil((pendulum.now() - 
since_time).total_seconds()) if since_time else None
+                    ),
+                    follow=follow,
+                    post_termination_timeout=post_termination_timeout,
+                )
+                message_to_log = None
+                message_timestamp = None
+                progress_callback_lines = []
+                try:
+                    for raw_line in logs:
+                        line = raw_line.decode("utf-8", 
errors="backslashreplace")
+                        line_timestamp, message = self.parse_log_line(line)
+                        if line_timestamp:  # detect new log line
+                            if message_to_log is None:  # first line in the log
+                                message_to_log = message
+                                message_timestamp = line_timestamp
+                                progress_callback_lines.append(line)
+                            else:  # previous log line is complete
+                                if self._progress_callback:
+                                    for line in progress_callback_lines:
+                                        self._progress_callback(line)
+                                self.log.info("[%s] %s", container_name, 
message_to_log)
+                                last_captured_timestamp = message_timestamp
+                                message_to_log = message
+                                message_timestamp = line_timestamp
+                                progress_callback_lines = [line]
+                        else:  # continuation of the previous log line
+                            message_to_log = f"{message_to_log}\n{message}"
+                            progress_callback_lines.append(line)
+                finally:
+                    # log the last line and update the last_captured_timestamp
                     if self._progress_callback:
-                        self._progress_callback(line)
-                    if line_timestamp is not None:
-                        last_captured_timestamp = line_timestamp
-                    self.log.info("[%s] %s", container_name, message)
+                        for line in progress_callback_lines:
+                            self._progress_callback(line)
+                    self.log.info("[%s] %s", container_name, message_to_log)
+                    last_captured_timestamp = message_timestamp
             except BaseHTTPError as e:
                 self.log.warning(
                     "Reading of logs interrupted for container %r with error 
%r; will retry. "
@@ -570,16 +592,10 @@ class PodManager(LoggingMixin):
         """
         timestamp, sep, message = line.strip().partition(" ")
         if not sep:
-            self.log.error(
-                "Error parsing timestamp (no timestamp in message %r). "
-                "Will continue execution but won't update timestamp",
-                line,
-            )
             return None, line
         try:
             last_log_time = cast(DateTime, pendulum.parse(timestamp))
         except ParserError:
-            self.log.error("Error parsing timestamp. Will continue execution 
but won't update timestamp")
             return None, line
         return last_log_time, message
 
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index 2a3803d6c1..6b56bc9cd2 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -518,7 +518,7 @@ class TestKubernetesPodOperatorSystem:
             )
             context = create_context(k)
             k.execute(context=context)
-            mock_logger.info.assert_any_call("[%s] %s", "base", "retrieved 
from mount")
+            mock_logger.info.assert_any_call("[%s] %s", "base", "retrieved 
from mount\n")
             actual_pod = self.api_client.sanitize_for_serialization(k.pod)
             self.expected_pod["spec"]["containers"][0]["args"] = args
             self.expected_pod["spec"]["containers"][0]["volumeMounts"] = [
diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
index 42e2a5c610..d0e5408801 100644
--- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
+++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
@@ -309,11 +309,30 @@ class TestPodManager:
         assert status.last_log_time == cast(DateTime, 
pendulum.parse(last_timestamp_string))
         assert self.mock_progress_callback.call_count == expected_call_count
 
-    def test_parse_invalid_log_line(self, caplog):
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs")
+    def test_parse_multi_line_logs(self, mock_read_pod_logs, 
mock_container_is_running, caplog):
+        log = (
+            "2020-10-08T14:16:17.793417674Z message1 line1\n"
+            "message1 line2\n"
+            "message1 line3\n"
+            "2020-10-08T14:16:18.793417674Z message2 line1\n"
+            "message2 line2\n"
+            "2020-10-08T14:16:19.793417674Z message3 line1\n"
+        )
+        mock_read_pod_logs.return_value = [bytes(log_line, "utf-8") for 
log_line in log.split("\n")]
+        mock_container_is_running.return_value = False
+
         with caplog.at_level(logging.INFO):
-            
self.pod_manager.parse_log_line("2020-10-08T14:16:17.793417674ZInvalidmessage\n")
-        assert "Invalidmessage" in caplog.text
-        assert "no timestamp in message" in caplog.text
+            self.pod_manager.fetch_container_logs(mock.MagicMock(), 
mock.MagicMock(), follow=True)
+
+        assert "message1 line1" in caplog.text
+        assert "message1 line2" in caplog.text
+        assert "message1 line3" in caplog.text
+        assert "message2 line1" in caplog.text
+        assert "message2 line2" in caplog.text
+        assert "message3 line1" in caplog.text
+        assert "ERROR" not in caplog.text
 
     
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.run_pod_async")
     def test_start_pod_retries_on_409_error(self, mock_run_pod_async):

Reply via email to