This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4752fb3 Fix issue with parsing error logs in the KPO (#15638)
4752fb3 is described below
commit 4752fb3eb8ac8827e6af6022fbcf751829ecb17a
Author: Daniel Imberman <[email protected]>
AuthorDate: Mon Jun 14 03:23:29 2021 -0500
Fix issue with parsing error logs in the KPO (#15638)
This fixes an issue where log lines that do not have timestamps cause the
KubernetesPodOperator to crash. Error logs created by Airflow
do not have timestamps, which caused an unhandled exception that
would kill the task. This PR handles that exception and ensures that
task processing continues.
---
.../providers/cncf/kubernetes/utils/pod_launcher.py | 18 +++++++++++++-----
.../cncf/kubernetes/utils/test_pod_launcher.py | 16 ++++++++++------
2 files changed, 23 insertions(+), 11 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py
b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py
index 741b475..6eada39 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py
@@ -19,7 +19,7 @@ import json
import math
import time
from datetime import datetime as dt
-from typing import Optional, Tuple
+from typing import Optional, Tuple, Union
import pendulum
import tenacity
@@ -27,6 +27,8 @@ from kubernetes import client, watch
from kubernetes.client.models.v1_pod import V1Pod
from kubernetes.client.rest import ApiException
from kubernetes.stream import stream as kubernetes_stream
+from pendulum import Date, DateTime, Duration, Time
+from pendulum.parsing.exceptions import ParserError
from requests.exceptions import BaseHTTPError
from airflow.exceptions import AirflowException
@@ -137,15 +139,16 @@ class PodLauncher(LoggingMixin):
:param get_logs: whether to read the logs locally
:return: Tuple[State, Optional[str]]
"""
- if get_logs:
+ if get_logs: # pylint: disable=too-many-nested-blocks
read_logs_since_sec = None
last_log_time = None
while True:
logs = self.read_pod_logs(pod, timestamps=True,
since_seconds=read_logs_since_sec)
for line in logs:
timestamp, message =
self.parse_log_line(line.decode('utf-8'))
- last_log_time = pendulum.parse(timestamp)
self.log.info(message)
+ if timestamp:
+ last_log_time = timestamp
time.sleep(1)
if not self.base_container_is_running(pod):
@@ -169,7 +172,7 @@ class PodLauncher(LoggingMixin):
time.sleep(2)
return self._task_status(self.read_pod(pod)), result
- def parse_log_line(self, line: str) -> Tuple[str, str]:
+ def parse_log_line(self, line: str) -> Tuple[Optional[Union[Date, Time,
DateTime, Duration]], str]:
"""
Parse K8s log line and returns the final state
@@ -183,7 +186,12 @@ class PodLauncher(LoggingMixin):
raise Exception(f'Log not in "{{timestamp}} {{log}}" format. Got:
{line}')
timestamp = line[:split_at]
message = line[split_at + 1 :].rstrip()
- return timestamp, message
+ try:
+ last_log_time = pendulum.parse(timestamp)
+ except ParserError:
+ self.log.error("Error parsing timestamp. Will continue execution
but won't update timestamp")
+ return None, line
+ return last_log_time, message
def _task_status(self, event):
self.log.info('Event: %s had an event of type %s',
event.metadata.name, event.status.phase)
diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py
b/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py
index 6b8ba66..f308f03 100644
--- a/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py
+++ b/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py
@@ -17,6 +17,7 @@
import unittest
from unittest import mock
+import pendulum
import pytest
from kubernetes.client.rest import ApiException
from requests.exceptions import BaseHTTPError
@@ -198,12 +199,15 @@ class TestPodLauncher(unittest.TestCase):
self.pod_launcher.read_pod(mock.sentinel)
def test_parse_log_line(self):
- timestamp, message = self.pod_launcher.parse_log_line(
- '2020-10-08T14:16:17.793417674Z Valid message\n'
- )
-
- assert timestamp == '2020-10-08T14:16:17.793417674Z'
- assert message == 'Valid message'
+ log_message = "This should return no timestamp"
+ timestamp, line = self.pod_launcher.parse_log_line(log_message)
+ self.assertEqual(timestamp, None)
+ self.assertEqual(line, log_message)
+
+ real_timestamp = "2020-10-08T14:16:17.793417674Z"
+ timestamp, line = self.pod_launcher.parse_log_line("
".join([real_timestamp, log_message]))
+ self.assertEqual(timestamp, pendulum.parse(real_timestamp))
+ self.assertEqual(line, log_message)
with pytest.raises(Exception):
self.pod_launcher.parse_log_line('2020-10-08T14:16:17.793417674ZInvalidmessage\n')