eejbyfeldt opened a new issue, #30146:
URL: https://github.com/apache/airflow/issues/30146

   ### Apache Airflow version
   
   2.5.2
   
   ### What happened
   
   The changes in https://github.com/apache/airflow/pull/29743 add a new
   place where the `on_failure_callback` is called. This leads to two incorrect
   behaviors.
   
   1. The `on_failure_callback` is incorrectly called when a task has retries
      and goes into `UP_FOR_RETRY`
   
   2. The `on_failure_callback` is sometimes called twice
   
   ### What you think should happen instead
   
   The `on_failure_callback` should be called exactly once, and only when the
   task goes into a failed state.
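
The expected semantics can be illustrated with a small toy model. This is only a sketch under stated assumptions, not Airflow's implementation: `ToyTaskInstance`, `record_failed_attempt`, and `run_failure_callback` are hypothetical names, and `State` here is a reduced stand-in for Airflow's state constants. It shows a callback that is skipped while the task is `UP_FOR_RETRY` and that runs at most once even if a second code path reports the same failure.

```python
from enum import Enum


class State(Enum):
    # Hypothetical stand-in for Airflow's task states, reduced to two values.
    UP_FOR_RETRY = "up_for_retry"
    FAILED = "failed"


class ToyTaskInstance:
    """Toy model of a task instance; not Airflow's TaskInstance."""

    def __init__(self, retries, on_failure_callback):
        self.retries = retries
        self.attempts = 0
        self.state = None
        self.on_failure_callback = on_failure_callback
        self.callback_done = False

    def record_failed_attempt(self):
        """Register one failed attempt and update the state."""
        self.attempts += 1
        if self.attempts <= self.retries:
            # Retries remain, so this is not a final failure.
            self.state = State.UP_FOR_RETRY
        else:
            self.state = State.FAILED
        self.run_failure_callback()

    def run_failure_callback(self):
        """Run the callback only in FAILED state, and only the first time."""
        if self.state is not State.FAILED or self.callback_done:
            return
        self.callback_done = True
        self.on_failure_callback({"ti": self})


calls = []
ti = ToyTaskInstance(
    retries=1,
    on_failure_callback=lambda context: calls.append(context["ti"].state),
)

ti.record_failed_attempt()  # first attempt fails, one retry is left
state_after_first = ti.state
calls_after_first = len(calls)

ti.record_failed_attempt()  # the retry fails as well, no retries are left
ti.run_failure_callback()   # a second code path reports the same failure
```

In this model the first failed attempt leaves the task in `UP_FOR_RETRY` without invoking the callback, and the repeated `run_failure_callback()` call at the end is a no-op because the callback has already run.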
   
   ### How to reproduce
   
   These two patches
   (https://github.com/eejbyfeldt/airflow/commit/b0e7a0ae3b2c494bb75772866466110c6b3b7e8f,
   https://github.com/eejbyfeldt/airflow/commit/4734378b07efd9bb4560690747fef25571ac24a4)
   demonstrate both problems: the first modifies an existing test case to show
   that the callback is now called incorrectly, and the second adds a test case
   showing that it is now called more than once.
   ```
   From b0e7a0ae3b2c494bb75772866466110c6b3b7e8f Mon Sep 17 00:00:00 2001
   From: Emil Ejbyfeldt <[email protected]>
   Date: Thu, 16 Mar 2023 14:46:04 +0100
   Subject: [PATCH 1/2] Modify spec to show that callback is now incorrectly
    called
   
   ---
    tests/models/test_taskinstance.py | 4 +++-
    1 file changed, 3 insertions(+), 1 deletion(-)
   
   diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
   index 50cac05296..d4a0328513 100644
   --- a/tests/models/test_taskinstance.py
   +++ b/tests/models/test_taskinstance.py
   @@ -448,7 +448,7 @@ class TestTaskInstance:
            ti.run()
            assert State.SKIPPED == ti.state
    
   -    def test_task_sigterm_works_with_retries(self, dag_maker):
   +    def test_task_sigterm_works_with_retries(self, dag_maker, caplog):
            """
            Test that ensures that tasks are retried when they receive sigterm
            """
   @@ -462,6 +462,7 @@ class TestTaskInstance:
                    python_callable=task_function,
                    retries=1,
                    retry_delay=datetime.timedelta(seconds=2),
   +                on_failure_callback=lambda context: context["ti"].log.info("on_failure_callback called"),
                )
    
            dr = dag_maker.create_dagrun()
   @@ -471,6 +472,7 @@ class TestTaskInstance:
                ti.run()
            ti.refresh_from_db()
            assert ti.state == State.UP_FOR_RETRY
   +        assert "on_failure_callback called" not in caplog.text
    
        def test_task_sigterm_calls_on_failure_callack(self, dag_maker, caplog):
            """
   -- 
   2.39.2
   ```
   ```
   From 4734378b07efd9bb4560690747fef25571ac24a4 Mon Sep 17 00:00:00 2001
   From: Emil Ejbyfeldt <[email protected]>
   Date: Thu, 16 Mar 2023 16:10:53 +0100
   Subject: [PATCH 2/2] Add test case for on_failure_callback only being called
    once
   
   ---
    tests/models/test_taskinstance.py | 26 ++++++++++++++++++++++++++
    1 file changed, 26 insertions(+)
   
   diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
   index d4a0328513..8ced863f96 100644
   --- a/tests/models/test_taskinstance.py
   +++ b/tests/models/test_taskinstance.py
   @@ -474,6 +474,32 @@ class TestTaskInstance:
            assert ti.state == State.UP_FOR_RETRY
            assert "on_failure_callback called" not in caplog.text
    
   +    def test_task_sigterm_call_on_failure_callback_only_once(self, dag_maker, caplog):
   +        """
   +        Test that ensures that tasks are retried when they receive sigterm
   +        """
   +
   +        def task_function(ti):
   +            os.kill(ti.pid, signal.SIGTERM)
   +
   +        with dag_maker("test_mark_failure_2"):
   +            task = PythonOperator(
   +                task_id="test_on_failure",
   +                python_callable=task_function,
   +                retries=0,
   +                retry_delay=datetime.timedelta(seconds=2),
   +                on_failure_callback=lambda context: context["ti"].log.info("on_failure_callback called"),
   +            )
   +
   +        dr = dag_maker.create_dagrun()
   +        ti = dr.task_instances[0]
   +        ti.task = task
   +        with pytest.raises(AirflowException):
   +            ti.run()
   +        ti.refresh_from_db()
   +        assert ti.state == State.FAILED
   +        assert caplog.text.count("on_failure_callback called") == 1
   +
        def test_task_sigterm_calls_on_failure_callack(self, dag_maker, caplog):
            """
            Test that ensures that tasks call on_failure_callback when they receive sigterm
   -- 
   2.39.2
   
   ```
   
   After reverting the code changes from
   https://github.com/apache/airflow/pull/29743, both of these specs pass, and
   the new spec added in that PR also succeeds without the PR's code changes.
   So it is not clear that the PR solves the bug it was intended to solve.
   
   ### Operating System
   
   Fedora 37
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

