This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 11d9daa8a9 Fix TaskInstance.task not defined before handle_failure 
(#26040)
11d9daa8a9 is described below

commit 11d9daa8a9464a3102041093b162cf7934ee77b0
Author: Chenglong Yan <[email protected]>
AuthorDate: Thu Sep 8 15:47:06 2022 +0800

    Fix TaskInstance.task not defined before handle_failure (#26040)
---
 airflow/models/log.py             |  2 +-
 airflow/models/taskinstance.py    |  6 +++---
 tests/models/test_taskinstance.py | 10 ++++++++++
 3 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/airflow/models/log.py b/airflow/models/log.py
index 3f658c2174..0800aac566 100644
--- a/airflow/models/log.py
+++ b/airflow/models/log.py
@@ -55,7 +55,7 @@ class Log(Base):
             self.task_id = task_instance.task_id
             self.execution_date = task_instance.execution_date
             self.map_index = task_instance.map_index
-            if task_instance.task:
+            if getattr(task_instance, 'task', None):
                 task_owner = task_instance.task.owner
 
         if 'task_id' in kwargs:
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index cc5a733a63..80b4068da3 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1872,7 +1872,7 @@ class TaskInstance(Base, LoggingMixin):
         self.clear_next_method_args()
 
         # In extreme cases (zombie in case of dag with parse error) we might 
_not_ have a Task.
-        if context is None and self.task:
+        if context is None and getattr(self, 'task', None):
             context = self.get_template_context(session)
 
         if context is not None:
@@ -1892,7 +1892,7 @@ class TaskInstance(Base, LoggingMixin):
 
         task: Optional[BaseOperator] = None
         try:
-            if self.task and context:
+            if getattr(self, 'task', None) and context:
                 task = self.task.unmap((context, session))
         except Exception:
             self.log.error("Unable to unmap task to determine if we need to 
send an alert email")
@@ -1931,7 +1931,7 @@ class TaskInstance(Base, LoggingMixin):
             # If a task is cleared when running, it goes into RESTARTING state 
and is always
             # eligible for retry
             return True
-        if not self.task:
+        if not getattr(self, 'task', None):
             # Couldn't load the task, don't know number of retries, guess:
             return self.try_number <= self.max_tries
 
diff --git a/tests/models/test_taskinstance.py 
b/tests/models/test_taskinstance.py
index e39cc5bf9c..7b90ffd4cb 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2340,6 +2340,16 @@ class TestTaskInstance:
         Stats_incr.assert_any_call('ti_failures')
         Stats_incr.assert_any_call('operator_failures_EmptyOperator')
 
+    def test_handle_failure_task_undefined(self, create_task_instance):
+        """
+        When the loaded taskinstance does not use refresh_from_task, the task 
may be undefined.
+        For example:
+            the DAG file has been deleted before executing 
_execute_task_callbacks
+        """
+        ti = create_task_instance()
+        del ti.task
+        ti.handle_failure("test ti.task undefined")
+
     def test_does_not_retry_on_airflow_fail_exception(self, dag_maker):
         def fail():
             raise AirflowFailException("hopeless")

Reply via email to