ronak-sirwani opened a new issue, #62845:
URL: https://github.com/apache/airflow/issues/62845

   ### Apache Airflow version
   
   Other Airflow 3 version (please specify below)
   
   ### If "Other Airflow 3 version" selected, which one?
   
   3.0.6
   
   ### What happened?
   
   When a deferrable operator fails after resuming from its trigger and is retried (i.e., it passes through the Up For Retry state), the next attempt incorrectly retains the `next_method` and `next_kwargs` from the previous, failed attempt.
   Instead of starting fresh at the standard `execute()` entry point, the worker immediately resumes the task by calling `next_method(**next_kwargs)` with that stale data. As a result, the task either processes "stale/zombie" events from the previous attempt or fails prematurely because the setup logic in `execute()` was bypassed.
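
The failure mode above can be illustrated with a toy model of how a worker picks the entry point for an attempt. This is a minimal sketch, not Airflow's code: `TaskInstanceRecord`, `DemoOperator` and `run_task` are hypothetical names, and the only behaviour carried over from the report is that a stored `next_method` takes precedence over `execute()`.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional


@dataclass
class TaskInstanceRecord:
    """Hypothetical stand-in for the persisted task instance row."""

    try_number: int = 1
    next_method: Optional[str] = None
    next_kwargs: dict[str, Any] = field(default_factory=dict)


class DemoOperator:
    """Toy operator that records which entry point the 'worker' invoked."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def execute(self) -> None:
        self.calls.append("execute")

    def next(self, execute_try_number: int) -> None:
        self.calls.append(f"next(execute_try_number={execute_try_number})")


def run_task(op: DemoOperator, ti: TaskInstanceRecord) -> None:
    """Resume via next_method if one is stored, otherwise start at execute()."""
    if ti.next_method:
        method: Callable[..., None] = getattr(op, ti.next_method)
        method(**ti.next_kwargs)
    else:
        op.execute()


# Attempt 1 deferred, stored its resume point, then failed inside `next`.
ti = TaskInstanceRecord(try_number=1, next_method="next", next_kwargs={"execute_try_number": 1})

# The retry bumps try_number but (per this issue) keeps next_method/next_kwargs.
ti.try_number = 2
op = DemoOperator()
run_task(op, ti)
print(op.calls)  # ['next(execute_try_number=1)'] -> execute() never ran on attempt 2
```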
   
   - Airflow 2.x: This appears to be a known issue that was addressed in https://github.com/apache/airflow/issues/18146. That fix ensured that when a task was reset for a retry, the `next_method` and `next_kwargs` columns in the `task_instance` table were explicitly set to NULL.
   - Airflow 3.x: With the move to the new Internal API and Task SDK (e.g., issue https://github.com/apache/airflow/issues/47373), this reset logic appears to be missing, so subsequent attempts skip `execute()` entirely.
   
   ### What you think should happen instead?
   
   TaskInstance attributes such as `next_method` and `next_kwargs` should be reset on retry, so that the retried task starts again from the beginning (`execute()`).
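
The expected behaviour can be sketched as a reset step that runs when an attempt is marked for retry, in the spirit of the Airflow 2.x fix described above (explicitly nulling both fields). This is an illustrative model only: `TaskInstanceRecord`, `prepare_for_retry` and `entry_point` are hypothetical and do not correspond to Airflow 3 Task SDK or API server code; where the reset belongs in Airflow 3 is not shown here.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class TaskInstanceRecord:
    """Hypothetical stand-in for the persisted task instance row."""

    try_number: int = 1
    state: str = "running"
    next_method: Optional[str] = None
    next_kwargs: Optional[dict[str, Any]] = None


def prepare_for_retry(ti: TaskInstanceRecord) -> None:
    """Hypothetical retry hook: mark the attempt for retry and drop its resume point."""
    ti.state = "up_for_retry"
    # The essential part: forget where the failed attempt was deferred.
    ti.next_method = None
    ti.next_kwargs = None


def entry_point(ti: TaskInstanceRecord) -> str:
    """Name of the method the next attempt would start in."""
    return ti.next_method or "execute"


ti = TaskInstanceRecord(try_number=1, next_method="next", next_kwargs={"execute_try_number": 1})
print(entry_point(ti))  # next (stale resume point still stored)
prepare_for_retry(ti)
print(entry_point(ti))  # execute
```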
   
   ### How to reproduce
   
   ```python
   from datetime import datetime, timedelta

   from airflow.exceptions import AirflowException
   from airflow.sdk import DAG, BaseOperator
   from airflow.triggers.testing import SuccessTrigger


   class RetryOperator(BaseOperator):
       def execute(self, context):
           ti = context["ti"]
           # Private attribute: is a stale resume method already present?
           has_next_method = bool(ti._ti_context_from_server.next_method)
           try_number = ti.try_number
           self.log.info(
               "In `execute`: has_next_method: %s, try_number: %s",
               has_next_method,
               try_number,
           )

           # Defer, recording which attempt actually ran `execute`
           self.defer(
               trigger=SuccessTrigger(),
               method_name="next",
               kwargs={"execute_try_number": try_number},
           )

       def next(self, context, execute_try_number, event=None):
           self.log.info("In next!")
           ti = context["ti"]
           has_next_method = bool(ti._ti_context_from_server.next_method)
           try_number = ti.try_number
           self.log.info(
               "In `next`: has_next_method: %s, try_number: %s, execute_try_number: %s",
               has_next_method,
               try_number,
               execute_try_number,
           )

           if try_number == 1:
               # Force a retry
               raise AirflowException("Force a retry")
           # Did we run `execute` during this attempt?
           if execute_try_number != try_number:
               raise AirflowException("`execute` wasn't run during retry!")
           return None  # Success!


   # Airflow 3 removed `schedule_interval`; use `schedule` instead
   with DAG(
       "triggerer_retry", schedule=None, start_date=datetime(2021, 9, 13)
   ) as dag:
       RetryOperator(task_id="retry", retries=1, retry_delay=timedelta(seconds=15))
   ```
   
   ### Operating System
   
   Linux
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   #### Temporary workaround:
   To mitigate this in my custom operators, I’ve had to implement a "Single-Entry" pattern to enforce idempotency:

   - Always set `method_name="execute"` in the `self.defer()` call.
   - Pass a unique `attempt_id` (from `ti.try_number`) into the trigger.
   - Validate the `attempt_id` inside `execute()` to detect whether an event is fresh or stale.

   While this works, it adds significant boilerplate and complexity to every custom operator.
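
The validation step of this workaround can be modelled in plain Python. This is a minimal sketch, not the reporter's code: `event_matches_attempt`, `single_entry_step` and the `attempt_id` event key are hypothetical names, it assumes the trigger echoes back the `attempt_id` it was given, and a real operator would call `self.defer(..., method_name="execute")` where this sketch returns a string.

```python
from typing import Any, Optional


def event_matches_attempt(event: Optional[dict[str, Any]], try_number: int) -> bool:
    """True only if the trigger event was produced for the current attempt.

    Assumes the trigger echoes back the ``attempt_id`` it was given when the
    task deferred; the key name is hypothetical.
    """
    return event is not None and event.get("attempt_id") == try_number


def single_entry_step(try_number: int, event: Optional[dict[str, Any]] = None) -> str:
    """Decide what a single-entry ``execute()`` would do for this call.

    Strings stand in for the real actions so the logic runs standalone.
    """
    if event_matches_attempt(event, try_number):
        return "complete"  # event belongs to this attempt: finish the work
    # First entry for this attempt, or a stale event from an earlier one:
    # redo the setup and defer again, tagging the trigger with this attempt.
    return f"defer(attempt_id={try_number})"


print(single_entry_step(2))                     # defer(attempt_id=2)
print(single_entry_step(2, {"attempt_id": 1}))  # defer(attempt_id=2)
print(single_entry_step(2, {"attempt_id": 2}))  # complete
```

Keying freshness on `try_number` is what lets the operator ignore a leftover event even though the worker resumed it with stale state.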
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.