tirkarthi opened a new issue, #46224: URL: https://github.com/apache/airflow/issues/46224
### Apache Airflow version

main (development)

### If "Other Airflow 2 version" selected, which one?

_No response_

### What happened?

https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html#exiting-deferred-task-from-triggers

On a task with retries configured, when a `TaskFailedEvent` is yielded from the trigger, the task is marked as failed immediately and its retries are not executed. Failure callbacks are similarly skipped, and `email_on_failure` / `email_on_retry` are not honored either. This happens on main and also on 2.10.4.

Related PR: https://github.com/apache/airflow/pull/40084

### What you think should happen instead?

Retries, callbacks, and failure emails should be honored.

### How to reproduce

1. Create the following trigger and DAG with retries and a callback.
2. Run the DAG; a `TaskFailedEvent` is emitted from the trigger.
3. The DAG is marked as failed without the retries and callbacks being executed.
4. Uncomment the line yielding `TriggerEvent` and run the DAG again with the new code.
5. The retry happens and the failure callback is also executed.
```python
# plugins/custom_trigger.py
from __future__ import annotations

import asyncio
import logging

from airflow.triggers.base import BaseTrigger, TriggerEvent, TaskSuccessEvent, TaskFailedEvent
from airflow.utils import timezone


class StateTrigger(BaseTrigger):
    def __init__(self, state):
        super().__init__()
        self.state = state

    def serialize(self):
        return ("custom_trigger.StateTrigger", {"state": self.state})

    async def run(self):
        if self.state == "success":
            yield TaskSuccessEvent()
        else:
            yield TaskFailedEvent()
        # yield TriggerEvent(self.state)
```

```python
# dag_state_test.py
from __future__ import annotations

from datetime import datetime

from custom_trigger import StateTrigger

from airflow import DAG
from airflow.models.baseoperator import BaseOperator


class MultipleDeferTrigger(BaseOperator):
    """Multiple defer trigger."""

    def __init__(self, state=None, *args, **kwargs):
        self.state = state
        super().__init__(*args, **kwargs)

    def execute(self, context):
        self.defer(
            trigger=StateTrigger(self.state),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None):
        raise Exception(event)


with DAG(
    dag_id="state_defer",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    schedule=None,
    default_args={
        "on_success_callback": lambda context: open("/tmp/on_success_callback", "w+").write(str(datetime.now())),
        "on_failure_callback": lambda context: open("/tmp/on_failure_callback", "w+").write(str(datetime.now())),
        "retries": 1,
        "retry_delay": 5.0,
    },
) as dag:
    success = MultipleDeferTrigger(task_id="success", state="success", retry_delay=5.0)
    failed = MultipleDeferTrigger(task_id="failed", state="failed", retry_delay=5.0)

    success
    failed
```

### Operating System

Ubuntu 20.04.3 LTS

### Versions of Apache Airflow Providers

_No response_

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

_No response_

### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!
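As the reproduction steps note, yielding a plain `TriggerEvent` (the commented-out line) keeps the normal failure path, because `execute_complete` then runs on a worker and any exception it raises goes through the usual retry/callback machinery. A minimal sketch of that control flow, with the Airflow pieces stubbed out as plain Python so it runs standalone (`StubTriggerEvent`, `run_trigger`, and `resume` are illustrative stand-ins, not Airflow APIs):

```python
import asyncio


class StubTriggerEvent:
    """Stand-in for airflow.triggers.base.TriggerEvent: just carries a payload."""

    def __init__(self, payload):
        self.payload = payload


async def run_trigger(state):
    """Mimics StateTrigger.run() yielding a plain TriggerEvent instead of
    TaskSuccessEvent/TaskFailedEvent, so the resumed task decides the outcome."""
    yield StubTriggerEvent(state)


def execute_complete(event):
    """Mimics the operator's resume method: raising here goes through the
    normal task-failure path, so retries and callbacks are honored."""
    if event.payload != "success":
        raise RuntimeError(f"trigger reported: {event.payload}")
    return event.payload


async def resume(state):
    # Consume the trigger's event stream and hand the first event to the
    # completion method, as the triggerer/worker handoff would.
    async for event in run_trigger(state):
        return execute_complete(event)
```

The point of contrast with `TaskSuccessEvent`/`TaskFailedEvent` is that those set the terminal task state directly, bypassing the code path where retries, callbacks, and failure emails are evaluated.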
### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
