This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3d14213ebf Determine fail_stop on client side when db isolated (#39258)
3d14213ebf is described below

commit 3d14213ebffe06282f40c843e12ba34dc76df806
Author: Daniel Standish <[email protected]>
AuthorDate: Sat May 4 11:37:45 2024 -0700

    Determine fail_stop on client side when db isolated (#39258)
    
    * Determine fail_stop on client side when db isolated
    
    This is needed because we do not ser the dag on Operator objects.
    
    * fix tests
---
 airflow/models/taskinstance.py                 | 19 +++++++++++++++++--
 airflow/serialization/pydantic/taskinstance.py |  8 ++++++++
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 981f908619..112b239653 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -880,6 +880,7 @@ def _handle_failure(
     test_mode: bool | None = None,
     context: Context | None = None,
     force_fail: bool = False,
+    fail_stop: bool = False,
 ) -> None:
     """
     Handle Failure for a task instance.
@@ -903,6 +904,7 @@ def _handle_failure(
         context=context,
         force_fail=force_fail,
         session=session,
+        fail_stop=fail_stop,
     )
 
     _log_state(task_instance=task_instance, lead_msg="Immediate failure 
requested. " if force_fail else "")
@@ -2966,8 +2968,13 @@ class TaskInstance(Base, LoggingMixin):
         context: Context | None = None,
         force_fail: bool = False,
         session: Session = NEW_SESSION,
+        fail_stop: bool = False,
     ):
-        """Handle Failure for the TaskInstance."""
+        """
+        Handle Failure for the TaskInstance.
+
+        :param fail_stop: if true, stop remaining tasks in dag
+        """
         get_listener_manager().hook.on_task_instance_failed(
             previous_state=TaskInstanceState.RUNNING, task_instance=ti, 
error=error, session=session
         )
@@ -3030,7 +3037,7 @@ class TaskInstance(Base, LoggingMixin):
             email_for_state = operator.attrgetter("email_on_failure")
             callbacks = task.on_failure_callback if task else None
 
-            if task and task.dag and task.dag.fail_stop:
+            if task and fail_stop:
                 _stop_remaining_tasks(task_instance=ti, session=session)
         else:
             if ti.state == TaskInstanceState.QUEUED:
@@ -3079,6 +3086,13 @@ class TaskInstance(Base, LoggingMixin):
         :param context: Jinja2 context
         :param force_fail: if True, task does not retry
         """
+        if TYPE_CHECKING:
+            assert self.task
+            assert self.task.dag
+        try:
+            fail_stop = self.task.dag.fail_stop
+        except Exception:
+            fail_stop = False
         _handle_failure(
             task_instance=self,
             error=error,
@@ -3086,6 +3100,7 @@ class TaskInstance(Base, LoggingMixin):
             test_mode=test_mode,
             context=context,
             force_fail=force_fail,
+            fail_stop=fail_stop,
         )
 
     def is_eligible_to_retry(self):
diff --git a/airflow/serialization/pydantic/taskinstance.py 
b/airflow/serialization/pydantic/taskinstance.py
index cc52aa9989..2e01bf415a 100644
--- a/airflow/serialization/pydantic/taskinstance.py
+++ b/airflow/serialization/pydantic/taskinstance.py
@@ -276,6 +276,13 @@ class TaskInstancePydantic(BaseModelPydantic, 
LoggingMixin):
         """
         from airflow.models.taskinstance import _handle_failure
 
+        if TYPE_CHECKING:
+            assert self.task
+            assert self.task.dag
+        try:
+            fail_stop = self.task.dag.fail_stop
+        except Exception:
+            fail_stop = False
         _handle_failure(
             task_instance=self,
             error=error,
@@ -283,6 +290,7 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
             test_mode=test_mode,
             context=context,
             force_fail=force_fail,
+            fail_stop=fail_stop,
         )
 
     def refresh_from_task(self, task: Operator, pool_override: str | None = 
None) -> None:

Reply via email to