This is an automated email from the ASF dual-hosted git repository. jhtimmins pushed a commit to branch v2-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 24e75e72c674b47244fda8780b9d5368abbb05f6 Author: Ephraim Anierobi <[email protected]> AuthorDate: Sun Jul 25 16:19:18 2021 +0100 Do not seek error file when it is closed (#17187) We do not check if error file is closed before we seek it, which causes exceptions. Sometimes, this error file does not exist e.g when the task state is changed externally. This change fixes it by returning None when the file is closed so that custom text can be used for error. Co-authored-by: Jed Cunningham <[email protected]> (cherry picked from commit 01a0aca249eeaf71d182bf537b9d04121257ac09) --- airflow/models/taskinstance.py | 2 ++ tests/models/test_taskinstance.py | 13 +++++++++++++ 2 files changed, 15 insertions(+) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 58bb680..46ea7f9 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -110,6 +110,8 @@ def set_current_context(context: Context): def load_error_file(fd: IO[bytes]) -> Optional[Union[str, Exception]]: """Load and return error from error file""" + if fd.closed: + return None fd.seek(0, os.SEEK_SET) data = fd.read() if not data: diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 11fe0cd..4b84d10 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -21,6 +21,7 @@ import os import time import unittest import urllib +from tempfile import NamedTemporaryFile from typing import List, Optional, Union, cast from unittest import mock from unittest.mock import call, mock_open, patch @@ -44,6 +45,7 @@ from airflow.models import ( TaskReschedule, Variable, ) +from airflow.models.taskinstance import load_error_file, set_error_file from airflow.operators.bash import BashOperator from airflow.operators.dummy import DummyOperator from airflow.operators.python import PythonOperator @@ -113,6 +115,17 @@ class TestTaskInstance(unittest.TestCase): def tearDown(self): self.clean_db() + def test_load_error_file_returns_None_for_closed_file(self): + error_fd = NamedTemporaryFile() + error_fd.close() + assert load_error_file(error_fd) is None + + def test_load_error_file_loads_correctly(self): + error_message = "some random error message" + with NamedTemporaryFile() as error_fd: + set_error_file(error_fd.name, error=error_message) + assert load_error_file(error_fd) == error_message + def test_set_task_dates(self): """ Test that tasks properly take start/end dates from DAGs
