This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 90baac669c fix issue: DAG's on_failure_callback is not invoked when
task failed during testing dag. (#30965)
90baac669c is described below
commit 90baac669c446eb4dfb9166d996de59289044983
Author: bo zeng <[email protected]>
AuthorDate: Fri Jun 9 17:05:23 2023 +0800
fix issue: DAG's on_failure_callback is not invoked when task failed during
testing dag. (#30965)
---------
Co-authored-by: zengbo <[email protected]>
Co-authored-by: Tzu-ping Chung <[email protected]>
---
airflow/models/dag.py | 14 ++++++++++----
tests/models/test_dag.py | 42 ++++++++++++++++++++++++++++++++++++++++++
2 files changed, 52 insertions(+), 4 deletions(-)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 5fabd203b8..a816c9548b 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2697,10 +2697,16 @@ class DAG(LoggingMixin):
# than creating a BackfillJob and allows us to surface logs to the user
while dr.state == State.RUNNING:
schedulable_tis, _ = dr.update_state(session=session)
- for ti in schedulable_tis:
- add_logger_if_needed(ti)
- ti.task = tasks[ti.task_id]
- _run_task(ti, session=session)
+ try:
+ for ti in schedulable_tis:
+ add_logger_if_needed(ti)
+ ti.task = tasks[ti.task_id]
+ _run_task(ti, session=session)
+ except Exception:
+ self.log.info(
+ "Task failed. DAG will continue to run until finished and
be marked as failed.",
+ exc_info=True,
+ )
if conn_file_path or variable_file_path:
# Remove the local variables we have added to the
secrets_backend_list
secrets_backend_list.pop(0)
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index e56f91e71e..68ec8e9bca 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -1911,6 +1911,48 @@ class TestDag:
dag.test()
mock_object.assert_called_with("output of first task")
+ def test_dag_test_with_fail_handler(self):
+ mock_handle_object_1 = mock.MagicMock()
+ mock_handle_object_2 = mock.MagicMock()
+
+ def handle_task_failure(context):
+ ti = context["task_instance"]
+ mock_handle_object_1(f"task {ti.task_id} failed...")
+
+ def handle_dag_failure(context):
+ ti = context["task_instance"]
+ mock_handle_object_2(f"dag {ti.dag_id} run failed...")
+
+ dag = DAG(
+ dag_id="test_local_testing_conn_file",
+ default_args={"on_failure_callback": handle_task_failure},
+ on_failure_callback=handle_dag_failure,
+ start_date=DEFAULT_DATE,
+ )
+
+ mock_task_object_1 = mock.MagicMock()
+ mock_task_object_2 = mock.MagicMock()
+
+ @task_decorator
+ def check_task():
+ mock_task_object_1()
+ raise AirflowException("boooom")
+
+ @task_decorator
+ def check_task_2(my_input):
+ # we call a mock object to ensure that this task actually ran.
+ mock_task_object_2(my_input)
+
+ with dag:
+ check_task_2(check_task())
+
+ dag.test()
+
+ mock_handle_object_1.assert_called_with("task check_task failed...")
+ mock_handle_object_2.assert_called_with("dag
test_local_testing_conn_file run failed...")
+ mock_task_object_1.assert_called()
+ mock_task_object_2.assert_not_called()
+
def test_dag_test_with_task_mapping(self):
dag = DAG(dag_id="test_local_testing_conn_file",
start_date=DEFAULT_DATE)
mock_object = mock.MagicMock()