This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 11d9daa8a9 Fix TaskInstance.task not defined before handle_failure
(#26040)
11d9daa8a9 is described below
commit 11d9daa8a9464a3102041093b162cf7934ee77b0
Author: Chenglong Yan <[email protected]>
AuthorDate: Thu Sep 8 15:47:06 2022 +0800
Fix TaskInstance.task not defined before handle_failure (#26040)
---
airflow/models/log.py | 2 +-
airflow/models/taskinstance.py | 6 +++---
tests/models/test_taskinstance.py | 10 ++++++++++
3 files changed, 14 insertions(+), 4 deletions(-)
diff --git a/airflow/models/log.py b/airflow/models/log.py
index 3f658c2174..0800aac566 100644
--- a/airflow/models/log.py
+++ b/airflow/models/log.py
@@ -55,7 +55,7 @@ class Log(Base):
self.task_id = task_instance.task_id
self.execution_date = task_instance.execution_date
self.map_index = task_instance.map_index
- if task_instance.task:
+ if getattr(task_instance, 'task', None):
task_owner = task_instance.task.owner
if 'task_id' in kwargs:
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index cc5a733a63..80b4068da3 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1872,7 +1872,7 @@ class TaskInstance(Base, LoggingMixin):
self.clear_next_method_args()
# In extreme cases (zombie in case of dag with parse error) we might
_not_ have a Task.
- if context is None and self.task:
+ if context is None and getattr(self, 'task', None):
context = self.get_template_context(session)
if context is not None:
@@ -1892,7 +1892,7 @@ class TaskInstance(Base, LoggingMixin):
task: Optional[BaseOperator] = None
try:
- if self.task and context:
+ if getattr(self, 'task', None) and context:
task = self.task.unmap((context, session))
except Exception:
self.log.error("Unable to unmap task to determine if we need to
send an alert email")
@@ -1931,7 +1931,7 @@ class TaskInstance(Base, LoggingMixin):
# If a task is cleared when running, it goes into RESTARTING state
and is always
# eligible for retry
return True
- if not self.task:
+ if not getattr(self, 'task', None):
# Couldn't load the task, don't know number of retries, guess:
return self.try_number <= self.max_tries
diff --git a/tests/models/test_taskinstance.py
b/tests/models/test_taskinstance.py
index e39cc5bf9c..7b90ffd4cb 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2340,6 +2340,16 @@ class TestTaskInstance:
Stats_incr.assert_any_call('ti_failures')
Stats_incr.assert_any_call('operator_failures_EmptyOperator')
+ def test_handle_failure_task_undefined(self, create_task_instance):
+ """
+ When the loaded taskinstance does not use refresh_from_task, the task
may be undefined.
+ For example:
+ the DAG file has been deleted before executing
_execute_task_callbacks
+ """
+ ti = create_task_instance()
+ del ti.task
+ ti.handle_failure("test ti.task undefined")
+
def test_does_not_retry_on_airflow_fail_exception(self, dag_maker):
def fail():
raise AirflowFailException("hopeless")