This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 01a0aca Do not seek error file when it is closed (#17187)
01a0aca is described below
commit 01a0aca249eeaf71d182bf537b9d04121257ac09
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Sun Jul 25 16:19:18 2021 +0100
Do not seek error file when it is closed (#17187)
We do not check if error file is closed before we seek it, which causes
exceptions.
Sometimes, this error file does not exist e.g when the task state is
changed externally.
This change fixes it by returning None when the file is closed so that
custom text can be used for error.
Co-authored-by: Jed Cunningham
<[email protected]>
---
airflow/models/taskinstance.py | 2 ++
tests/models/test_taskinstance.py | 13 +++++++++++++
2 files changed, 15 insertions(+)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index a71230f..6fb437b 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -112,6 +112,8 @@ def set_current_context(context: Context):
def load_error_file(fd: IO[bytes]) -> Optional[Union[str, Exception]]:
"""Load and return error from error file"""
+ if fd.closed:
+ return None
fd.seek(0, os.SEEK_SET)
data = fd.read()
if not data:
diff --git a/tests/models/test_taskinstance.py
b/tests/models/test_taskinstance.py
index 1656820..39c99da 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -21,6 +21,7 @@ import os
import time
import unittest
import urllib
+from tempfile import NamedTemporaryFile
from typing import List, Optional, Union, cast
from unittest import mock
from unittest.mock import call, mock_open, patch
@@ -48,6 +49,7 @@ from airflow.models import (
TaskReschedule,
Variable,
)
+from airflow.models.taskinstance import load_error_file, set_error_file
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
@@ -119,6 +121,17 @@ class TestTaskInstance(unittest.TestCase):
def tearDown(self):
self.clean_db()
+ def test_load_error_file_returns_None_for_closed_file(self):
+ error_fd = NamedTemporaryFile()
+ error_fd.close()
+ assert load_error_file(error_fd) is None
+
+ def test_load_error_file_loads_correctly(self):
+ error_message = "some random error message"
+ with NamedTemporaryFile() as error_fd:
+ set_error_file(error_fd.name, error=error_message)
+ assert load_error_file(error_fd) == error_message
+
def test_set_task_dates(self):
"""
Test that tasks properly take start/end dates from DAGs