This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4752fb3  Fix issue with parsing error logs in the KPO (#15638)
4752fb3 is described below

commit 4752fb3eb8ac8827e6af6022fbcf751829ecb17a
Author: Daniel Imberman <[email protected]>
AuthorDate: Mon Jun 14 03:23:29 2021 -0500

    Fix issue with parsing error logs in the KPO (#15638)
    
    This fixes an issue where logs that do not have timestamps cause the
    KubernetesPodOperator to crash. Error logs created by Airflow do not
    have timestamps, which caused an unhandled exception that would kill
    the task. This PR handles that exception so that task processing
    continues.
---
 .../providers/cncf/kubernetes/utils/pod_launcher.py    | 18 +++++++++++++-----
 .../cncf/kubernetes/utils/test_pod_launcher.py         | 16 ++++++++++------
 2 files changed, 23 insertions(+), 11 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py 
b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py
index 741b475..6eada39 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py
@@ -19,7 +19,7 @@ import json
 import math
 import time
 from datetime import datetime as dt
-from typing import Optional, Tuple
+from typing import Optional, Tuple, Union
 
 import pendulum
 import tenacity
@@ -27,6 +27,8 @@ from kubernetes import client, watch
 from kubernetes.client.models.v1_pod import V1Pod
 from kubernetes.client.rest import ApiException
 from kubernetes.stream import stream as kubernetes_stream
+from pendulum import Date, DateTime, Duration, Time
+from pendulum.parsing.exceptions import ParserError
 from requests.exceptions import BaseHTTPError
 
 from airflow.exceptions import AirflowException
@@ -137,15 +139,16 @@ class PodLauncher(LoggingMixin):
         :param get_logs: whether to read the logs locally
         :return:  Tuple[State, Optional[str]]
         """
-        if get_logs:
+        if get_logs:  # pylint: disable=too-many-nested-blocks
             read_logs_since_sec = None
             last_log_time = None
             while True:
                 logs = self.read_pod_logs(pod, timestamps=True, 
since_seconds=read_logs_since_sec)
                 for line in logs:
                     timestamp, message = 
self.parse_log_line(line.decode('utf-8'))
-                    last_log_time = pendulum.parse(timestamp)
                     self.log.info(message)
+                    if timestamp:
+                        last_log_time = timestamp
                 time.sleep(1)
 
                 if not self.base_container_is_running(pod):
@@ -169,7 +172,7 @@ class PodLauncher(LoggingMixin):
             time.sleep(2)
         return self._task_status(self.read_pod(pod)), result
 
-    def parse_log_line(self, line: str) -> Tuple[str, str]:
+    def parse_log_line(self, line: str) -> Tuple[Optional[Union[Date, Time, 
DateTime, Duration]], str]:
         """
         Parse K8s log line and returns the final state
 
@@ -183,7 +186,12 @@ class PodLauncher(LoggingMixin):
             raise Exception(f'Log not in "{{timestamp}} {{log}}" format. Got: 
{line}')
         timestamp = line[:split_at]
         message = line[split_at + 1 :].rstrip()
-        return timestamp, message
+        try:
+            last_log_time = pendulum.parse(timestamp)
+        except ParserError:
+            self.log.error("Error parsing timestamp. Will continue execution 
but won't update timestamp")
+            return None, line
+        return last_log_time, message
 
     def _task_status(self, event):
         self.log.info('Event: %s had an event of type %s', 
event.metadata.name, event.status.phase)
diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py 
b/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py
index 6b8ba66..f308f03 100644
--- a/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py
+++ b/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py
@@ -17,6 +17,7 @@
 import unittest
 from unittest import mock
 
+import pendulum
 import pytest
 from kubernetes.client.rest import ApiException
 from requests.exceptions import BaseHTTPError
@@ -198,12 +199,15 @@ class TestPodLauncher(unittest.TestCase):
             self.pod_launcher.read_pod(mock.sentinel)
 
     def test_parse_log_line(self):
-        timestamp, message = self.pod_launcher.parse_log_line(
-            '2020-10-08T14:16:17.793417674Z Valid message\n'
-        )
-
-        assert timestamp == '2020-10-08T14:16:17.793417674Z'
-        assert message == 'Valid message'
+        log_message = "This should return no timestamp"
+        timestamp, line = self.pod_launcher.parse_log_line(log_message)
+        self.assertEqual(timestamp, None)
+        self.assertEqual(line, log_message)
+
+        real_timestamp = "2020-10-08T14:16:17.793417674Z"
+        timestamp, line = self.pod_launcher.parse_log_line(" 
".join([real_timestamp, log_message]))
+        self.assertEqual(timestamp, pendulum.parse(real_timestamp))
+        self.assertEqual(line, log_message)
 
         with pytest.raises(Exception):
             
self.pod_launcher.parse_log_line('2020-10-08T14:16:17.793417674ZInvalidmessage\n')

Reply via email to