This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 64d2f54  Do not fail KubernetesPodOperator tasks if log reading fails (#17649)
64d2f54 is described below
commit 64d2f5488f6764194a2f4f8a01f961990c75b840
Author: Witold Baryluk <[email protected]>
AuthorDate: Tue Sep 7 14:47:10 2021 +0000
Do not fail KubernetesPodOperator tasks if log reading fails (#17649)
In very long-running Airflow tasks using KubernetesPodOperator, especially
when Airflow runs in a different k8s cluster than the one the pod is started
in, we see sporadic but reasonably frequent failures like this after 5-13
hours of runtime:
[2021-08-16 04:00:25,871] {pod_launcher.py:198} INFO - Event: foo-bar.xyz had an event of type Running
[2021-08-16 04:00:25,893] {pod_launcher.py:149} INFO - 210816.0400+0000 app-specific-logs...
...
... (~a few log lines every few minutes from the app)
...
[2021-08-16 17:20:29,585] {pod_launcher.py:149} INFO - 210816.1720+0000 app-specific-logs....
[2021-08-16 17:27:36,105] {taskinstance.py:1501} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/pysetup/.venv/lib/python3.7/site-packages/urllib3/response.py", line 436, in _error_catcher
    yield
  File "/opt/pysetup/.venv/lib/python3.7/site-packages/urllib3/response.py", line 763, in read_chunked
    self._update_chunk_length()
  File "/opt/pysetup/.venv/lib/python3.7/site-packages/urllib3/response.py", line 693, in _update_chunk_length
    line = self._fp.fp.readline()
  File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
    return self._sock.recv_into(b)
  File "/usr/local/lib/python3.7/ssl.py", line 1071, in recv_into
    return self.read(nbytes, buffer)
  File "/usr/local/lib/python3.7/ssl.py", line 929, in read
    return self._sslobj.read(len, buffer)
TimeoutError: [Errno 110] Connection timed out

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/pysetup/.venv/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/opt/pysetup/.venv/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/opt/pysetup/.venv/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
    result = task_copy.execute(context=context)
  File "/opt/pysetup/.venv/lib/python3.7/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 366, in execute
    final_state, remote_pod, result = self.create_new_pod_for_operator(labels, launcher)
  File "/opt/pysetup/.venv/lib/python3.7/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 520, in create_new_pod_for_operator
    final_state, remote_pod, result = launcher.monitor_pod(pod=self.pod, get_logs=self.get_logs)
  File "/opt/pysetup/.venv/lib/python3.7/site-packages/airflow/providers/cncf/kubernetes/utils/pod_launcher.py", line 147, in monitor_pod
    for line in logs:
  File "/opt/pysetup/.venv/lib/python3.7/site-packages/urllib3/response.py", line 807, in __iter__
    for chunk in self.stream(decode_content=True):
  File "/opt/pysetup/.venv/lib/python3.7/site-packages/urllib3/response.py", line 571, in stream
    for line in self.read_chunked(amt, decode_content=decode_content):
  File "/opt/pysetup/.venv/lib/python3.7/site-packages/urllib3/response.py", line 792, in read_chunked
    self._original_response.close()
  File "/usr/local/lib/python3.7/contextlib.py", line 130, in __exit__
    self.gen.throw(type, value, traceback)
  File "/opt/pysetup/.venv/lib/python3.7/site-packages/urllib3/response.py", line 454, in _error_catcher
    raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ("Connection broken: TimeoutError(110, 'Connection timed out')", TimeoutError(110, 'Connection timed out'))
This most likely happens because the task is not emitting a lot of logs, or
simply because of a sporadic network slowdown between the clusters.
So, if reading logs fails, do not fail the whole operator and terminate the
task; keep monitoring until the call to `self.base_container_is_running`
also fails or returns false.
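For illustration only, this is roughly the kind of task affected: a
long-running pod whose logs are streamed back across clusters for the whole
run. A minimal sketch, where the DAG id, image, namespace, and kubeconfig
path are placeholders rather than anything from this change:

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

    with DAG(
        dag_id='long_running_pod_example',   # placeholder DAG id
        start_date=datetime(2021, 8, 1),
        schedule_interval=None,
        catchup=False,
    ) as dag:
        crunch = KubernetesPodOperator(
            task_id='crunch_data',                           # placeholder task id
            name='crunch-data',
            namespace='jobs',                                # placeholder namespace
            image='registry.example.com/crunch:latest',      # placeholder image
            cmds=['/bin/crunch'],
            get_logs=True,      # stream pod logs back into the task log
            in_cluster=False,   # Airflow runs outside the target cluster
            config_file='/opt/airflow/kube/target-cluster.yaml',  # placeholder kubeconfig
            is_delete_operator_pod=True,
            execution_timeout=timedelta(hours=20),
        )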
---
.../cncf/kubernetes/utils/pod_launcher.py | 27 +++++++++++++++-------
.../cncf/kubernetes/utils/test_pod_launcher.py | 25 +++++++++++++++++++-
2 files changed, 43 insertions(+), 9 deletions(-)
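For context, read_pod_logs() wraps the kubernetes client's streaming log
call (note the _preload_content=False context line in the hunk below), and
it is the returned urllib3 stream that breaks mid-iteration. A rough sketch
of that underlying call with the same tolerance, assuming the BaseHTTPError
alias resolves to urllib3's HTTPError (the base class of the ProtocolError
in the traceback above); pod name, namespace, and client setup are
placeholders:

    from urllib3.exceptions import HTTPError
    from kubernetes import client, config

    config.load_kube_config()   # placeholder: however the hook builds its client
    core_v1 = client.CoreV1Api()

    try:
        # _preload_content=False returns a streaming urllib3 response,
        # which is what pod_launcher.read_pod_logs() iterates over.
        logs = core_v1.read_namespaced_pod_log(
            name='foo-bar',        # placeholder pod name
            namespace='jobs',      # placeholder namespace
            container='base',
            follow=True,
            timestamps=True,
            _preload_content=False,
        )
        for line in logs:
            print(line.decode('utf-8'))
    except HTTPError:
        # ProtocolError (wrapping TimeoutError) derives from HTTPError; after
        # this commit such a failure is logged and monitoring continues.
        pass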
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py
index 26826c9..36c68b1 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py
@@ -143,12 +143,21 @@ class PodLauncher(LoggingMixin):
             read_logs_since_sec = None
             last_log_time = None
             while True:
-                logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
-                for line in logs:
-                    timestamp, message = self.parse_log_line(line.decode('utf-8'))
-                    self.log.info(message)
-                    if timestamp:
-                        last_log_time = timestamp
+                try:
+                    logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
+                    for line in logs:
+                        timestamp, message = self.parse_log_line(line.decode('utf-8'))
+                        self.log.info(message)
+                        if timestamp:
+                            last_log_time = timestamp
+                except BaseHTTPError:
+                    # Catches errors like ProtocolError(TimeoutError).
+                    self.log.warning(
+                        'Failed to read logs for pod %s',
+                        pod.metadata.name,
+                        exc_info=True,
+                    )
+
                 time.sleep(1)
 
                 if not self.base_container_is_running(pod):
@@ -243,8 +252,10 @@ class PodLauncher(LoggingMixin):
                 _preload_content=False,
                 **additional_kwargs,
             )
-        except BaseHTTPError as e:
-            raise AirflowException(f'There was an error reading the kubernetes API: {e}')
+        except BaseHTTPError:
+            self.log.exception('There was an error reading the kubernetes API.')
+            # Reraise to be caught by self.monitor_pod.
+            raise
 
     @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
     def read_pod_events(self, pod):
diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py b/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py
index f308f03..d485dec 100644
--- a/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py
+++ b/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py
@@ -73,7 +73,7 @@ class TestPodLauncher(unittest.TestCase):
             BaseHTTPError('Boom'),
             BaseHTTPError('Boom'),
         ]
-        with pytest.raises(AirflowException):
+        with pytest.raises(BaseHTTPError):
             self.pod_launcher.read_pod_logs(mock.sentinel)
 
     def test_read_pod_logs_successfully_with_tail_lines(self):
@@ -188,6 +188,29 @@ class TestPodLauncher(unittest.TestCase):
         self.mock_kube_client.read_namespaced_pod_log.return_value = iter(())
         self.pod_launcher.monitor_pod(mock.sentinel, get_logs=True)
 
+    def test_monitor_pod_logs_failures_non_fatal(self):
+        mock.sentinel.metadata = mock.MagicMock()
+        running_status = mock.MagicMock()
+        running_status.configure_mock(**{'name': 'base', 'state.running': True})
+        pod_info_running = mock.MagicMock(**{'status.container_statuses': [running_status]})
+        pod_info_succeeded = mock.MagicMock(**{'status.phase': PodStatus.SUCCEEDED})
+
+        def pod_state_gen():
+            yield pod_info_running
+            yield pod_info_running
+            while True:
+                yield pod_info_succeeded
+
+        self.mock_kube_client.read_namespaced_pod.side_effect = pod_state_gen()
+
+        def pod_log_gen():
+            while True:
+                yield BaseHTTPError('Boom')
+
+        self.mock_kube_client.read_namespaced_pod_log.side_effect = pod_log_gen()
+
+        self.pod_launcher.monitor_pod(mock.sentinel, get_logs=True)
+
     def test_read_pod_retries_fails(self):
         mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod.side_effect = [