This is an automated email from the ASF dual-hosted git repository. ash pushed a commit to branch v2-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 87c26b464a7bba388c4830982fd11e7ae5384fa8 Author: Emil Ejbyfeldt <[email protected]> AuthorDate: Mon Mar 1 14:58:01 2021 +0100 BugFix: TypeError in monitor_pod (#14513) If the log read is interrupted before any logs are produced then `last_log_time` will not be set and the line `delta = pendulum.now() - last_log_time` will fail with ``` TypeError: unsupported operand type(s) for -: 'DateTime' and 'NoneType' ``` This commit fix this issue by only updating `read_logs_since_sec` if `last_log_time` has been set. (cherry picked from commit 45a0ac2e01c174754f4e6612c8e4d3125061d096) --- airflow/kubernetes/pod_launcher.py | 7 ++++--- tests/kubernetes/test_pod_launcher.py | 18 +++++++++++++++++- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py index 02194d7..3d663d2 100644 --- a/airflow/kubernetes/pod_launcher.py +++ b/airflow/kubernetes/pod_launcher.py @@ -140,9 +140,10 @@ class PodLauncher(LoggingMixin): break self.log.warning('Pod %s log read interrupted', pod.metadata.name) - delta = pendulum.now() - last_log_time - # Prefer logs duplication rather than loss - read_logs_since_sec = math.ceil(delta.total_seconds()) + if last_log_time: + delta = pendulum.now() - last_log_time + # Prefer logs duplication rather than loss + read_logs_since_sec = math.ceil(delta.total_seconds()) result = None if self.extract_xcom: while self.base_container_is_running(pod): diff --git a/tests/kubernetes/test_pod_launcher.py b/tests/kubernetes/test_pod_launcher.py index 9e7cc82..6e40264 100644 --- a/tests/kubernetes/test_pod_launcher.py +++ b/tests/kubernetes/test_pod_launcher.py @@ -21,7 +21,7 @@ import pytest from requests.exceptions import BaseHTTPError from airflow.exceptions import AirflowException -from airflow.kubernetes.pod_launcher import PodLauncher +from airflow.kubernetes.pod_launcher import PodLauncher, PodStatus class TestPodLauncher(unittest.TestCase): @@ -170,6 +170,22 @@ class TestPodLauncher(unittest.TestCase): ] ) + def test_monitor_pod_empty_logs(self): + mock.sentinel.metadata = mock.MagicMock() + running_status = mock.MagicMock() + running_status.configure_mock(**{'name': 'base', 'state.running': True}) + pod_info_running = mock.MagicMock(**{'status.container_statuses': [running_status]}) + pod_info_succeeded = mock.MagicMock(**{'status.phase': PodStatus.SUCCEEDED}) + + def pod_state_gen(): + yield pod_info_running + while True: + yield pod_info_succeeded + + self.mock_kube_client.read_namespaced_pod.side_effect = pod_state_gen() + self.mock_kube_client.read_namespaced_pod_log.return_value = iter(()) + self.pod_launcher.monitor_pod(mock.sentinel, get_logs=True) + def test_read_pod_retries_fails(self): mock.sentinel.metadata = mock.MagicMock() self.mock_kube_client.read_namespaced_pod.side_effect = [
