This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 64d2f54  Do not fail KubernetesPodOperator tasks if log reading fails 
(#17649)
64d2f54 is described below

commit 64d2f5488f6764194a2f4f8a01f961990c75b840
Author: Witold Baryluk <[email protected]>
AuthorDate: Tue Sep 7 14:47:10 2021 +0000

    Do not fail KubernetesPodOperator tasks if log reading fails (#17649)
    
    In very long-running Airflow tasks using KubernetesPodOperator,
    especially when Airflow is running in a different k8s cluster than the
    one where the pod is started, we see sporadic but reasonably frequent
    failures like this, after 5-13 hours of runtime:
    
    [2021-08-16 04:00:25,871] {pod_launcher.py:198} INFO - Event: foo-bar.xyz 
had an event of type Running
    [2021-08-16 04:00:25,893] {pod_launcher.py:149} INFO - 210816.0400+0000 
app-specific-logs...
    ...
    ... (~a few log lines every few minutes from the app)
    ...
    [2021-08-16 17:20:29,585] {pod_launcher.py:149} INFO - 210816.1720+0000 
app-specific-logs....
    [2021-08-16 17:27:36,105] {taskinstance.py:1501} ERROR - Task failed with 
exception
    Traceback (most recent call last):
      File 
"/opt/pysetup/.venv/lib/python3.7/site-packages/urllib3/response.py", line 436, 
in _error_catcher
        yield
      File 
"/opt/pysetup/.venv/lib/python3.7/site-packages/urllib3/response.py", line 763, 
in read_chunked
        self._update_chunk_length()
      File 
"/opt/pysetup/.venv/lib/python3.7/site-packages/urllib3/response.py", line 693, 
in _update_chunk_length
        line = self._fp.fp.readline()
      File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
        return self._sock.recv_into(b)
      File "/usr/local/lib/python3.7/ssl.py", line 1071, in recv_into
        return self.read(nbytes, buffer)
      File "/usr/local/lib/python3.7/ssl.py", line 929, in read
        return self._sslobj.read(len, buffer)
    TimeoutError: [Errno 110] Connection timed out
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File 
"/opt/pysetup/.venv/lib/python3.7/site-packages/airflow/models/taskinstance.py",
 line 1157, in _run_raw_task
        self._prepare_and_execute_task_with_callbacks(context, task)
      File 
"/opt/pysetup/.venv/lib/python3.7/site-packages/airflow/models/taskinstance.py",
 line 1331, in _prepare_and_execute_task_with_callbacks
        result = self._execute_task(context, task_copy)
      File 
"/opt/pysetup/.venv/lib/python3.7/site-packages/airflow/models/taskinstance.py",
 line 1361, in _execute_task
        result = task_copy.execute(context=context)
      File 
"/opt/pysetup/.venv/lib/python3.7/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py",
 line 366, in execute
        final_state, remote_pod, result = 
self.create_new_pod_for_operator(labels, launcher)
      File 
"/opt/pysetup/.venv/lib/python3.7/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py",
 line 520, in create_new_pod_for_operator
        final_state, remote_pod, result = launcher.monitor_pod(pod=self.pod, 
get_logs=self.get_logs)
      File 
"/opt/pysetup/.venv/lib/python3.7/site-packages/airflow/providers/cncf/kubernetes/utils/pod_launcher.py",
 line 147, in monitor_pod
        for line in logs:
      File 
"/opt/pysetup/.venv/lib/python3.7/site-packages/urllib3/response.py", line 807, 
in __iter__
        for chunk in self.stream(decode_content=True):
      File 
"/opt/pysetup/.venv/lib/python3.7/site-packages/urllib3/response.py", line 571, 
in stream
        for line in self.read_chunked(amt, decode_content=decode_content):
      File 
"/opt/pysetup/.venv/lib/python3.7/site-packages/urllib3/response.py", line 792, 
in read_chunked
        self._original_response.close()
      File "/usr/local/lib/python3.7/contextlib.py", line 130, in __exit__
        self.gen.throw(type, value, traceback)
      File 
"/opt/pysetup/.venv/lib/python3.7/site-packages/urllib3/response.py", line 454, 
in _error_catcher
        raise ProtocolError("Connection broken: %r" % e, e)
    urllib3.exceptions.ProtocolError: ("Connection broken: TimeoutError(110, 
'Connection timed out')", TimeoutError(110, 'Connection timed out'))
    
    This is most likely because the task is not emitting a lot of logs, or
    simply due to a sporadic network slowdown between clusters.
    
    So, if log reading fails, do not fail the whole operator and terminate
    the task; keep going until the call to the
    `self.base_container_is_running` function also fails or returns false.
---
 .../cncf/kubernetes/utils/pod_launcher.py          | 27 +++++++++++++++-------
 .../cncf/kubernetes/utils/test_pod_launcher.py     | 25 +++++++++++++++++++-
 2 files changed, 43 insertions(+), 9 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py 
b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py
index 26826c9..36c68b1 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py
@@ -143,12 +143,21 @@ class PodLauncher(LoggingMixin):
             read_logs_since_sec = None
             last_log_time = None
             while True:
-                logs = self.read_pod_logs(pod, timestamps=True, 
since_seconds=read_logs_since_sec)
-                for line in logs:
-                    timestamp, message = 
self.parse_log_line(line.decode('utf-8'))
-                    self.log.info(message)
-                    if timestamp:
-                        last_log_time = timestamp
+                try:
+                    logs = self.read_pod_logs(pod, timestamps=True, 
since_seconds=read_logs_since_sec)
+                    for line in logs:
+                        timestamp, message = 
self.parse_log_line(line.decode('utf-8'))
+                        self.log.info(message)
+                        if timestamp:
+                            last_log_time = timestamp
+                except BaseHTTPError:
+                    # Catches errors like ProtocolError(TimeoutError).
+                    self.log.warning(
+                        'Failed to read logs for pod %s',
+                        pod.metadata.name,
+                        exc_info=True,
+                    )
+
                 time.sleep(1)
 
                 if not self.base_container_is_running(pod):
@@ -243,8 +252,10 @@ class PodLauncher(LoggingMixin):
                 _preload_content=False,
                 **additional_kwargs,
             )
-        except BaseHTTPError as e:
-            raise AirflowException(f'There was an error reading the kubernetes 
API: {e}')
+        except BaseHTTPError:
+            self.log.exception('There was an error reading the kubernetes 
API.')
+            # Reraise to be caught by self.monitor_pod.
+            raise
 
     @tenacity.retry(stop=tenacity.stop_after_attempt(3), 
wait=tenacity.wait_exponential(), reraise=True)
     def read_pod_events(self, pod):
diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py 
b/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py
index f308f03..d485dec 100644
--- a/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py
+++ b/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py
@@ -73,7 +73,7 @@ class TestPodLauncher(unittest.TestCase):
             BaseHTTPError('Boom'),
             BaseHTTPError('Boom'),
         ]
-        with pytest.raises(AirflowException):
+        with pytest.raises(BaseHTTPError):
             self.pod_launcher.read_pod_logs(mock.sentinel)
 
     def test_read_pod_logs_successfully_with_tail_lines(self):
@@ -188,6 +188,29 @@ class TestPodLauncher(unittest.TestCase):
         self.mock_kube_client.read_namespaced_pod_log.return_value = iter(())
         self.pod_launcher.monitor_pod(mock.sentinel, get_logs=True)
 
+    def test_monitor_pod_logs_failures_non_fatal(self):
+        mock.sentinel.metadata = mock.MagicMock()
+        running_status = mock.MagicMock()
+        running_status.configure_mock(**{'name': 'base', 'state.running': 
True})
+        pod_info_running = mock.MagicMock(**{'status.container_statuses': 
[running_status]})
+        pod_info_succeeded = mock.MagicMock(**{'status.phase': 
PodStatus.SUCCEEDED})
+
+        def pod_state_gen():
+            yield pod_info_running
+            yield pod_info_running
+            while True:
+                yield pod_info_succeeded
+
+        self.mock_kube_client.read_namespaced_pod.side_effect = pod_state_gen()
+
+        def pod_log_gen():
+            while True:
+                yield BaseHTTPError('Boom')
+
+        self.mock_kube_client.read_namespaced_pod_log.side_effect = 
pod_log_gen()
+
+        self.pod_launcher.monitor_pod(mock.sentinel, get_logs=True)
+
     def test_read_pod_retries_fails(self):
         mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod.side_effect = [

Reply via email to