ronak-sirwani opened a new issue, #62845: URL: https://github.com/apache/airflow/issues/62845
### Apache Airflow version

Other Airflow 3 version (please specify below)

### If "Other Airflow 3 version" selected, which one?

3.0.6

### What happened?

When a deferrable operator fails while resuming from a trigger and enters a retry (following an Up For Retry state), the subsequent attempt incorrectly persists the `next_method` and `next_kwargs` from the previous, failed attempt. Instead of starting fresh through the standard `execute()` entry point, the worker immediately resumes the task using the stale `next_method(**next_kwargs)` data. As a result, the task either processes stale/"zombie" events from the previous attempt or fails prematurely because the initial setup logic in `execute()` was bypassed.

- Airflow 2.x: this looks like a known issue that was addressed in https://github.com/apache/airflow/issues/18146. The fix ensured that when a task was reset for a retry, the `next_method` and `next_kwargs` columns in the `task_instance` table were explicitly nullified.
- Airflow 3.x: with the transition to the new internal API and Task SDK (e.g. https://github.com/apache/airflow/issues/47373), this reset logic appears to be missing, and subsequent attempts skip `execute()` entirely.

### What you think should happen instead?

TaskInstance attributes such as `next_method` and `next_kwargs` should be reset on retry, so the task can be retried from the beginning.
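Concretely, I would expect the reset to mirror the Airflow 2.x fix: nullify the deferral state whenever the task instance is prepared for a retry attempt. The sketch below is hypothetical (a stand-in dataclass, not Airflow's actual `TaskInstance` model or scheduler code); it only illustrates the expected reset:

```python
# Hypothetical sketch, NOT actual Airflow internals: model the deferral
# state that should be cleared when a task instance is reset for retry.
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class TaskInstanceState:
    try_number: int = 1
    next_method: Optional[str] = None
    next_kwargs: Optional[dict] = None


def reset_for_retry(ti: TaskInstanceState) -> None:
    """Clear stale deferral state before scheduling the retry attempt."""
    ti.try_number += 1
    ti.next_method = None  # otherwise the worker resumes the stale method
    ti.next_kwargs = None  # otherwise stale kwargs/events are replayed


# A deferred task that failed while resuming via `next`:
ti = TaskInstanceState(
    try_number=1, next_method="next", next_kwargs={"execute_try_number": 1}
)
reset_for_retry(ti)
assert ti.next_method is None and ti.next_kwargs is None  # fresh execute()
```

With the state cleared, the retry attempt would re-enter `execute()` instead of jumping straight into `next_method`.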
### How to reproduce

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.sdk import BaseOperator
from airflow.triggers.testing import SuccessTrigger


class RetryOperator(BaseOperator):
    def execute(self, context):
        ti = context["ti"]
        has_next_method = bool(ti._ti_context_from_server.next_method)
        try_number = ti.try_number
        self.log.info(
            f"In `execute`: has_next_method: {has_next_method}, try_number: {try_number}"
        )
        self.defer(
            trigger=SuccessTrigger(),
            method_name="next",
            kwargs={"execute_try_number": try_number},
        )

    def next(self, context, execute_try_number, event=None):
        self.log.info("In next!")
        ti = context["ti"]
        has_next_method = bool(ti._ti_context_from_server.next_method)
        try_number = ti.try_number
        self.log.info(
            f"In `next`: has_next_method: {has_next_method}, "
            f"try_number: {try_number}, execute_try_number: {execute_try_number}"
        )
        if try_number == 1:
            # Force a retry
            raise AirflowException("Force a retry")
        # Did we run `execute`?
        if execute_try_number != try_number:
            raise AirflowException("`execute` wasn't run during retry!")
        return None  # Success!


with DAG(
    "triggerer_retry", schedule=None, start_date=datetime(2021, 9, 13)
) as dag:
    RetryOperator(task_id="retry", retries=1, retry_delay=timedelta(seconds=15))
```

### Operating System

Linux

### Versions of Apache Airflow Providers

_No response_

### Deployment

Other Docker-based deployment

### Deployment details

_No response_

### Anything else?

#### Temporary workaround

To mitigate this in my custom operators, I've had to implement a "single-entry" pattern to force idempotency:

- Always setting `method_name="execute"` in the `self.defer()` call.
- Passing a unique attempt id (from `ti.try_number`) into the trigger.
- Manually validating the attempt id inside `execute()` to detect whether the event is fresh or stale.

While this works, it adds significant boilerplate and complexity to every custom operator.

### Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
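For reference, the single-entry workaround described under "Anything else?" can be sketched in isolation. The `SingleEntryOperator` class and its simplified `defer()` below are hypothetical stand-ins, not Airflow APIs; they only model the attempt-id check that makes resumption idempotent:

```python
# Hypothetical sketch of the "single-entry" workaround (illustrative names,
# not Airflow APIs): always defer back into execute() and tag the trigger
# payload with the attempt number, so a retried attempt can detect and
# discard events left over from an earlier attempt.
from typing import Any, Optional


class SingleEntryOperator:
    def __init__(self) -> None:
        self.try_number = 1

    def execute(self, event: Optional[dict] = None) -> str:
        # Discard events produced by an earlier attempt.
        if event is not None and event.get("attempt_id") != self.try_number:
            event = None  # treat the resumption as a fresh start
        if event is None:
            # Fresh entry: run setup, then defer back into execute(),
            # carrying the current attempt id in the payload.
            return self.defer(
                method_name="execute",
                payload={"attempt_id": self.try_number},
            )
        return "success"

    def defer(self, method_name: str, payload: dict) -> str:
        # Stand-in for BaseOperator.defer(); returns a marker for clarity.
        return f"deferred:{method_name}:{payload['attempt_id']}"


op = SingleEntryOperator()
assert op.execute() == "deferred:execute:1"  # fresh run defers
op.try_number = 2  # a retry happened
assert op.execute({"attempt_id": 1}) == "deferred:execute:2"  # stale event ignored
assert op.execute({"attempt_id": 2}) == "success"  # fresh event accepted
```

Because every resumption re-enters `execute()`, the setup logic always runs, and stale events from a previous attempt are filtered out by the attempt-id comparison.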
