This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 01a0aca  Do not seek error file when it is closed (#17187)
01a0aca is described below

commit 01a0aca249eeaf71d182bf537b9d04121257ac09
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Sun Jul 25 16:19:18 2021 +0100

    Do not seek error file when it is closed (#17187)
    
    We do not check whether the error file is closed before seeking it, which causes exceptions.
    Sometimes this error file does not exist, e.g. when the task state is changed externally.
    
    This change fixes it by returning None when the file is closed, so that custom text can be used for the error.
    
    Co-authored-by: Jed Cunningham <[email protected]>
---
 airflow/models/taskinstance.py    |  2 ++
 tests/models/test_taskinstance.py | 13 +++++++++++++
 2 files changed, 15 insertions(+)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index a71230f..6fb437b 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -112,6 +112,8 @@ def set_current_context(context: Context):
 
 def load_error_file(fd: IO[bytes]) -> Optional[Union[str, Exception]]:
     """Load and return error from error file"""
+    if fd.closed:
+        return None
     fd.seek(0, os.SEEK_SET)
     data = fd.read()
     if not data:
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 1656820..39c99da 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -21,6 +21,7 @@ import os
 import time
 import unittest
 import urllib
+from tempfile import NamedTemporaryFile
 from typing import List, Optional, Union, cast
 from unittest import mock
 from unittest.mock import call, mock_open, patch
@@ -48,6 +49,7 @@ from airflow.models import (
     TaskReschedule,
     Variable,
 )
+from airflow.models.taskinstance import load_error_file, set_error_file
 from airflow.operators.bash import BashOperator
 from airflow.operators.dummy import DummyOperator
 from airflow.operators.python import PythonOperator
@@ -119,6 +121,17 @@ class TestTaskInstance(unittest.TestCase):
     def tearDown(self):
         self.clean_db()
 
+    def test_load_error_file_returns_None_for_closed_file(self):
+        error_fd = NamedTemporaryFile()
+        error_fd.close()
+        assert load_error_file(error_fd) is None
+
+    def test_load_error_file_loads_correctly(self):
+        error_message = "some random error message"
+        with NamedTemporaryFile() as error_fd:
+            set_error_file(error_fd.name, error=error_message)
+            assert load_error_file(error_fd) == error_message
+
     def test_set_task_dates(self):
         """
         Test that tasks properly take start/end dates from DAGs

Reply via email to