eejbyfeldt opened a new issue, #30146:
URL: https://github.com/apache/airflow/issues/30146

### Apache Airflow version

2.5.2

### What happened

The changes in https://github.com/apache/airflow/pull/29743 add new places where the `on_failure_callback` is called. This leads to two incorrect behaviors:

1. The `on_failure_callback` is incorrectly called when a task has retries and goes into `UP_FOR_RETRY`.
2. The `on_failure_callback` is sometimes called twice.

### What you think should happen instead

The `on_failure_callback` should only be called once, when the task goes into a failed state.

### How to reproduce

These two patches (https://github.com/eejbyfeldt/airflow/commit/b0e7a0ae3b2c494bb75772866466110c6b3b7e8f, https://github.com/eejbyfeldt/airflow/commit/4734378b07efd9bb4560690747fef25571ac24a4) reproduce the problems. The first modifies an existing test case to show that the callback is now incorrectly called. The second adds a test case showing that the callback now gets called more than once.

```
From b0e7a0ae3b2c494bb75772866466110c6b3b7e8f Mon Sep 17 00:00:00 2001
From: Emil Ejbyfeldt <[email protected]>
Date: Thu, 16 Mar 2023 14:46:04 +0100
Subject: [PATCH 1/2] Modify spec to show that callback is now incorrectly called

---
 tests/models/test_taskinstance.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 50cac05296..d4a0328513 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -448,7 +448,7 @@ class TestTaskInstance:
         ti.run()
         assert State.SKIPPED == ti.state
 
-    def test_task_sigterm_works_with_retries(self, dag_maker):
+    def test_task_sigterm_works_with_retries(self, dag_maker, caplog):
         """
         Test that ensures that tasks are retried when they receive sigterm
         """
@@ -462,6 +462,7 @@ class TestTaskInstance:
                 python_callable=task_function,
                 retries=1,
                 retry_delay=datetime.timedelta(seconds=2),
+                on_failure_callback=lambda context: context["ti"].log.info("on_failure_callback called"),
             )
 
         dr = dag_maker.create_dagrun()
@@ -471,6 +472,7 @@ class TestTaskInstance:
             ti.run()
         ti.refresh_from_db()
         assert ti.state == State.UP_FOR_RETRY
+        assert "on_failure_callback called" not in caplog.text
 
     def test_task_sigterm_calls_on_failure_callack(self, dag_maker, caplog):
         """
-- 
2.39.2
```

```
From 4734378b07efd9bb4560690747fef25571ac24a4 Mon Sep 17 00:00:00 2001
From: Emil Ejbyfeldt <[email protected]>
Date: Thu, 16 Mar 2023 16:10:53 +0100
Subject: [PATCH 2/2] Add test case for on_failure_callback only being called once

---
 tests/models/test_taskinstance.py | 26 ++++++++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index d4a0328513..8ced863f96 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -474,6 +474,32 @@ class TestTaskInstance:
         assert ti.state == State.UP_FOR_RETRY
         assert "on_failure_callback called" not in caplog.text
 
+    def test_task_sigterm_call_on_failure_callback_only_once(self, dag_maker, caplog):
+        """
+        Test that ensures that tasks are retried when they receive sigterm
+        """
+
+        def task_function(ti):
+            os.kill(ti.pid, signal.SIGTERM)
+
+        with dag_maker("test_mark_failure_2"):
+            task = PythonOperator(
+                task_id="test_on_failure",
+                python_callable=task_function,
+                retries=0,
+                retry_delay=datetime.timedelta(seconds=2),
+                on_failure_callback=lambda context: context["ti"].log.info("on_failure_callback called"),
+            )
+
+        dr = dag_maker.create_dagrun()
+        ti = dr.task_instances[0]
+        ti.task = task
+        with pytest.raises(AirflowException):
+            ti.run()
+        ti.refresh_from_db()
+        assert ti.state == State.FAILED
+        assert caplog.text.count("on_failure_callback called") == 1
+
     def test_task_sigterm_calls_on_failure_callack(self, dag_maker, caplog):
         """
         Test that ensures that tasks call on_failure_callback when they receive sigterm
-- 
2.39.2
```

With the code changes from https://github.com/apache/airflow/pull/29743 reverted, both of these specs pass. The new spec added in that PR also still succeeds without its code changes.
So it is not clear that it solves the bug it was intended to solve.

### Operating System

Fedora 37

### Versions of Apache Airflow Providers

_No response_

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

_No response_

### Anything else

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
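The following is a minimal, hypothetical sketch of the behavior requested under "What you think should happen instead". It is not Airflow's actual `TaskInstance` code. `TaskInstanceModel`, `handle_failure`, and `run_failure_callback` are invented names used only for illustration. The sketch encodes two rules:

- A failure that still has retries remaining moves the task to `UP_FOR_RETRY` without invoking `on_failure_callback`.
- A guard makes the callback fire at most once, even if two code paths both reach it. The sketch assumes, for example, the regular failure handling plus a SIGTERM handler.

```python
from enum import Enum
from typing import Callable, List, Optional


class State(str, Enum):
    """Hypothetical subset of task-instance states, named after those in the issue."""

    RUNNING = "running"
    UP_FOR_RETRY = "up_for_retry"
    FAILED = "failed"


class TaskInstanceModel:
    """Hypothetical, simplified stand-in for a task instance (not Airflow's TaskInstance)."""

    def __init__(self, retries: int, on_failure_callback: Optional[Callable[[dict], None]] = None) -> None:
        self.retries = retries
        self.try_number = 1
        self.state = State.RUNNING
        self.on_failure_callback = on_failure_callback
        self._failure_callback_done = False  # guard so the callback fires at most once

    def handle_failure(self) -> None:
        """Retry if attempts remain; only a terminal failure triggers the callback."""
        if self.try_number <= self.retries:
            # Retries remain: move to UP_FOR_RETRY and do NOT call on_failure_callback.
            self.state = State.UP_FOR_RETRY
            return
        self.state = State.FAILED
        self.run_failure_callback()

    def run_failure_callback(self) -> None:
        """Invoke on_failure_callback once, and only for a FAILED task."""
        if self.state != State.FAILED or self._failure_callback_done:
            return  # not failed, or another code path already ran the callback
        self._failure_callback_done = True
        if self.on_failure_callback is not None:
            self.on_failure_callback({"ti": self})


# Usage: record every callback invocation.
calls: List[str] = []


def record(context: dict) -> None:
    calls.append(context["ti"].state.value)


retrying = TaskInstanceModel(retries=1, on_failure_callback=record)
retrying.handle_failure()  # e.g. SIGTERM on the first try with retries=1
assert retrying.state == State.UP_FOR_RETRY and calls == []

failing = TaskInstanceModel(retries=0, on_failure_callback=record)
failing.handle_failure()        # regular failure path
failing.run_failure_callback()  # a second path, e.g. a hypothetical SIGTERM handler
assert failing.state == State.FAILED and calls == ["failed"]
```

The retry case is deliberately left without any failure callback. In Airflow, operators already accept a separate `on_retry_callback` for that transition, so `on_failure_callback` only needs to cover the terminal `FAILED` state.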
