tirkarthi opened a new issue, #46224:
URL: https://github.com/apache/airflow/issues/46224

   ### Apache Airflow version
   
   main (development)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   
https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html#exiting-deferred-task-from-triggers
   
   On tasks with retry when a `TaskFailedEvent` is from trigger the task is 
marked as failed with the retries not executed for the failed tasks. Similar 
issue with callbacks and also email_on_failure, email_on_retry also not being 
honored. This happens in main and also 2.10.4 . PR related 
https://github.com/apache/airflow/pull/40084
   
   ### What you think should happen instead?
   
   Retries, callbacks and emails on failure should be honored. 
   
   ### How to reproduce
   
   1. Create following triggers and dag with retries and callback.
   2. Run the dag with `TaskFailedEvent` emitted from the trigger.
   3. The dag is marked as failed without retries and callbacks executed.
   4. Uncomment yielding `TriggerEvent` and run the dag again with new code.
   5. Retry happens and the failure callback is also executed.
   
   ```python
   # plugins/custom_trigger.py
   from __future__ import annotations
   
   import asyncio
   import logging
   
   from airflow.triggers.base import BaseTrigger, TriggerEvent, 
TaskSuccessEvent, TaskFailedEvent
   from airflow.utils import timezone
   
   class StateTrigger(BaseTrigger):
       def __init__(self, state):
           super().__init__()
           self.state = state
   
       def serialize(self):
           return ("custom_trigger.StateTrigger", {"state": self.state})
   
       async def run(self):
           if self.state == "success":
               yield TaskSuccessEvent()
           else:
               yield TaskFailedEvent()
   
           # yield TriggerEvent(self.state)
   ```
   
   ```python
   
   # dag_state_test.py
   
   from __future__ import annotations
   
   from datetime import datetime
   
   from custom_trigger import StateTrigger
   
   from airflow import DAG
   from airflow.models.baseoperator import BaseOperator
   
   
   class MultipleDeferTrigger(BaseOperator):
       """Multiple defer trigger."""
   
       def __init__(self, state=None, *args, **kwargs):
           self.state = state
           super().__init__(*args, **kwargs)
   
       def execute(self, context):
           self.defer(
               trigger=StateTrigger(self.state),
               method_name="execute_complete",
           )
   
       def execute_complete(self, context, event=None):
           raise Exception(event)
   
   with DAG(
       dag_id="state_defer",
       start_date=datetime(2021, 1, 1),
       catchup=False,
       schedule=None,
       default_args = {
           "on_success_callback": lambda context: 
open("/tmp/on_success_callback", "w+").write(str(datetime.now())),
           "on_failure_callback": lambda context: 
open("/tmp/on_failure_callback", "w+").write(str(datetime.now())),
           "retries": 1,
           "retry_delay": 5.0
       }
   ) as dag:
       success = MultipleDeferTrigger(task_id="success", state="success", 
retry_delay=5.0)
       failed = MultipleDeferTrigger(task_id="failed", state="failed", 
retry_delay=5.0)
   
       success
       failed
   
   ```
   
   ### Operating System
   
   Ubuntu 20.04.3 LTS
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to